Merge branch 'master' into dev-tests

This commit is contained in:
Sergey Chubaryan 2025-02-17 00:55:12 +03:00
commit fe538e631f
37 changed files with 503 additions and 637 deletions

View File

@ -186,7 +186,7 @@ func (a *App) Run(p RunParams) {
}()
}
srv := server.New(
srv := server.NewServer(
server.NewServerOpts{
DebugMode: debugMode,
Logger: logger,

View File

@ -2,16 +2,17 @@ package handlers
import (
"backend/internal/core/services"
httpserver "backend/internal/http_server"
"backend/pkg/logger"
"encoding/json"
"context"
"github.com/gin-gonic/gin"
)
type createUserInput struct {
Email string `json:"email"`
Password string `json:"password"`
Name string `json:"name"`
Email string `json:"email" validate:"required,email"`
Password string `json:"password" validate:"required"`
Name string `json:"name" validate:"required"`
}
type createUserOutput struct {
@ -20,54 +21,27 @@ type createUserOutput struct {
Name string `json:"name"`
}
func NewUserCreateHandler(logger logger.Logger, userService services.UserService) gin.HandlerFunc {
return func(c *gin.Context) {
ctxLogger := logger.WithContext(c)
func NewUserCreateHandler(log logger.Logger, userService services.UserService) gin.HandlerFunc {
return httpserver.WrapGin(log,
func(ctx context.Context, input createUserInput) (createUserOutput, error) {
user, err := userService.CreateUser(ctx,
services.UserCreateParams{
Email: input.Email,
Password: input.Password,
Name: input.Name,
},
)
params := createUserInput{}
if err := c.ShouldBindJSON(&params); err != nil {
ctxLogger.Error().Err(err).Msg("bad input body model")
c.Data(400, "plain/text", []byte(err.Error()))
return
}
out := createUserOutput{}
if err != nil {
return out, err
}
dto, err := userService.CreateUser(
c,
services.UserCreateParams{
Email: params.Email,
Password: params.Password,
Name: params.Name,
},
)
if err == services.ErrUserExists {
ctxLogger.Error().Err(err).Msg("user already exists")
c.Data(400, "plain/text", []byte(err.Error()))
return
}
if err == services.ErrUserBadPassword {
ctxLogger.Error().Err(err).Msg("password does not satisfy requirements")
c.Data(400, "plain/text", []byte(err.Error()))
return
}
if err != nil {
ctxLogger.Error().Err(err).Msg("unexpected create user error")
c.Data(500, "plain/text", []byte(err.Error()))
return
}
resultBody, err := json.Marshal(
createUserOutput{
Id: dto.Id,
Email: dto.Email,
Name: dto.Name,
},
)
if err != nil {
ctxLogger.Error().Err(err).Msg("marshal user model error")
c.Data(500, "plain/text", []byte(err.Error()))
return
}
c.Data(200, "application/json", resultBody)
}
return createUserOutput{
Id: user.Id,
Email: user.Email,
Name: user.Name,
}, nil
},
)
}

View File

@ -2,14 +2,15 @@ package handlers
import (
"backend/internal/core/services"
httpserver "backend/internal/http_server"
"backend/pkg/logger"
"encoding/json"
"context"
"github.com/gin-gonic/gin"
)
type loginUserInput struct {
Login string `json:"email"`
Login string `json:"email" validate:"required,email"`
Password string `json:"password"`
}
@ -17,43 +18,17 @@ type loginUserOutput struct {
Token string `json:"token"`
}
func NewUserLoginHandler(logger logger.Logger, userService services.UserService) gin.HandlerFunc {
return func(c *gin.Context) {
ctxLogger := logger.WithContext(c).WithPrefix("NewUserLoginHandler")
func NewUserLoginHandler(log logger.Logger, userService services.UserService) gin.HandlerFunc {
return httpserver.WrapGin(log,
func(ctx context.Context, input loginUserInput) (loginUserOutput, error) {
token, err := userService.AuthenticateUser(ctx, input.Login, input.Password)
if err != nil {
return loginUserOutput{}, err
}
params := loginUserInput{}
if err := c.ShouldBindJSON(&params); err != nil {
ctxLogger.Error().Err(err).Msg("bad input body model")
c.AbortWithError(400, err)
return
}
token, err := userService.AuthenticateUser(c, params.Login, params.Password)
if err == services.ErrUserNotExists {
ctxLogger.Error().Err(err).Msg("user does not exist")
c.AbortWithError(400, err)
return
}
if err == services.ErrUserWrongPassword {
ctxLogger.Error().Err(err).Msg("wrong password")
c.AbortWithError(400, err)
return
}
if err != nil {
ctxLogger.Error().Err(err).Msg("AuthenticateUser internal error")
c.AbortWithError(500, err)
return
}
resultBody, err := json.Marshal(loginUserOutput{
Token: token,
})
if err != nil {
ctxLogger.Error().Err(err).Msg("marshal json internal error")
c.AbortWithError(500, err)
return
}
c.Data(200, "application/json", resultBody)
}
return loginUserOutput{
Token: token,
}, nil
},
)
}

View File

@ -0,0 +1,71 @@
package handlers
import (
"backend/internal/core/services"
"backend/pkg/logger"
"html/template"
"github.com/gin-gonic/gin"
)
type HtmlTemplate struct {
TabTitle string
Title string
Text string
Link string
LinkText string
}
const htmlTemplate = `
<html>
<head>
<title>{{.TabTitle}}</title>
</head>
<body>
{{if .Title}}
<h1>{{.Title}}</h1>
{{end}}
<h3>{{.Text}}</h3>
{{if .Link}}
<a href="{{.Link}}">{{.LinkText}}</a>
{{end}}
</body>
</html>
`
func NewUserVerifyEmailHandler(log logger.Logger, userService services.UserService) gin.HandlerFunc {
template, err := template.New("verify-email").Parse(htmlTemplate)
if err != nil {
log.Fatal().Err(err).Msg("Error parsing template")
}
return func(c *gin.Context) {
tmp := HtmlTemplate{
TabTitle: "Verify Email",
Text: "Error verifying email",
}
token, ok := c.GetQuery("token")
if !ok || token == "" {
log.Error().Err(err).Msg("No token in query param")
template.Execute(c.Writer, tmp)
c.Status(400)
return
}
err := userService.VerifyEmail(c, token)
if err != nil {
log.Error().Err(err).Msg("Error verifying email")
template.Execute(c.Writer, tmp)
c.Status(400)
return
}
tmp.Text = "Email successfully verified"
template.Execute(c.Writer, tmp)
c.Status(200)
}
}

View File

@ -15,7 +15,7 @@ func NewAuthMiddleware(userService services.UserService) gin.HandlerFunc {
return
}
user, err := userService.ValidateToken(ctx, token)
user, err := userService.ValidateAuthToken(ctx, token)
if err == services.ErrUserWrongToken || err == services.ErrUserNotExists {
ctx.AbortWithError(403, err)
return

View File

@ -1,56 +0,0 @@
package middleware
import (
"backend/internal/integrations"
log "backend/pkg/logger"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
)
func NewRequestLogMiddleware(logger log.Logger, tracer trace.Tracer, prometheus *integrations.Prometheus) gin.HandlerFunc {
return func(c *gin.Context) {
prometheus.RequestInc()
defer prometheus.RequestDec()
requestId := c.GetHeader("X-Request-Id")
if requestId == "" {
requestId = uuid.New().String()
}
c.Header("X-Request-Id", requestId)
c.Header("Access-Control-Allow-Origin", "*")
log.SetCtxRequestId(c, requestId)
path := c.Request.URL.Path
if c.Request.URL.RawQuery != "" {
path = path + "?" + c.Request.URL.RawQuery
}
start := time.Now()
c.Next()
latency := time.Since(start)
prometheus.AddRequestTime(float64(latency.Microseconds()))
method := c.Request.Method
statusCode := c.Writer.Status()
if statusCode >= 200 && statusCode < 400 {
return
}
ctxLogger := logger.WithContext(c)
if statusCode >= 400 && statusCode < 500 {
prometheus.Add4xxError()
ctxLogger.Warning().Msgf("Request %s %s %d %v", method, path, statusCode, latency)
return
}
prometheus.Add5xxError()
ctxLogger.Error().Msgf("Request %s %s %d %v", method, path, statusCode, latency)
}
}

View File

@ -5,21 +5,14 @@ import (
"backend/cmd/backend/server/middleware"
"backend/cmd/backend/server/utils"
"backend/internal/core/services"
httpserver "backend/internal/http_server"
"backend/internal/integrations"
"backend/pkg/logger"
"context"
"fmt"
"net"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/otel/trace"
)
type Server struct {
logger logger.Logger
ginEngine *gin.Engine
}
type NewServerOpts struct {
DebugMode bool
Logger logger.Logger
@ -28,7 +21,7 @@ type NewServerOpts struct {
Tracer trace.Tracer
}
func New(opts NewServerOpts) *Server {
func NewServer(opts NewServerOpts) *httpserver.Server {
if !opts.DebugMode {
gin.SetMode(gin.ReleaseMode)
}
@ -36,53 +29,42 @@ func New(opts NewServerOpts) *Server {
r := gin.New()
r.ContextWithFallback = true // Use it to allow getting values from c.Request.Context()
r.Static("/webapp", "./webapp")
// r.Static("/webapp", "./webapp")
r.GET("/health", handlers.NewDummyHandler())
prometheus := integrations.NewPrometheus()
r.Any("/metrics", gin.WrapH(prometheus.GetRequestHandler()))
r.Use(middleware.NewRecoveryMiddleware(opts.Logger, prometheus, opts.DebugMode))
r.Use(middleware.NewRequestLogMiddleware(opts.Logger, opts.Tracer, prometheus))
r.Use(middleware.NewTracingMiddleware(opts.Tracer))
r.Use(httpserver.NewRecoveryMiddleware(opts.Logger, prometheus, opts.DebugMode))
r.Use(httpserver.NewRequestLogMiddleware(opts.Logger, opts.Tracer, prometheus))
r.Use(httpserver.NewTracingMiddleware(opts.Tracer))
userGroup := r.Group("/user")
userGroup.POST("/create", handlers.NewUserCreateHandler(opts.Logger, opts.UserService))
userGroup.POST("/login", handlers.NewUserLoginHandler(opts.Logger, opts.UserService))
r.GET("/verify-user", handlers.NewUserVerifyEmailHandler(opts.Logger, opts.UserService))
dummyGroup := r.Group("/dummy")
api := r.Group("/api")
v1 := api.Group("/v1")
userGroup := v1.Group("/user")
{
userGroup.POST("/create", handlers.NewUserCreateHandler(opts.Logger, opts.UserService))
userGroup.POST("/login", handlers.NewUserLoginHandler(opts.Logger, opts.UserService))
}
dummyGroup := v1.Group("/dummy")
dummyGroup.Use(middleware.NewAuthMiddleware(opts.UserService))
{
dummyGroup.Use(middleware.NewAuthMiddleware(opts.UserService))
dummyGroup.GET("", handlers.NewDummyHandler())
dummyGroup.POST("/forgot-password", func(c *gin.Context) {
user := utils.GetUserFromRequest(c)
opts.UserService.ForgotPassword(c, user.Id)
opts.UserService.SendEmailForgotPassword(c, user.Id)
})
}
return &Server{
logger: opts.Logger,
ginEngine: r,
}
}
func (s *Server) Run(ctx context.Context, port uint16) {
listenAddr := fmt.Sprintf("0.0.0.0:%d", port)
s.logger.Log().Msgf("server listening on %s", listenAddr)
listener, err := (&net.ListenConfig{}).Listen(ctx, "tcp", listenAddr)
if err != nil {
s.logger.Fatal().Err(err).Msg("can not create network listener")
}
go func() {
<-ctx.Done()
s.logger.Log().Msg("stopping tcp listener...")
listener.Close()
}()
err = s.ginEngine.RunListener(listener)
if err != nil && err == net.ErrClosed {
s.logger.Fatal().Err(err).Msg("server stopped with error")
}
return httpserver.New(
httpserver.NewServerOpts{
Logger: opts.Logger,
HttpServer: r,
},
)
}

31
cmd/shortlinks/grpc.go Normal file
View File

@ -0,0 +1,31 @@
package main
import (
"backend/internal/core/services"
"backend/internal/grpc_server/shortlinks"
httpserver "backend/internal/http_server"
"backend/pkg/logger"
"context"
)
func NewShortlinksGrpc(log logger.Logger, shortlinkService services.ShortlinkService, host string) *ShortlinksGrpc {
return &ShortlinksGrpc{
handler: NewCreateHandler(log, shortlinkService, host),
}
}
type ShortlinksGrpc struct {
shortlinks.UnimplementedShortlinksServer
handler httpserver.Handler[shortlinkCreateInput, shortlinkCreateOutput]
}
func (s *ShortlinksGrpc) Create(ctx context.Context, req *shortlinks.CreateRequest) (*shortlinks.CreateResponse, error) {
output, err := s.handler(ctx, shortlinkCreateInput{req.Url})
if err != nil {
return nil, err
}
return &shortlinks.CreateResponse{
Link: output.Link,
}, nil
}

View File

@ -2,93 +2,52 @@ package main
import (
"backend/internal/core/services"
"backend/internal/grpc_server/shortlinks"
httpserver "backend/internal/http_server"
"backend/pkg/logger"
"context"
"encoding/json"
"fmt"
"net/url"
"github.com/gin-gonic/gin"
)
type shortlinkCreateInput struct {
Url string `json:"url"`
}
type shortlinkCreateOutput struct {
Link string `json:"link"`
}
type ShortlinksGrpc struct {
shortlinks.UnimplementedShortlinksServer
log logger.Logger
host string
shortlinkService services.ShortlinkService
}
func NewCreateHandler(
log logger.Logger,
shortlinkService services.ShortlinkService,
host string,
) httpserver.Handler[shortlinkCreateInput, shortlinkCreateOutput] {
return func(ctx context.Context, input shortlinkCreateInput) (shortlinkCreateOutput, error) {
output := shortlinkCreateOutput{}
func (s *ShortlinksGrpc) Create(ctx context.Context, req *shortlinks.CreateRequest) (*shortlinks.CreateResponse, error) {
ctxLogger := s.log.WithContext(ctx)
rawUrl := req.GetUrl()
if rawUrl == "" {
ctxLogger.Error().Msg("url query param missing")
return nil, fmt.Errorf("url query param missing")
}
u, err := url.Parse(rawUrl)
if err != nil {
ctxLogger.Error().Err(err).Msg("error parsing url param")
return nil, err
}
u.Scheme = "https"
linkId, err := s.shortlinkService.CreateShortlink(ctx, u.String())
if err != nil {
ctxLogger.Error().Err(err).Msg("err creating shortlink")
return nil, err
}
return &shortlinks.CreateResponse{
Link: fmt.Sprintf("%s/s/%s", s.host, linkId),
}, nil
}
func NewShortlinkCreateHandler(logger logger.Logger, shortlinkService services.ShortlinkService, host string) gin.HandlerFunc {
return func(ctx *gin.Context) {
ctxLogger := logger.WithContext(ctx)
rawUrl := ctx.Query("url")
if rawUrl == "" {
ctxLogger.Error().Msg("url query param missing")
ctx.AbortWithError(400, fmt.Errorf("url query param missing"))
return
}
u, err := url.Parse(rawUrl)
u, err := url.Parse(input.Url)
if err != nil {
ctxLogger.Error().Err(err).Msg("error parsing url param")
ctx.Data(400, "plain/text", []byte(err.Error()))
return
return output, err
}
u.Scheme = "https"
linkId, err := shortlinkService.CreateShortlink(ctx, u.String())
if err != nil {
ctxLogger.Error().Err(err).Msg("err creating shortlink")
ctx.Data(500, "plain/text", []byte(err.Error()))
return
return output, err
}
resultBody, err := json.Marshal(shortlinkCreateOutput{
return shortlinkCreateOutput{
Link: fmt.Sprintf("%s/s/%s", host, linkId),
})
if err != nil {
ctxLogger.Error().Err(err).Msg("err marshalling shortlink")
ctx.AbortWithError(500, err)
return
}
ctx.Data(200, "application/json", resultBody)
}, nil
}
}
func NewShortlinkCreateHandler(log logger.Logger, shortlinkService services.ShortlinkService, host string) gin.HandlerFunc {
return httpserver.WrapGin(log, NewCreateHandler(log, shortlinkService, host))
}
func NewShortlinkResolveHandler(logger logger.Logger, shortlinkService services.ShortlinkService) gin.HandlerFunc {
return func(ctx *gin.Context) {
ctxLogger := logger.WithContext(ctx)

View File

@ -6,7 +6,6 @@ import (
grpcserver "backend/internal/grpc_server"
"backend/internal/grpc_server/shortlinks"
httpserver "backend/internal/http_server"
"backend/internal/http_server/middleware"
"backend/internal/integrations"
"backend/pkg/cache"
"backend/pkg/logger"
@ -94,22 +93,19 @@ func RunServer(ctx context.Context, log logger.Logger, tracer trace.Tracer, conf
ctx.Status(200)
})
r.Use(middleware.NewRecoveryMiddleware(log, prometheus, debugMode))
r.Use(middleware.NewRequestLogMiddleware(log, tracer, prometheus))
r.Use(middleware.NewTracingMiddleware(tracer))
r.Use(httpserver.NewRecoveryMiddleware(log, prometheus, debugMode))
r.Use(httpserver.NewRequestLogMiddleware(log, tracer, prometheus))
r.Use(httpserver.NewTracingMiddleware(tracer))
linkGroup := r.Group("/s")
linkGroup.POST("/new", NewShortlinkCreateHandler(log, shortlinkService, host))
linkGroup.GET("/:linkId", NewShortlinkResolveHandler(log, shortlinkService))
grpcObj := &ShortlinksGrpc{
log: log,
host: host,
shortlinkService: shortlinkService,
}
grpcUnderlying := grpc.NewServer()
shortlinks.RegisterShortlinksServer(grpcUnderlying, grpcObj)
shortlinks.RegisterShortlinksServer(
grpcUnderlying,
NewShortlinksGrpc(log, shortlinkService, host),
)
httpServer := httpserver.New(
httpserver.NewServerOpts{

View File

@ -34,6 +34,6 @@ scrape_configs:
- job_name: 'machine'
scrape_interval: 2s
static_configs:
- targets: ['host.docker.internal:9100']
- targets: ['node_exporter:9100']
labels:
group: 'backend'

View File

@ -56,15 +56,20 @@ services:
node_exporter:
image: quay.io/prometheus/node-exporter:latest
pid: host
command:
- '--path.rootfs=/host'
ports:
- 9100:9100
- '--path.procfs=/host/proc'
- '--path.rootfs=/rootfs'
- '--path.sysfs=/host/sys'
- '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)'
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
extra_hosts:
- "host.docker.internal:host-gateway"
pid: host
volumes:
- '/:/host:ro,rslave'
ports:
- 9100:9100
otel-collector:
image: otel/opentelemetry-collector-contrib:0.108.0
@ -97,12 +102,17 @@ services:
- tempo-init
kafka:
image: apache/kafka:3.8.0
image: &kafkaImage apache/kafka:3.8.0
healthcheck:
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server http://127.0.0.1:9092 || exit 1"]
interval: 1s
timeout: 30s
retries: 30
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
@ -113,18 +123,48 @@ services:
KAFKA_NUM_PARTITIONS: 3
ports:
- 9092:9092
# - 9093:9093
# backend:
# build: .
# # dockerfile: ./dockerfile
# volumes:
# - ./:/app
# ports:
# - 8080:8080
kafka-init:
image: *kafkaImage
depends_on:
kafka:
condition: service_healthy
entrypoint: >
/bin/bash -c "/opt/kafka/bin/kafka-topics.sh --bootstrap-server http://kafka:9092 --create --topic events --partitions 6"
minio:
image: quay.io/minio/minio:latest
command: ["server", "/data", "--console-address", ":9001"]
healthcheck:
test: 'mc ready local'
interval: 1s
environment:
MINIO_ROOT_USER: miniouser
MINIO_ROOT_PASSWORD: miniouser
MINIO_ACCESS_KEY: miniokey
MINIO_SECRET_KEY: miniokey
ports:
- 9000:9000
- 9001:9001
volumes:
- minio-volume:/data
minio-init:
image: quay.io/minio/mc:latest
depends_on:
- minio
entrypoint: >
/bin/sh -c "
/usr/bin/mc alias set myminio http://minio:9000 miniouser miniouser;
/usr/bin/mc mb minio/bucket;
/usr/bin/mc anonymous set public minio/bucket;
exit 0;
"
volumes:
postgres-volume:
grafana-volume:
tempo-volume:
prometheus-volume:
minio-volume:

View File

@ -5,8 +5,10 @@ import "time"
type ActionTokenTarget int
const (
ActionTokenTargetForgotPassword ActionTokenTarget = iota
_ ActionTokenTarget = iota
ActionTokenTargetForgotPassword
ActionTokenTargetLogin2FA
ActionTokenVerifyEmail
)
type ActionTokenDTO struct {

View File

@ -1,10 +1,11 @@
package models
type UserDTO struct {
Id string
Email string
Secret string
Name string
Id string
Email string
EmailVerified bool
Secret string
Name string
}
type UserUpdateDTO struct {

View File

@ -9,7 +9,8 @@ import (
type ActionTokenRepo interface {
CreateActionToken(ctx context.Context, dto models.ActionTokenDTO) (*models.ActionTokenDTO, error)
PopActionToken(ctx context.Context, userId, value string, target models.ActionTokenTarget) (*models.ActionTokenDTO, error)
GetActionToken(ctx context.Context, value string, target models.ActionTokenTarget) (*models.ActionTokenDTO, error)
DeleteActionToken(ctx context.Context, id string) error
}
func NewActionTokenRepo(db integrations.SqlDB) ActionTokenRepo {
@ -43,18 +44,17 @@ func (a *actionTokenRepo) CreateActionToken(ctx context.Context, dto models.Acti
}, nil
}
func (a *actionTokenRepo) PopActionToken(ctx context.Context, userId, value string, target models.ActionTokenTarget) (*models.ActionTokenDTO, error) {
query := `
delete
from action_tokens
where
user_id=$1 and value=$2 and target=$3
and CURRENT_TIMESTAMP < expiration
returning id;`
row := a.db.QueryRowContext(ctx, query, userId, value, target)
func (a *actionTokenRepo) GetActionToken(ctx context.Context, value string, target models.ActionTokenTarget) (*models.ActionTokenDTO, error) {
dto := &models.ActionTokenDTO{Value: value, Target: target}
id := ""
err := row.Scan(&id)
query := `
select id, user_id from action_tokens
where
value=$1 and target=$2
and CURRENT_TIMESTAMP < expiration;`
row := a.db.QueryRowContext(ctx, query, value, target)
err := row.Scan(&dto.Id, &dto.UserId)
if err == sql.ErrNoRows {
return nil, nil
}
@ -62,10 +62,13 @@ func (a *actionTokenRepo) PopActionToken(ctx context.Context, userId, value stri
return nil, err
}
return &models.ActionTokenDTO{
Id: id,
UserId: userId,
Value: value,
Target: target,
}, nil
return dto, nil
}
func (a *actionTokenRepo) DeleteActionToken(ctx context.Context, id string) error {
query := `delete from action_tokens where id=$1;`
if _, err := a.db.ExecContext(ctx, query, id); err != nil {
return err
}
return nil
}

View File

@ -16,7 +16,7 @@ type EventRepo struct {
kafka *integrations.Kafka
}
func (e *EventRepo) SendEmailForgotPassword(ctx context.Context, email, actionToken string) error {
func (e *EventRepo) sendEmail(ctx context.Context, email, actionToken, eventType string) error {
value := struct {
Email string `json:"email"`
Token string `json:"token"`
@ -29,5 +29,13 @@ func (e *EventRepo) SendEmailForgotPassword(ctx context.Context, email, actionTo
return err
}
return e.kafka.SendMessage(ctx, "email_forgot_password", valueBytes)
return e.kafka.SendMessage(ctx, eventType, valueBytes)
}
func (e *EventRepo) SendEmailForgotPassword(ctx context.Context, email, actionToken string) error {
return e.sendEmail(ctx, email, actionToken, "email_forgot_password")
}
func (e *EventRepo) SendEmailVerifyEmail(ctx context.Context, email, actionToken string) error {
return e.sendEmail(ctx, email, actionToken, "email_verify_email")
}

View File

@ -20,6 +20,7 @@ import (
type UserRepo interface {
CreateUser(ctx context.Context, dto models.UserDTO) (*models.UserDTO, error)
UpdateUser(ctx context.Context, userId string, dto models.UserUpdateDTO) error
SetUserEmailVerified(ctx context.Context, userId string) error
GetUserById(ctx context.Context, id string) (*models.UserDTO, error)
GetUserByEmail(ctx context.Context, login string) (*models.UserDTO, error)
}
@ -66,15 +67,28 @@ func (u *userRepo) UpdateUser(ctx context.Context, userId string, dto models.Use
return nil
}
func (u *userRepo) SetUserEmailVerified(ctx context.Context, userId string) error {
_, span := u.tracer.Start(ctx, "postgres::SetUserEmailVerified")
defer span.End()
query := `update users set email_verified=true where id = $1;`
_, err := u.db.ExecContext(ctx, query, userId)
if err != nil {
return err
}
return nil
}
func (u *userRepo) GetUserById(ctx context.Context, id string) (*models.UserDTO, error) {
_, span := u.tracer.Start(ctx, "postgres::GetUserById")
defer span.End()
query := `select id, email, secret, name from users where id = $1;`
query := `select id, email, secret, name, email_verified from users where id = $1;`
row := u.db.QueryRowContext(ctx, query, id)
dto := &models.UserDTO{}
err := row.Scan(&dto.Id, &dto.Email, &dto.Secret, &dto.Name)
err := row.Scan(&dto.Id, &dto.Email, &dto.Secret, &dto.Name, &dto.EmailVerified)
if err == nil {
return dto, nil
}
@ -89,11 +103,11 @@ func (u *userRepo) GetUserByEmail(ctx context.Context, login string) (*models.Us
_, span := u.tracer.Start(ctx, "postgres::GetUserByEmail")
defer span.End()
query := `select id, email, secret, name from users where email = $1;`
query := `select id, email, secret, name, email_verified from users where email = $1;`
row := u.db.QueryRowContext(ctx, query, login)
dto := &models.UserDTO{}
err := row.Scan(&dto.Id, &dto.Email, &dto.Secret, &dto.Name)
err := row.Scan(&dto.Id, &dto.Email, &dto.Secret, &dto.Name, &dto.EmailVerified)
if err == nil {
return dto, nil
}

View File

@ -13,11 +13,12 @@ import (
)
var (
ErrUserNotExists = fmt.Errorf("no such user")
ErrUserExists = fmt.Errorf("user with this login already exists")
ErrUserWrongPassword = fmt.Errorf("wrong password")
ErrUserWrongToken = fmt.Errorf("bad user token")
ErrUserBadPassword = fmt.Errorf("password must contain at least 8 characters")
ErrUserNotExists = fmt.Errorf("no such user")
ErrUserExists = fmt.Errorf("user with this login already exists")
ErrUserWrongPassword = fmt.Errorf("wrong password")
ErrUserWrongToken = fmt.Errorf("bad user token")
ErrUserBadPassword = fmt.Errorf("password must contain at least 8 characters")
ErrUserEmailUnverified = fmt.Errorf("user has not verified email yet")
// ErrUserInternal = fmt.Errorf("unexpected error. contact tech support")
)
@ -28,9 +29,12 @@ const (
type UserService interface {
CreateUser(ctx context.Context, params UserCreateParams) (*models.UserDTO, error)
AuthenticateUser(ctx context.Context, login, password string) (string, error)
ValidateToken(ctx context.Context, tokenStr string) (*models.UserDTO, error)
ValidateAuthToken(ctx context.Context, tokenStr string) (*models.UserDTO, error)
VerifyEmail(ctx context.Context, actionToken string) error
SendEmailForgotPassword(ctx context.Context, userId string) error
SendEmailVerifyEmail(ctx context.Context, userId string) error
ForgotPassword(ctx context.Context, userId string) error
ChangePassword(ctx context.Context, userId, oldPassword, newPassword string) error
ChangePasswordWithToken(ctx context.Context, userId, actionToken, newPassword string) error
}
@ -87,6 +91,7 @@ func (u *userService) CreateUser(ctx context.Context, params UserCreateParams) (
if err != nil {
return nil, err
}
u.sendEmailVerifyEmail(ctx, result.Id, user.Email)
u.deps.UserCache.Set(result.Id, *result, cache.Expiration{Ttl: userCacheTtl})
@ -106,6 +111,10 @@ func (u *userService) AuthenticateUser(ctx context.Context, email, password stri
return "", ErrUserWrongPassword
}
if !user.EmailVerified {
return "", ErrUserEmailUnverified
}
payload := utils.JwtPayload{UserId: user.Id}
jwt, err := u.deps.Jwt.Create(payload)
if err != nil {
@ -117,8 +126,27 @@ func (u *userService) AuthenticateUser(ctx context.Context, email, password stri
return jwt, nil
}
func (u *userService) ForgotPassword(ctx context.Context, userId string) error {
user, err := u.getUserById(ctx, userId)
func (u *userService) VerifyEmail(ctx context.Context, actionToken string) error {
token, err := u.deps.ActionTokenRepo.GetActionToken(ctx, actionToken, models.ActionTokenVerifyEmail)
if err != nil {
return err
}
if token == nil {
return fmt.Errorf("wrong action token")
}
if err := u.deps.UserRepo.SetUserEmailVerified(ctx, token.UserId); err != nil {
return nil
}
//TODO: log warnings somehow
u.deps.ActionTokenRepo.DeleteActionToken(ctx, token.Id)
return nil
}
func (u *userService) SendEmailForgotPassword(ctx context.Context, email string) error {
// user, err := u.getUserById(ctx, userId)
user, err := u.deps.UserRepo.GetUserByEmail(ctx, email)
if err != nil {
return err
}
@ -139,21 +167,54 @@ func (u *userService) ForgotPassword(ctx context.Context, userId string) error {
return u.deps.EventRepo.SendEmailForgotPassword(ctx, user.Email, actionToken.Value)
}
func (u *userService) sendEmailVerifyEmail(ctx context.Context, userId, email string) error {
actionToken, err := u.deps.ActionTokenRepo.CreateActionToken(
ctx,
models.ActionTokenDTO{
UserId: userId,
Value: uuid.New().String(),
Target: models.ActionTokenVerifyEmail,
Expiration: time.Now().Add(1 * time.Hour),
},
)
if err != nil {
return err
}
return u.deps.EventRepo.SendEmailVerifyEmail(ctx, email, actionToken.Value)
}
func (u *userService) SendEmailVerifyEmail(ctx context.Context, email string) error {
//user, err := u.getUserById(ctx, userId)
user, err := u.deps.UserRepo.GetUserByEmail(ctx, email)
if err != nil {
return err
}
return u.sendEmailVerifyEmail(ctx, user.Id, user.Email)
}
func (u *userService) ChangePasswordWithToken(ctx context.Context, userId, actionToken, newPassword string) error {
user, err := u.getUserById(ctx, userId)
if err != nil {
return err
}
code, err := u.deps.ActionTokenRepo.PopActionToken(ctx, userId, actionToken, models.ActionTokenTargetForgotPassword)
token, err := u.deps.ActionTokenRepo.GetActionToken(ctx, actionToken, models.ActionTokenTargetForgotPassword)
if err != nil {
return err
}
if code == nil {
return fmt.Errorf("wrong user access code")
if token == nil {
return fmt.Errorf("wrong action token")
}
return u.updatePassword(ctx, *user, newPassword)
if err := u.updatePassword(ctx, *user, newPassword); err != nil {
return err
}
//TODO: log warnings somehow
u.deps.ActionTokenRepo.DeleteActionToken(ctx, token.Id)
return nil
}
func (u *userService) ChangePassword(ctx context.Context, userId, oldPassword, newPassword string) error {
@ -205,7 +266,7 @@ func (u *userService) getUserById(ctx context.Context, userId string) (*models.U
return user, nil
}
func (u *userService) ValidateToken(ctx context.Context, tokenStr string) (*models.UserDTO, error) {
func (u *userService) ValidateAuthToken(ctx context.Context, tokenStr string) (*models.UserDTO, error) {
if userId, ok := u.deps.JwtCache.Get(tokenStr); ok {
return u.getUserById(ctx, userId)
}

View File

@ -1,155 +0,0 @@
package middleware
// Modified recovery from gin, use own logger
import (
"backend/internal/integrations"
"backend/pkg/logger"
"bytes"
"errors"
"fmt"
"net"
"net/http"
"net/http/httputil"
"os"
"runtime"
"strings"
"time"
"github.com/gin-gonic/gin"
)
const (
reset = "\033[0m"
)
var (
dunno = []byte("???")
centerDot = []byte("·")
dot = []byte(".")
slash = []byte("/")
)
func NewRecoveryMiddleware(logger logger.Logger, prometheus *integrations.Prometheus, debugMode bool) gin.HandlerFunc {
handle := defaultHandleRecovery
return func(c *gin.Context) {
defer func() {
if err := recover(); err != nil {
prometheus.AddPanic()
// Check for a broken connection, as it is not really a
// condition that warrants a panic stack trace.
var brokenPipe bool
if ne, ok := err.(*net.OpError); ok {
var se *os.SyscallError
if errors.As(ne, &se) {
seStr := strings.ToLower(se.Error())
if strings.Contains(seStr, "broken pipe") ||
strings.Contains(seStr, "connection reset by peer") {
brokenPipe = true
}
}
}
if logger != nil {
stack := stack(3)
httpRequest, _ := httputil.DumpRequest(c.Request, false)
headers := strings.Split(string(httpRequest), "\r\n")
for idx, header := range headers {
current := strings.Split(header, ":")
if current[0] == "Authorization" {
headers[idx] = current[0] + ": *"
}
}
headersToStr := strings.Join(headers, "\r\n")
if brokenPipe {
logger.Printf("%s\n%s%s", err, headersToStr, reset)
} else if debugMode {
logger.Printf("[Recovery] %s panic recovered:\n%s\n%s\n%s%s",
timeFormat(time.Now()), headersToStr, err, stack, reset)
} else {
logger.Printf("[Recovery] %s panic recovered:\n%s\n%s%s",
timeFormat(time.Now()), err, stack, reset)
}
}
if brokenPipe {
// If the connection is dead, we can't write a status to it.
c.Error(err.(error)) //nolint: errcheck
c.Abort()
} else {
handle(c, err)
}
}
}()
c.Next()
}
}
func defaultHandleRecovery(c *gin.Context, _ any) {
c.AbortWithStatus(http.StatusInternalServerError)
}
// stack returns a nicely formatted stack frame, skipping skip frames.
func stack(skip int) []byte {
buf := new(bytes.Buffer) // the returned data
// As we loop, we open files and read them. These variables record the currently
// loaded file.
var lines [][]byte
var lastFile string
for i := skip; ; i++ { // Skip the expected number of frames
pc, file, line, ok := runtime.Caller(i)
if !ok {
break
}
// Print this much at least. If we can't find the source, it won't show.
fmt.Fprintf(buf, "%s:%d (0x%x)\n", file, line, pc)
if file != lastFile {
data, err := os.ReadFile(file)
if err != nil {
continue
}
lines = bytes.Split(data, []byte{'\n'})
lastFile = file
}
fmt.Fprintf(buf, "\t%s: %s\n", function(pc), source(lines, line))
}
return buf.Bytes()
}
// source returns a space-trimmed slice of the n'th line.
func source(lines [][]byte, n int) []byte {
n-- // in stack trace, lines are 1-indexed but our array is 0-indexed
if n < 0 || n >= len(lines) {
return dunno
}
return bytes.TrimSpace(lines[n])
}
// function returns, if possible, the name of the function containing the PC.
func function(pc uintptr) []byte {
fn := runtime.FuncForPC(pc)
if fn == nil {
return dunno
}
name := []byte(fn.Name())
// The name includes the path name to the package, which is unnecessary
// since the file name is already included. Plus, it has center dots.
// That is, we see
// runtime/debug.*T·ptrmethod
// and want
// *T.ptrmethod
// Also the package path might contain dot (e.g. code.google.com/...),
// so first eliminate the path prefix
if lastSlash := bytes.LastIndex(name, slash); lastSlash >= 0 {
name = name[lastSlash+1:]
}
if period := bytes.Index(name, dot); period >= 0 {
name = name[period+1:]
}
name = bytes.ReplaceAll(name, centerDot, dot)
return name
}
// timeFormat returns a customized time string for logger.
func timeFormat(t time.Time) string {
return t.Format("2006/01/02 - 15:04:05")
}

View File

@ -1,33 +0,0 @@
package middleware
import (
"fmt"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
func NewTracingMiddleware(tracer trace.Tracer) gin.HandlerFunc {
prop := otel.GetTextMapPropagator()
return func(c *gin.Context) {
savedCtx := c.Request.Context()
defer func() {
c.Request = c.Request.WithContext(savedCtx)
}()
ctx := prop.Extract(savedCtx, propagation.HeaderCarrier(c.Request.Header))
ctx, span := tracer.Start(ctx, fmt.Sprintf("%s %s", c.Request.Method, c.Request.URL.Path))
defer span.End()
traceId := span.SpanContext().TraceID()
c.Header("X-Trace-Id", traceId.String())
c.Request = c.Request.WithContext(ctx)
c.Next()
}
}

View File

@ -1,4 +1,4 @@
package middleware
package httpserver
// Modified recovery from gin, use own logger

View File

@ -1,4 +1,4 @@
package middleware
package httpserver
import (
"backend/internal/integrations"

View File

@ -1,4 +1,4 @@
package middleware
package httpserver
import (
"fmt"

View File

@ -0,0 +1,79 @@
package httpserver
import (
"backend/pkg/logger"
"context"
"encoding/json"
"github.com/gin-gonic/gin"
)
type Handler[Input, Output interface{}] func(ctx context.Context, input Input) (Output, error)
type ResponseOk struct {
Status string `json:"status"`
Result interface{} `json:"result"`
}
type ResponseError struct {
Status string `json:"status"`
Error struct {
Id string `json:"id"`
Message string `json:"message"`
} `json:"error"`
}
func WrapGin[In, Out interface{}](log logger.Logger, handler Handler[In, Out]) gin.HandlerFunc {
return func(c *gin.Context) {
log := log.WithContext(c)
var input In
if err := c.ShouldBindJSON(&input); err != nil {
response := ResponseError{
Status: "error",
Error: struct {
Id string `json:"id"`
Message string `json:"message"`
}{
Id: "WrongBody",
Message: err.Error(),
},
}
body, _ := json.Marshal(response)
c.Data(400, "application/json", body)
return
}
var response interface{}
output, err := handler(c, input)
if err != nil {
log.Error().Err(err).Msg("error in request handler")
response = ResponseError{
Status: "error",
Error: struct {
Id string `json:"id"`
Message string `json:"message"`
}{
Id: "-",
Message: err.Error(),
},
}
} else {
response = ResponseOk{
Status: "success",
Result: output,
}
}
body, err := json.Marshal(response)
if err != nil {
log.Error().Err(err).Msg("marshal response error")
c.Data(500, "plain/text", []byte(err.Error()))
return
}
c.Data(200, "application/json", body)
}
}

View File

@ -16,7 +16,7 @@ func NewKafka(addr, topic string) *Kafka {
Addr: kafka.TCP(addr),
Topic: topic,
Balancer: &kafka.RoundRobin{},
AllowAutoTopicCreation: true,
AllowAutoTopicCreation: false,
BatchSize: 100,
BatchTimeout: 100 * time.Millisecond,
}

View File

@ -1,84 +0,0 @@
import random
import string
from locust import HttpUser, FastHttpUser
class Auth():
token: string
def __init__(self, token):
self.token = token
class User():
email: string
name: string
password: string
def __init__(self, email, password, name, token = ""):
self.email = email
self.password = password
self.name = name
self.token = token
class BackendApi():
http: FastHttpUser
def __init__(self, http: FastHttpUser):
self.http = http
def user_create(self) -> User:
email = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) + '@test.test'
name = ''.join(random.choices(string.ascii_letters, k=10))
password = 'Abcdef1!!1'
response = self.http.client.post(
"/user/create",
json={
"email": email,
"password": password,
"name": name,
},
)
if response.status_code != 200:
raise AssertionError('can not create user')
return User(email, password, name)
def user_login(self, user: User) -> Auth:
response = self.http.client.post(
"/user/login",
json={
"email": user.email,
"password": user.password,
},
)
if response.status_code != 200:
raise AssertionError('can not login user')
token = response.json()['token']
if token == '':
raise AssertionError('empty user token')
return Auth(token)
def dummy_get(self, auth: Auth):
headers = {"X-Auth": auth.token}
response = self.http.client.get("/dummy", headers=headers)
if response.status_code != 200:
raise AssertionError('something wrong')
def health_get(self):
response = self.http.client.get("/health")
if response.status_code != 200:
raise AssertionError('something wrong')
def shortlink_create(self, url: string) -> string:
response = self.http.client.post("/s/new?url=" + url)
if response.status_code != 200:
raise AssertionError('can not login user')
link = response.json()['link']
if link == '':
raise AssertionError('empty user token')
return link

View File

@ -3,6 +3,7 @@ create table if not exists users (
email text unique not null,
secret text not null,
name text not null,
email_verified boolean not null default false,
primary key (id)
);

View File

@ -15,7 +15,10 @@ install:
requirements:
pip freeze > requirements.txt
run-web:
locust -f tests,loads --class-picker --host http://localhost:8080 --processes 16
run-integration:
python3 -m pytest integration/
run-performance-web:
locust -f performance --class-picker --host http://localhost:8080 --processes 16

View File

@ -3,15 +3,13 @@ from locust import FastHttpUser, task
from api import BackendApi, Auth
class DummyGet(FastHttpUser):
api: BackendApi
auth: Auth
def on_start(self):
self.api = BackendApi(self.client)
user = self.api.user_create()
self.auth = self.api.user_login(user)
@task
def dummy_test(self):
self.api.dummy_get(self.auth)
def on_start(self):
self.api = BackendApi(self)
user = self.api.user_create()
self.auth = self.api.user_login(user)

View File

@ -3,11 +3,9 @@ from locust import FastHttpUser, task
from api import BackendApi
class HealthGet(FastHttpUser):
api: BackendApi
def on_start(self):
self.api = BackendApi(self.client)
@task
def user_create_test(self):
self.api.health_get()
def on_start(self):
self.api = BackendApi(self)

View File

@ -1,9 +1,9 @@
from locust import LoadTestShape
class LowLoad(LoadTestShape):
time_limit = 600
spawn_rate = 5
max_users = 100
time_limit = 60
spawn_rate = 2
max_users = 10
def tick(self) -> (tuple[float, int] | None):
user_count = self.spawn_rate * self.get_run_time()

View File

@ -3,11 +3,9 @@ from locust import FastHttpUser, task
from api import BackendApi
class ShortlinkCreate(FastHttpUser):
api: BackendApi
def on_start(self):
self.api = BackendApi(self.client)
@task
def user_create_test(self):
self.api.shortlink_create("https://ya.ru")
def on_start(self):
self.api = BackendApi(self)
self.api.shortlink_create("https://example.com")

View File

@ -3,23 +3,18 @@ from locust import FastHttpUser, task
from api import BackendApi, User
class UserCreate(FastHttpUser):
api: BackendApi
def on_start(self):
self.api = BackendApi(self.client)
@task
def user_create_test(self):
self.api.user_create()
def on_start(self):
self.api = BackendApi(self)
class UserLogin(FastHttpUser):
api: BackendApi
user: User
def on_start(self):
self.api = BackendApi(self)
self.user = self.api.user_create()
@task
def user_create_test(self):
self.api.user_login(self.user)
def on_start(self):
self.api = BackendApi(self)
self.user = self.api.user_create()

View File

@ -4,6 +4,7 @@ certifi==2024.7.4
charset-normalizer==3.3.2
click==8.1.7
ConfigArgParse==1.7
exceptiongroup==1.2.2
Flask==3.0.3
Flask-Cors==4.0.1
Flask-Login==0.6.3
@ -11,12 +12,16 @@ gevent==24.2.1
geventhttpclient==2.3.1
greenlet==3.0.3
idna==3.7
iniconfig==2.0.0
itsdangerous==2.2.0
Jinja2==3.1.4
locust==2.31.3
MarkupSafe==2.1.5
msgpack==1.0.8
packaging==24.2
pluggy==1.5.0
psutil==6.0.0
pytest==8.3.4
pyzmq==26.1.0
requests==2.32.3
tomli==2.0.1