commit
a0616c6ea2
@ -129,19 +129,17 @@ func (a *App) Run(p RunParams) {
|
||||
|
||||
// Periodically trigger cache cleanup
|
||||
go func() {
|
||||
tmr := time.NewTicker(5 * time.Minute)
|
||||
tmr := time.NewTicker(15 * time.Minute)
|
||||
defer tmr.Stop()
|
||||
|
||||
batchSize := 100
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-tmr.C:
|
||||
userCache.CheckExpired(batchSize)
|
||||
jwtCache.CheckExpired(batchSize)
|
||||
linksCache.CheckExpired(batchSize)
|
||||
userCache.CheckExpired()
|
||||
jwtCache.CheckExpired()
|
||||
linksCache.CheckExpired()
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -155,6 +153,7 @@ func (a *App) Run(p RunParams) {
|
||||
JwtCache: jwtCache,
|
||||
EventRepo: *eventRepo,
|
||||
ActionTokenRepo: actionTokenRepo,
|
||||
Logger: logger,
|
||||
},
|
||||
)
|
||||
shortlinkService = services.NewShortlinkSevice(
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
port: 8080
|
||||
postgres_url: "postgres://postgres:postgres@localhost:5432/postgres"
|
||||
jwt_signing_key: "./jwt_signing_key"
|
||||
kafka_url: "localhost:9092"
|
||||
kafka_topic: "backend_events"
|
||||
kafka_url: "localhost:9091"
|
||||
kafka_topic: "events"
|
||||
@ -2,7 +2,7 @@ package handlers
|
||||
|
||||
import "github.com/gin-gonic/gin"
|
||||
|
||||
func NewDummyHandler() gin.HandlerFunc {
|
||||
func New200OkHandler() gin.HandlerFunc {
|
||||
return func(ctx *gin.Context) {
|
||||
ctx.Status(200)
|
||||
}
|
||||
36
cmd/backend/server/handlers/user_change_password.go
Normal file
36
cmd/backend/server/handlers/user_change_password.go
Normal file
@ -0,0 +1,36 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"backend/cmd/backend/server/middleware"
|
||||
"backend/internal/core/services"
|
||||
httpserver "backend/internal/http_server"
|
||||
"backend/pkg/logger"
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type inputChangePassword struct {
|
||||
OldPassword string `json:"oldPassword" binding:"required"`
|
||||
NewPassword string `json:"newPassword" binding:"required"`
|
||||
}
|
||||
|
||||
func NewUserChangePasswordHandler(log logger.Logger, userService services.UserService) gin.HandlerFunc {
|
||||
return httpserver.WrapGin(log,
|
||||
func(ctx context.Context, input inputChangePassword) (interface{}, error) {
|
||||
ginCtx, ok := ctx.(*gin.Context)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("can not cast context")
|
||||
}
|
||||
user := middleware.GetUserFromRequest(ginCtx)
|
||||
|
||||
err := userService.ChangePassword(ctx, user.Id, input.OldPassword, input.NewPassword)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
},
|
||||
)
|
||||
}
|
||||
@ -10,9 +10,9 @@ import (
|
||||
)
|
||||
|
||||
type createUserInput struct {
|
||||
Email string `json:"email" validate:"required,email"`
|
||||
Password string `json:"password" validate:"required"`
|
||||
Name string `json:"name" validate:"required"`
|
||||
Email string `json:"email" binding:"required,email"`
|
||||
Password string `json:"password" binding:"required"`
|
||||
Name string `json:"name" binding:"required"`
|
||||
}
|
||||
|
||||
type createUserOutput struct {
|
||||
@ -32,9 +32,8 @@ func NewUserCreateHandler(log logger.Logger, userService services.UserService) g
|
||||
},
|
||||
)
|
||||
|
||||
out := createUserOutput{}
|
||||
if err != nil {
|
||||
return out, err
|
||||
return createUserOutput{}, err
|
||||
}
|
||||
|
||||
return createUserOutput{
|
||||
@ -10,8 +10,8 @@ import (
|
||||
)
|
||||
|
||||
type loginUserInput struct {
|
||||
Login string `json:"email" validate:"required,email"`
|
||||
Password string `json:"password"`
|
||||
Login string `json:"email" binding:"required,email"`
|
||||
Password string `json:"password" binding:"required"`
|
||||
}
|
||||
|
||||
type loginUserOutput struct {
|
||||
27
cmd/backend/server/handlers/user_restore_password.go
Normal file
27
cmd/backend/server/handlers/user_restore_password.go
Normal file
@ -0,0 +1,27 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"backend/internal/core/services"
|
||||
httpserver "backend/internal/http_server"
|
||||
"backend/pkg/logger"
|
||||
"context"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type inputRestorePassword struct {
|
||||
Token string `json:"token" binding:"required"`
|
||||
NewPassword string `json:"password" binding:"required"`
|
||||
}
|
||||
|
||||
func NewUserRestorePasswordHandler(log logger.Logger, userService services.UserService) gin.HandlerFunc {
|
||||
return httpserver.WrapGin(log,
|
||||
func(ctx context.Context, input inputRestorePassword) (interface{}, error) {
|
||||
err := userService.ChangePasswordWithToken(ctx, input.Token, input.NewPassword)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, nil
|
||||
},
|
||||
)
|
||||
}
|
||||
23
cmd/backend/server/handlers/user_send_restore_password.go
Normal file
23
cmd/backend/server/handlers/user_send_restore_password.go
Normal file
@ -0,0 +1,23 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"backend/internal/core/services"
|
||||
httpserver "backend/internal/http_server"
|
||||
"backend/pkg/logger"
|
||||
"context"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type inputSendRestorePassword struct {
|
||||
Email string `json:"email" binding:"required,email"`
|
||||
}
|
||||
|
||||
func NewUserSendRestorePasswordHandler(log logger.Logger, userService services.UserService) gin.HandlerFunc {
|
||||
return httpserver.WrapGin(log,
|
||||
func(ctx context.Context, input inputSendRestorePassword) (interface{}, error) {
|
||||
err := userService.SendEmailForgotPassword(ctx, input.Email)
|
||||
return nil, err
|
||||
},
|
||||
)
|
||||
}
|
||||
23
cmd/backend/server/handlers/user_send_verify.go
Normal file
23
cmd/backend/server/handlers/user_send_verify.go
Normal file
@ -0,0 +1,23 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"backend/internal/core/services"
|
||||
httpserver "backend/internal/http_server"
|
||||
"backend/pkg/logger"
|
||||
"context"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type inputSendVerify struct {
|
||||
Email string `json:"email" binding:"required,email"`
|
||||
}
|
||||
|
||||
func NewUserSendVerifyEmailHandler(log logger.Logger, userService services.UserService) gin.HandlerFunc {
|
||||
return httpserver.WrapGin(log,
|
||||
func(ctx context.Context, input inputSendVerify) (interface{}, error) {
|
||||
err := userService.SendEmailVerifyUser(ctx, input.Email)
|
||||
return nil, err
|
||||
},
|
||||
)
|
||||
}
|
||||
@ -1,12 +1,20 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"backend/internal/core/models"
|
||||
"backend/internal/core/services"
|
||||
"fmt"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func GetUserFromRequest(c *gin.Context) *models.UserDTO {
|
||||
if user, ok := c.Get("user"); ok {
|
||||
return user.(*models.UserDTO)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewAuthMiddleware(userService services.UserService) gin.HandlerFunc {
|
||||
return func(ctx *gin.Context) {
|
||||
token := ctx.GetHeader("X-Auth")
|
||||
|
||||
@ -3,7 +3,6 @@ package server
|
||||
import (
|
||||
"backend/cmd/backend/server/handlers"
|
||||
"backend/cmd/backend/server/middleware"
|
||||
"backend/cmd/backend/server/utils"
|
||||
"backend/internal/core/services"
|
||||
httpserver "backend/internal/http_server"
|
||||
"backend/internal/integrations"
|
||||
@ -30,7 +29,7 @@ func NewServer(opts NewServerOpts) *httpserver.Server {
|
||||
r.ContextWithFallback = true // Use it to allow getting values from c.Request.Context()
|
||||
|
||||
// r.Static("/webapp", "./webapp")
|
||||
r.GET("/health", handlers.NewDummyHandler())
|
||||
r.GET("/health", handlers.New200OkHandler())
|
||||
|
||||
prometheus := integrations.NewPrometheus()
|
||||
r.Any("/metrics", gin.WrapH(prometheus.GetRequestHandler()))
|
||||
@ -48,17 +47,12 @@ func NewServer(opts NewServerOpts) *httpserver.Server {
|
||||
{
|
||||
userGroup.POST("/create", handlers.NewUserCreateHandler(opts.Logger, opts.UserService))
|
||||
userGroup.POST("/login", handlers.NewUserLoginHandler(opts.Logger, opts.UserService))
|
||||
userGroup.POST("/send-verify", handlers.NewUserSendVerifyEmailHandler(opts.Logger, opts.UserService))
|
||||
userGroup.POST("/send-restore-password", handlers.NewUserSendRestorePasswordHandler(opts.Logger, opts.UserService))
|
||||
userGroup.POST("/restore-password", handlers.NewUserRestorePasswordHandler(opts.Logger, opts.UserService))
|
||||
|
||||
}
|
||||
|
||||
dummyGroup := v1.Group("/dummy")
|
||||
dummyGroup.Use(middleware.NewAuthMiddleware(opts.UserService))
|
||||
{
|
||||
dummyGroup.GET("", handlers.NewDummyHandler())
|
||||
dummyGroup.POST("/forgot-password", func(c *gin.Context) {
|
||||
user := utils.GetUserFromRequest(c)
|
||||
opts.UserService.SendEmailForgotPassword(c, user.Id)
|
||||
})
|
||||
userGroup.Use(middleware.NewAuthMiddleware(opts.UserService))
|
||||
userGroup.POST("/change-password", handlers.NewUserChangePasswordHandler(opts.Logger, opts.UserService))
|
||||
}
|
||||
|
||||
return httpserver.New(
|
||||
|
||||
@ -1,14 +0,0 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"backend/internal/core/models"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func GetUserFromRequest(c *gin.Context) *models.UserDTO {
|
||||
if user, ok := c.Get("user"); ok {
|
||||
return user.(*models.UserDTO)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -1,11 +0,0 @@
|
||||
app:
|
||||
serviceUrl: "https://localhost:8080"
|
||||
kafka:
|
||||
brokers:
|
||||
- localhost:9092
|
||||
topic: backend_events
|
||||
smtp:
|
||||
server: smtp.yandex.ru
|
||||
port: 587
|
||||
email: ""
|
||||
password: ""
|
||||
@ -1,130 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"backend/pkg/logger"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/segmentio/kafka-go"
|
||||
"gopkg.in/gomail.v2"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
const MSG_TEXT = `
|
||||
<html>
|
||||
<head>
|
||||
</head>
|
||||
<body>
|
||||
<p>This message was sent because you forgot a password</p>
|
||||
<p>To change a password, use <a href="{{Link}}"/>this</a> link</p>
|
||||
</body>
|
||||
</html>
|
||||
`
|
||||
|
||||
func SendEmailForgotPassword(dialer *gomail.Dialer, from, to, link string) error {
|
||||
msgText := strings.ReplaceAll(MSG_TEXT, "{{Link}}", link)
|
||||
|
||||
m := gomail.NewMessage()
|
||||
m.SetHeader("From", m.FormatAddress(from, "Pet Backend"))
|
||||
m.SetHeader("To", to)
|
||||
m.SetHeader("Subject", "Hello!")
|
||||
m.SetBody("text/html", msgText)
|
||||
|
||||
return dialer.DialAndSend(m)
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
App struct {
|
||||
LogFile string `yaml:"logFile"`
|
||||
ServiceUrl string `yaml:"serviceUrl"`
|
||||
}
|
||||
|
||||
Kafka struct {
|
||||
Brokers []string `yaml:"brokers"`
|
||||
Topic string `yaml:"topic"`
|
||||
ConsumerGroupId string `yaml:"consumerGroupId"`
|
||||
} `yaml:"kafka"`
|
||||
|
||||
SMTP struct {
|
||||
Server string `yaml:"server"`
|
||||
Port int `yaml:"port"`
|
||||
Email string `yaml:"email"`
|
||||
Password string `yaml:"password"`
|
||||
} `yaml:"smtp"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
configFile, err := os.ReadFile("config.yaml")
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
config := &Config{}
|
||||
if err := yaml.Unmarshal(configFile, config); err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
dialer := gomail.NewDialer(config.SMTP.Server, config.SMTP.Port, config.SMTP.Email, config.SMTP.Password)
|
||||
|
||||
r := kafka.NewReader(kafka.ReaderConfig{
|
||||
Brokers: config.Kafka.Brokers,
|
||||
Topic: config.Kafka.Topic,
|
||||
GroupID: config.Kafka.ConsumerGroupId,
|
||||
})
|
||||
|
||||
logger, err := logger.New(
|
||||
ctx,
|
||||
logger.NewLoggerOpts{
|
||||
Debug: true,
|
||||
OutputFile: config.App.LogFile,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
logger.Printf("coworker service started\n")
|
||||
|
||||
for {
|
||||
msg, err := r.FetchMessage(ctx)
|
||||
if err == io.EOF {
|
||||
log.Fatal("EOF")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("offset: %d, partition: %d, key: %s, value: %s\n", msg.Offset, msg.Partition, string(msg.Key), string(msg.Value))
|
||||
|
||||
if err := r.CommitMessages(ctx, msg); err != nil {
|
||||
log.Fatalf("failed to commit: %s\n", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
value := struct {
|
||||
Email string `json:"email"`
|
||||
Token string `json:"token"`
|
||||
}{}
|
||||
|
||||
if err := json.Unmarshal(msg.Value, &value); err != nil {
|
||||
log.Fatalf("failed to unmarshal: %s\n", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
link := fmt.Sprintf("%s/restore-password?token=%s", config.App.ServiceUrl, value.Token)
|
||||
|
||||
if err := SendEmailForgotPassword(dialer, config.SMTP.Email, value.Email, link); err != nil {
|
||||
log.Fatalf("failed to send email: %s\n", err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
30
cmd/notifyer/config.go
Normal file
30
cmd/notifyer/config.go
Normal file
@ -0,0 +1,30 @@
|
||||
package main
|
||||
|
||||
import "backend/pkg/config"
|
||||
|
||||
func LoadConfig(filePath string) (Config, error) {
|
||||
return config.NewFromFile[Config](filePath)
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
App struct {
|
||||
LogFile string `yaml:"logFile"`
|
||||
ServiceUrl string `yaml:"serviceUrl"`
|
||||
}
|
||||
|
||||
Kafka struct {
|
||||
Brokers []string `yaml:"brokers"`
|
||||
Topic string `yaml:"topic"`
|
||||
ConsumerGroupId string `yaml:"consumerGroupId"`
|
||||
} `yaml:"kafka"`
|
||||
|
||||
SMTP ConfigSMTP `yaml:"smtp"`
|
||||
}
|
||||
|
||||
type ConfigSMTP struct {
|
||||
Server string `yaml:"server"`
|
||||
Port int `yaml:"port"`
|
||||
Login string `yaml:"login"`
|
||||
Password string `yaml:"password"`
|
||||
Email string `yaml:"email"`
|
||||
}
|
||||
13
cmd/notifyer/config.yaml
Normal file
13
cmd/notifyer/config.yaml
Normal file
@ -0,0 +1,13 @@
|
||||
app:
|
||||
serviceUrl: "http://localhost:8080"
|
||||
kafka:
|
||||
brokers:
|
||||
- localhost:9091
|
||||
topic: events
|
||||
consumerGroupId: notifyer-group
|
||||
smtp:
|
||||
server: localhost
|
||||
port: 12333
|
||||
login: "maillogin"
|
||||
password: "mailpass"
|
||||
email: "notifyer@example.com"
|
||||
87
cmd/notifyer/emailer.go
Normal file
87
cmd/notifyer/emailer.go
Normal file
@ -0,0 +1,87 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"html/template"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/gomail.v2"
|
||||
)
|
||||
|
||||
const MSG_TEXT = `
|
||||
<html>
|
||||
<head>
|
||||
</head>
|
||||
<body>
|
||||
<p>{{.Text}}</p>
|
||||
{{if .Link}}
|
||||
<a href="{{.Link}}">Click</a>link</p>
|
||||
{{end}}
|
||||
</body>
|
||||
</html>
|
||||
`
|
||||
|
||||
type MailContent struct {
|
||||
Text string
|
||||
Link string
|
||||
}
|
||||
|
||||
func NewEmailer(conf ConfigSMTP) (*Emailer, error) {
|
||||
dialer := gomail.NewDialer(conf.Server, conf.Port, conf.Login, conf.Password)
|
||||
|
||||
closer, err := dialer.Dial()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer closer.Close()
|
||||
|
||||
htmlTemplate, err := template.New("verify-email").Parse(MSG_TEXT)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Emailer{
|
||||
senderEmail: conf.Email,
|
||||
htmlTemplate: htmlTemplate,
|
||||
dialer: dialer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type Emailer struct {
|
||||
senderEmail string
|
||||
htmlTemplate *template.Template
|
||||
dialer *gomail.Dialer
|
||||
}
|
||||
|
||||
func (e *Emailer) SendRestorePassword(email, token string) error {
|
||||
return e.sendEmail("Restore your password", email, MailContent{
|
||||
Text: "Token: " + token,
|
||||
})
|
||||
}
|
||||
|
||||
func (e *Emailer) SendVerifyUser(email, link string) error {
|
||||
return e.sendEmail("Verify your email", email, MailContent{
|
||||
Text: "You recieved this message due to registration of account. Use this link to verify email:",
|
||||
Link: link,
|
||||
})
|
||||
}
|
||||
|
||||
func (e *Emailer) SendPasswordChanged(email string) error {
|
||||
return e.sendEmail("Password changed", email, MailContent{
|
||||
Text: "You recieved this message due to password change",
|
||||
})
|
||||
}
|
||||
|
||||
func (e *Emailer) sendEmail(subject, to string, content MailContent) error {
|
||||
builder := &strings.Builder{}
|
||||
if err := e.htmlTemplate.Execute(builder, content); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m := gomail.NewMessage()
|
||||
m.SetHeader("From", m.FormatAddress(e.senderEmail, "Pet Backend"))
|
||||
m.SetHeader("To", to)
|
||||
m.SetHeader("Subject", subject)
|
||||
m.SetBody("text/html", builder.String())
|
||||
|
||||
return e.dialer.DialAndSend(m)
|
||||
}
|
||||
90
cmd/notifyer/main.go
Normal file
90
cmd/notifyer/main.go
Normal file
@ -0,0 +1,90 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"backend/pkg/logger"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
||||
"github.com/segmentio/kafka-go"
|
||||
)
|
||||
|
||||
type SendEmailEvent struct {
|
||||
Email string `json:"email"`
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
config, err := LoadConfig("config.yaml")
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
emailer, err := NewEmailer(config.SMTP)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
r := kafka.NewReader(kafka.ReaderConfig{
|
||||
Brokers: config.Kafka.Brokers,
|
||||
Topic: config.Kafka.Topic,
|
||||
GroupID: config.Kafka.ConsumerGroupId,
|
||||
})
|
||||
|
||||
logger, err := logger.New(ctx, logger.NewLoggerOpts{
|
||||
Debug: true,
|
||||
OutputFile: config.App.LogFile,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
logger.Printf("notifyer service started\n")
|
||||
|
||||
for {
|
||||
msg, err := r.FetchMessage(ctx)
|
||||
if err == io.EOF {
|
||||
log.Fatal("EOF")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("offset: %d, partition: %d, key: %s, value: %s\n", msg.Offset, msg.Partition, string(msg.Key), string(msg.Value))
|
||||
|
||||
if err := r.CommitMessages(ctx, msg); err != nil {
|
||||
log.Fatalf("failed to commit: %s\n", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
if err := handleEvent(config, emailer, msg); err != nil {
|
||||
log.Printf("failed to handle event: %s\n", err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleEvent(config Config, emailer *Emailer, msg kafka.Message) error {
|
||||
event := SendEmailEvent{}
|
||||
if err := json.Unmarshal(msg.Value, &event); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch string(msg.Key) {
|
||||
case "email_forgot_password":
|
||||
return emailer.SendRestorePassword(event.Email, event.Token)
|
||||
case "email_password_changed":
|
||||
return emailer.SendPasswordChanged(event.Email)
|
||||
case "email_verify_user":
|
||||
link := fmt.Sprintf("%s/verify-user?token=%s", config.App.ServiceUrl, event.Token)
|
||||
return emailer.SendVerifyUser(event.Email, link)
|
||||
}
|
||||
|
||||
return fmt.Errorf("unknown event type")
|
||||
}
|
||||
@ -104,25 +104,26 @@ services:
|
||||
kafka:
|
||||
image: &kafkaImage apache/kafka:3.8.0
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server http://127.0.0.1:9092 || exit 1"]
|
||||
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server http://kafka:9092 || exit 1"]
|
||||
interval: 1s
|
||||
timeout: 30s
|
||||
retries: 30
|
||||
environment:
|
||||
KAFKA_NODE_ID: 1
|
||||
KAFKA_PROCESS_ROLES: broker,controller
|
||||
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
|
||||
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
|
||||
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
|
||||
KAFKA_LISTENERS: BACKEND://0.0.0.0:9091, BROKER://kafka:9092, CONTROLLER://kafka:9093
|
||||
KAFKA_ADVERTISED_LISTENERS: BACKEND://localhost:9091, BROKER://kafka:9092
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BACKEND:PLAINTEXT, BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT
|
||||
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
||||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
|
||||
KAFKA_NUM_PARTITIONS: 3
|
||||
ports:
|
||||
- 9092:9092
|
||||
- 9091:9091
|
||||
|
||||
kafka-init:
|
||||
image: *kafkaImage
|
||||
@ -162,6 +163,22 @@ services:
|
||||
exit 0;
|
||||
"
|
||||
|
||||
smtp4dev:
|
||||
image: rnwood/smtp4dev:v3
|
||||
restart: always
|
||||
ports:
|
||||
- '12332:80' #WebUI
|
||||
- '12333:25' #SMTP
|
||||
- '12334:143' #IMAP
|
||||
# volumes:
|
||||
# - smtp4dev-data:/smtp4dev
|
||||
environment:
|
||||
- ServerOptions__Urls=http://*:80
|
||||
- ServerOptions__HostName=localhost
|
||||
- ServerOptions__TlsMode=None
|
||||
- RelayOptions__Login=maillogin
|
||||
- RelayOptions__Password=mailpass
|
||||
|
||||
volumes:
|
||||
postgres-volume:
|
||||
grafana-volume:
|
||||
|
||||
@ -6,6 +6,12 @@ import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
const (
|
||||
EventEmailPasswordChanged = "email_password_changed"
|
||||
EventEmailForgotPassword = "email_forgot_password"
|
||||
EventEmailVerifyUser = "email_verify_user"
|
||||
)
|
||||
|
||||
func NewEventRepo(kafka *integrations.Kafka) *EventRepo {
|
||||
return &EventRepo{
|
||||
kafka: kafka,
|
||||
@ -32,10 +38,14 @@ func (e *EventRepo) sendEmail(ctx context.Context, email, actionToken, eventType
|
||||
return e.kafka.SendMessage(ctx, eventType, valueBytes)
|
||||
}
|
||||
|
||||
func (e *EventRepo) SendEmailForgotPassword(ctx context.Context, email, actionToken string) error {
|
||||
return e.sendEmail(ctx, email, actionToken, "email_forgot_password")
|
||||
func (e *EventRepo) SendEmailPasswordChanged(ctx context.Context, email string) error {
|
||||
return e.sendEmail(ctx, email, "", EventEmailPasswordChanged)
|
||||
}
|
||||
|
||||
func (e *EventRepo) SendEmailVerifyEmail(ctx context.Context, email, actionToken string) error {
|
||||
return e.sendEmail(ctx, email, actionToken, "email_verify_email")
|
||||
func (e *EventRepo) SendEmailForgotPassword(ctx context.Context, email, actionToken string) error {
|
||||
return e.sendEmail(ctx, email, actionToken, EventEmailForgotPassword)
|
||||
}
|
||||
|
||||
func (e *EventRepo) SendEmailVerifyUser(ctx context.Context, email, actionToken string) error {
|
||||
return e.sendEmail(ctx, email, actionToken, EventEmailVerifyUser)
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"backend/internal/core/repos"
|
||||
"backend/internal/core/utils"
|
||||
"backend/pkg/cache"
|
||||
"backend/pkg/logger"
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
@ -33,10 +34,10 @@ type UserService interface {
|
||||
VerifyEmail(ctx context.Context, actionToken string) error
|
||||
|
||||
SendEmailForgotPassword(ctx context.Context, userId string) error
|
||||
SendEmailVerifyEmail(ctx context.Context, userId string) error
|
||||
SendEmailVerifyUser(ctx context.Context, email string) error
|
||||
|
||||
ChangePassword(ctx context.Context, userId, oldPassword, newPassword string) error
|
||||
ChangePasswordWithToken(ctx context.Context, userId, actionToken, newPassword string) error
|
||||
ChangePasswordWithToken(ctx context.Context, actionToken, newPassword string) error
|
||||
}
|
||||
|
||||
func NewUserService(deps UserServiceDeps) UserService {
|
||||
@ -51,6 +52,7 @@ type UserServiceDeps struct {
|
||||
JwtCache cache.Cache[string, string]
|
||||
EventRepo repos.EventRepo
|
||||
ActionTokenRepo repos.ActionTokenRepo
|
||||
Logger logger.Logger
|
||||
}
|
||||
|
||||
type userService struct {
|
||||
@ -91,7 +93,10 @@ func (u *userService) CreateUser(ctx context.Context, params UserCreateParams) (
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u.sendEmailVerifyEmail(ctx, result.Id, user.Email)
|
||||
|
||||
if err := u.sendEmailVerifyUser(ctx, result.Id, user.Email); err != nil {
|
||||
u.deps.Logger.Error().Err(err).Msg("error occured on sending email")
|
||||
}
|
||||
|
||||
u.deps.UserCache.Set(result.Id, *result, cache.Expiration{Ttl: userCacheTtl})
|
||||
|
||||
@ -136,7 +141,7 @@ func (u *userService) VerifyEmail(ctx context.Context, actionToken string) error
|
||||
}
|
||||
|
||||
if err := u.deps.UserRepo.SetUserEmailVerified(ctx, token.UserId); err != nil {
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
//TODO: log warnings somehow
|
||||
@ -157,7 +162,7 @@ func (u *userService) SendEmailForgotPassword(ctx context.Context, email string)
|
||||
UserId: user.Id,
|
||||
Value: uuid.New().String(),
|
||||
Target: models.ActionTokenTargetForgotPassword,
|
||||
Expiration: time.Now().Add(1 * time.Hour),
|
||||
Expiration: time.Now().Add(15 * time.Minute),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
@ -167,7 +172,7 @@ func (u *userService) SendEmailForgotPassword(ctx context.Context, email string)
|
||||
return u.deps.EventRepo.SendEmailForgotPassword(ctx, user.Email, actionToken.Value)
|
||||
}
|
||||
|
||||
func (u *userService) sendEmailVerifyEmail(ctx context.Context, userId, email string) error {
|
||||
func (u *userService) sendEmailVerifyUser(ctx context.Context, userId, email string) error {
|
||||
actionToken, err := u.deps.ActionTokenRepo.CreateActionToken(
|
||||
ctx,
|
||||
models.ActionTokenDTO{
|
||||
@ -181,25 +186,26 @@ func (u *userService) sendEmailVerifyEmail(ctx context.Context, userId, email st
|
||||
return err
|
||||
}
|
||||
|
||||
return u.deps.EventRepo.SendEmailVerifyEmail(ctx, email, actionToken.Value)
|
||||
return u.deps.EventRepo.SendEmailVerifyUser(ctx, email, actionToken.Value)
|
||||
}
|
||||
|
||||
func (u *userService) SendEmailVerifyEmail(ctx context.Context, email string) error {
|
||||
func (u *userService) SendEmailVerifyUser(ctx context.Context, email string) error {
|
||||
//user, err := u.getUserById(ctx, userId)
|
||||
user, err := u.deps.UserRepo.GetUserByEmail(ctx, email)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return u.sendEmailVerifyEmail(ctx, user.Id, user.Email)
|
||||
if user == nil {
|
||||
return fmt.Errorf("no such user")
|
||||
}
|
||||
if user.EmailVerified {
|
||||
return fmt.Errorf("user already verified")
|
||||
}
|
||||
|
||||
func (u *userService) ChangePasswordWithToken(ctx context.Context, userId, actionToken, newPassword string) error {
|
||||
user, err := u.getUserById(ctx, userId)
|
||||
if err != nil {
|
||||
return err
|
||||
return u.sendEmailVerifyUser(ctx, user.Id, user.Email)
|
||||
}
|
||||
|
||||
func (u *userService) ChangePasswordWithToken(ctx context.Context, actionToken, newPassword string) error {
|
||||
token, err := u.deps.ActionTokenRepo.GetActionToken(ctx, actionToken, models.ActionTokenTargetForgotPassword)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -208,6 +214,14 @@ func (u *userService) ChangePasswordWithToken(ctx context.Context, userId, actio
|
||||
return fmt.Errorf("wrong action token")
|
||||
}
|
||||
|
||||
user, err := u.getUserById(ctx, token.UserId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if user == nil {
|
||||
return fmt.Errorf("no such user")
|
||||
}
|
||||
|
||||
if err := u.updatePassword(ctx, *user, newPassword); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -242,10 +256,18 @@ func (u *userService) updatePassword(ctx context.Context, user models.UserDTO, n
|
||||
return err
|
||||
}
|
||||
|
||||
return u.deps.UserRepo.UpdateUser(ctx, user.Id, models.UserUpdateDTO{
|
||||
if err = u.deps.UserRepo.UpdateUser(ctx, user.Id, models.UserUpdateDTO{
|
||||
Secret: newSecret,
|
||||
Name: user.Name,
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := u.deps.EventRepo.SendEmailPasswordChanged(ctx, user.Email); err != nil {
|
||||
u.deps.Logger.Error().Err(err).Msg("error occured on sending email")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *userService) getUserById(ctx context.Context, userId string) (*models.UserDTO, error) {
|
||||
|
||||
@ -26,7 +26,7 @@ type passwordUtil struct {
|
||||
}
|
||||
|
||||
func (b *passwordUtil) Hash(password string) (string, error) {
|
||||
bytes, _ := bcrypt.GenerateFromPassword([]byte(password), 8) //bcrypt.DefaultCost)
|
||||
bytes, _ := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
|
||||
return string(bytes), nil
|
||||
}
|
||||
|
||||
|
||||
@ -41,10 +41,10 @@ func NewRequestLogMiddleware(logger log.Logger, tracer trace.Tracer, prometheus
|
||||
|
||||
ctxLogger := logger.WithContext(c)
|
||||
|
||||
msg := fmt.Sprintf("Request %s %s %d %v", method, path, statusCode, latency)
|
||||
msg := fmt.Sprintf("%s %s %d %v", method, path, statusCode, latency)
|
||||
|
||||
if statusCode >= 200 && statusCode < 400 {
|
||||
// ctxLogger.Log().Msg(msg)
|
||||
ctxLogger.Log().Msg(msg)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@ -12,7 +12,7 @@ type Handler[Input, Output interface{}] func(ctx context.Context, input Input) (
|
||||
|
||||
type ResponseOk struct {
|
||||
Status string `json:"status"`
|
||||
Result interface{} `json:"result"`
|
||||
Result interface{} `json:"result,omitempty"`
|
||||
}
|
||||
|
||||
type ResponseError struct {
|
||||
|
||||
7
pkg/cache/cache_inmem.go
vendored
7
pkg/cache/cache_inmem.go
vendored
@ -78,7 +78,7 @@ func (c *cacheInmem[K, V]) Del(key K) {
|
||||
delete(c.data, key)
|
||||
}
|
||||
|
||||
func (c *cacheInmem[K, V]) CheckExpired(batchSize int) {
|
||||
func (c *cacheInmem[K, V]) CheckExpired() {
|
||||
if len(c.data) == 0 {
|
||||
return
|
||||
}
|
||||
@ -90,10 +90,5 @@ func (c *cacheInmem[K, V]) CheckExpired(batchSize int) {
|
||||
if time.Now().After(item.Expiration) {
|
||||
delete(c.data, key)
|
||||
}
|
||||
|
||||
batchSize--
|
||||
if batchSize <= 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
5
pkg/cache/cache_inmem_sharded.go
vendored
5
pkg/cache/cache_inmem_sharded.go
vendored
@ -45,10 +45,9 @@ func (c *cacheInmemSharded[V]) Del(key string) {
|
||||
c.getShard(key).Del(key)
|
||||
}
|
||||
|
||||
func (c *cacheInmemSharded[V]) CheckExpired(batchSize int) {
|
||||
size := batchSize / c.info.Shards
|
||||
func (c *cacheInmemSharded[V]) CheckExpired() {
|
||||
for _, shard := range c.shards {
|
||||
shard.CheckExpired(size)
|
||||
shard.CheckExpired()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
2
pkg/cache/interface.go
vendored
2
pkg/cache/interface.go
vendored
@ -7,5 +7,5 @@ type Cache[K comparable, V any] interface {
|
||||
Set(key K, value V, exp Expiration)
|
||||
|
||||
Del(key K)
|
||||
CheckExpired(batchSize int)
|
||||
CheckExpired()
|
||||
}
|
||||
|
||||
40
tests/api.py
40
tests/api.py
@ -11,7 +11,6 @@ class Requests():
|
||||
|
||||
class Auth():
|
||||
token: string
|
||||
|
||||
def __init__(self, token):
|
||||
self.token = token
|
||||
|
||||
@ -21,20 +20,29 @@ class User():
|
||||
name: string
|
||||
password: string
|
||||
|
||||
def __init__(self, email, password, name, id="", token = ""):
|
||||
def __init__(self, email, password, name, id=""):
|
||||
self.id = id
|
||||
self.email = email
|
||||
self.password = password
|
||||
self.name = name
|
||||
self.token = token
|
||||
|
||||
@classmethod
|
||||
def random(cls):
|
||||
email = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) + '@example.com'
|
||||
name = ''.join(random.choices(string.ascii_letters, k=10))
|
||||
password = 'Abcdef1!!1'
|
||||
return cls(email, password, name)
|
||||
|
||||
def rand_email():
|
||||
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) + '@example.com'
|
||||
|
||||
class BackendApi():
|
||||
def __init__(self, httpClient):
|
||||
self.httpClient = httpClient
|
||||
|
||||
def parse_response(self, response):
|
||||
if response.status != 200:
|
||||
raise AssertionError('something wrong')
|
||||
if response.status_code != 200:
|
||||
raise AssertionError('Request error')
|
||||
|
||||
json = response.json()
|
||||
if json['status'] == 'success':
|
||||
@ -45,35 +53,27 @@ class BackendApi():
|
||||
error = json['error']
|
||||
raise AssertionError(error['id'], error['message'])
|
||||
|
||||
def user_create(self, user: User | None) -> User:
|
||||
if user == None:
|
||||
email = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) + '@test.test'
|
||||
name = ''.join(random.choices(string.ascii_letters, k=10))
|
||||
password = 'Abcdef1!!1'
|
||||
user = User(email, password, name)
|
||||
|
||||
def user_create(self, user: User) -> User:
|
||||
res = self.parse_response(
|
||||
self.httpClient.post(
|
||||
"/v1/user/create", json={
|
||||
"/api/v1/user/create", json={
|
||||
"email": user.email,
|
||||
"password": user.password,
|
||||
"name": user.name,
|
||||
}
|
||||
)
|
||||
)
|
||||
return User(res['email'], user.password, res['name'], id=res['id'])
|
||||
|
||||
return User(res['email'], res['password'], res['name'], res['id'])
|
||||
|
||||
def user_login(self, user: User) -> Auth:
|
||||
def user_login(self, email, password) -> Auth:
|
||||
res = self.parse_response(
|
||||
self.httpClient.post(
|
||||
"/v1/user/login", json={
|
||||
"email": user.email+"a",
|
||||
"password": user.password,
|
||||
"/api/v1/user/login", json={
|
||||
"email": email,
|
||||
"password": password,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
return Auth(res['status'])
|
||||
|
||||
def dummy_get(self, auth: Auth):
|
||||
|
||||
@ -1,14 +1,53 @@
|
||||
from api import BackendApi, Requests
|
||||
import requests
|
||||
import random
|
||||
import string
|
||||
import pytest
|
||||
from kafka import KafkaConsumer
|
||||
from api import BackendApi, Requests, User, rand_email
|
||||
|
||||
backendUrl = "http://localhost:8080"
|
||||
|
||||
class TestUser:
|
||||
def test_create_user(self):
|
||||
api = BackendApi(Requests(backendUrl))
|
||||
api.user_create()
|
||||
def test_create_user():
|
||||
backend = BackendApi(Requests(backendUrl))
|
||||
|
||||
user = User.random()
|
||||
userWithBadEmail = User("sdfsaadsfgdf", user.password, user.name)
|
||||
userWithBadPassword = User(user.email, "badPassword", user.name)
|
||||
|
||||
with pytest.raises(Exception):
|
||||
backend.user_create(userWithBadEmail)
|
||||
with pytest.raises(Exception):
|
||||
backend.user_create(userWithBadPassword)
|
||||
|
||||
resultUser = backend.user_create(user)
|
||||
|
||||
#should not create user with same email
|
||||
with pytest.raises(Exception):
|
||||
backend.user_create(user)
|
||||
|
||||
assert resultUser.email == user.email
|
||||
assert resultUser.id != ""
|
||||
|
||||
def test_login_user():
|
||||
backend = BackendApi(Requests(backendUrl))
|
||||
|
||||
# consumer = KafkaConsumer(
|
||||
# 'backend_events',
|
||||
# group_id='test-group',
|
||||
# bootstrap_servers=['localhost:9092'],
|
||||
# consumer_timeout_ms=1000)
|
||||
# consumer.seek_to_end()
|
||||
|
||||
user = backend.user_create(User.random())
|
||||
|
||||
with pytest.raises(Exception):
|
||||
backend.user_login(user.email, "badpassword")
|
||||
with pytest.raises(Exception):
|
||||
backend.user_login(rand_email(), user.password)
|
||||
|
||||
#should not login without verified email
|
||||
with pytest.raises(Exception):
|
||||
backend.user_login(user.email, user.password)
|
||||
|
||||
# msgs = consumer.poll(timeout_ms=100)
|
||||
# print(msgs)
|
||||
|
||||
def test_login_user(self):
|
||||
api = BackendApi(Requests(backendUrl))
|
||||
user = api.user_create()
|
||||
api.user_login(user)
|
||||
@ -1,15 +0,0 @@
|
||||
from locust import FastHttpUser, task
|
||||
|
||||
from api import BackendApi, Auth
|
||||
|
||||
class DummyGet(FastHttpUser):
|
||||
def on_start(self):
|
||||
self.api = BackendApi(self.client)
|
||||
user = self.api.user_create()
|
||||
self.auth = self.api.user_login(user)
|
||||
|
||||
@task
|
||||
def dummy_test(self):
|
||||
self.api.dummy_get(self.auth)
|
||||
|
||||
|
||||
@ -1,11 +0,0 @@
|
||||
from locust import FastHttpUser, task
|
||||
|
||||
from api import BackendApi
|
||||
|
||||
class HealthGet(FastHttpUser):
|
||||
def on_start(self):
|
||||
self.api = BackendApi(self.client)
|
||||
|
||||
@task
|
||||
def user_create_test(self):
|
||||
self.api.health_get()
|
||||
@ -7,5 +7,5 @@ class ShortlinkCreate(FastHttpUser):
|
||||
self.api = BackendApi(self.client)
|
||||
|
||||
@task
|
||||
def user_create_test(self):
|
||||
def shortlink_create_test(self):
|
||||
self.api.shortlink_create("https://example.com")
|
||||
Loading…
x
Reference in New Issue
Block a user