implement sending to kafka, remove email repo
This commit is contained in:
parent
2f9de4f665
commit
e09bf3febc
@ -95,7 +95,7 @@ services:
|
|||||||
environment:
|
environment:
|
||||||
KAFKA_NODE_ID: 1
|
KAFKA_NODE_ID: 1
|
||||||
KAFKA_PROCESS_ROLES: broker,controller
|
KAFKA_PROCESS_ROLES: broker,controller
|
||||||
KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
|
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
|
||||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
|
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
|
||||||
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
||||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
|
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
|
||||||
@ -105,6 +105,9 @@ services:
|
|||||||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||||
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
|
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
|
||||||
KAFKA_NUM_PARTITIONS: 3
|
KAFKA_NUM_PARTITIONS: 3
|
||||||
|
ports:
|
||||||
|
- 9092:9092
|
||||||
|
# - 9093:9093
|
||||||
|
|
||||||
# backend:
|
# backend:
|
||||||
# build: .
|
# build: .
|
||||||
|
|||||||
12
helper/go.mod
Normal file
12
helper/go.mod
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
module helper
|
||||||
|
|
||||||
|
go 1.22.5
|
||||||
|
|
||||||
|
require gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/klauspost/compress v1.15.9 // indirect
|
||||||
|
github.com/pierrec/lz4/v4 v4.1.15 // indirect
|
||||||
|
github.com/segmentio/kafka-go v0.4.47 // indirect
|
||||||
|
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
|
||||||
|
)
|
||||||
63
helper/go.sum
Normal file
63
helper/go.sum
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
|
||||||
|
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
|
||||||
|
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
|
||||||
|
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
|
||||||
|
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||||
|
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||||
|
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
|
||||||
|
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
|
||||||
|
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
|
||||||
|
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||||
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
|
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||||
|
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
|
||||||
|
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||||
|
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||||
|
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||||
|
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||||
|
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||||
|
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
|
||||||
|
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||||
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
|
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||||
|
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
|
||||||
|
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
|
||||||
|
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
|
||||||
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
|
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||||
|
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
|
||||||
|
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||||
|
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||||
|
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||||
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||||
|
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||||
|
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||||
|
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk=
|
||||||
|
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df h1:n7WqCuqOuCbNr617RXOY0AWRXxgwEyPp2z+p0+hgMuE=
|
||||||
|
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df/go.mod h1:LRQQ+SO6ZHR7tOkpBDuZnXENFzX8qRjMDMyPD6BRkCw=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
88
helper/main.go
Normal file
88
helper/main.go
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/segmentio/kafka-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// type emailHelper struct {
|
||||||
|
// dialer *gomail.Dialer
|
||||||
|
// }
|
||||||
|
|
||||||
|
// func (e *emailHelper) SendEmailForgotPassword(email, token string) {
|
||||||
|
// link := "https://nucrea.ru?token=" + token
|
||||||
|
|
||||||
|
// const MSG_TEXT = `
|
||||||
|
// <html>
|
||||||
|
// <head>
|
||||||
|
// </head>
|
||||||
|
// <body>
|
||||||
|
// <p>This message was sent because you forgot a password</p>
|
||||||
|
// <p>To change a password, use <a href="{{Link}}"/>this</a> link</p>
|
||||||
|
// </body>
|
||||||
|
// </html>
|
||||||
|
// `
|
||||||
|
// msgText := strings.ReplaceAll(MSG_TEXT, "{{Link}}", link)
|
||||||
|
|
||||||
|
// m := gomail.NewMessage()
|
||||||
|
// m.SetHeader("From", "email")
|
||||||
|
// m.SetHeader("To", email)
|
||||||
|
// m.SetHeader("Subject", "Hello!")
|
||||||
|
// m.SetBody("text/html", msgText)
|
||||||
|
|
||||||
|
// if err := d.DialAndSend(m); err != nil {
|
||||||
|
// panic(err)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
const (
|
||||||
|
SMTP_SERVER = ""
|
||||||
|
SMTP_PORT = 0
|
||||||
|
SMTP_LOGIN = ""
|
||||||
|
SMTP_PASSWORD = ""
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// d := gomail.NewDialer(SMTP_SERVER, SMTP_PORT, SMTP_LOGIN, SMTP_PASSWORD)
|
||||||
|
|
||||||
|
// conn, err := kafka.DialLeader(ctx, "", "")
|
||||||
|
// if err != nil {
|
||||||
|
// panic(err)
|
||||||
|
// }
|
||||||
|
// defer conn.Close()
|
||||||
|
|
||||||
|
log.Println("starting reader...")
|
||||||
|
|
||||||
|
r := kafka.NewReader(kafka.ReaderConfig{
|
||||||
|
Brokers: []string{"localhost:9092"},
|
||||||
|
Topic: "topic-A",
|
||||||
|
// Partition: 0,
|
||||||
|
GroupID: "consumer-group-id",
|
||||||
|
})
|
||||||
|
// r.SetOffset(kafka.FirstOffset)
|
||||||
|
|
||||||
|
log.Println("reader started")
|
||||||
|
|
||||||
|
for {
|
||||||
|
msg, err := r.FetchMessage(ctx)
|
||||||
|
if err == io.EOF {
|
||||||
|
log.Fatal("EOF")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("offset: %d, partition: %d, key: %s, value: %s\n", msg.Offset, msg.Partition, string(msg.Key), string(msg.Value))
|
||||||
|
|
||||||
|
if err := r.CommitMessages(ctx, msg); err != nil {
|
||||||
|
log.Fatalf("failed to commit: %s\n", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,3 +1,5 @@
|
|||||||
port: 8080
|
port: 8080
|
||||||
postgres_url: "postgres://postgres:postgres@localhost:5432/postgres"
|
postgres_url: "postgres://postgres:postgres@localhost:5432/postgres"
|
||||||
jwt_signing_key: "./misc/jwt_signing_key"
|
jwt_signing_key: "./misc/jwt_signing_key"
|
||||||
|
kafka_url: "localhost:9092"
|
||||||
|
kafka_topic: "backend_events"
|
||||||
@ -92,6 +92,8 @@ func (a *App) Run(p RunParams) {
|
|||||||
logger.Fatal().Err(err).Msg("failed connecting to postgres")
|
logger.Fatal().Err(err).Msg("failed connecting to postgres")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
kafka := integrations.NewKafka(conf.GetKafkaUrl(), conf.GetKafkaTopic())
|
||||||
|
|
||||||
var key *rsa.PrivateKey
|
var key *rsa.PrivateKey
|
||||||
{
|
{
|
||||||
keyRawBytes, err := os.ReadFile(conf.GetJwtSigningKey())
|
keyRawBytes, err := os.ReadFile(conf.GetJwtSigningKey())
|
||||||
@ -135,9 +137,9 @@ func (a *App) Run(p RunParams) {
|
|||||||
passwordUtil = utils.NewPasswordUtil()
|
passwordUtil = utils.NewPasswordUtil()
|
||||||
|
|
||||||
userRepo = repos.NewUserRepo(sqlDb, tracer)
|
userRepo = repos.NewUserRepo(sqlDb, tracer)
|
||||||
emailRepo = repos.NewEmailRepo()
|
|
||||||
actionTokenRepo = repos.NewActionTokenRepo(sqlDb)
|
actionTokenRepo = repos.NewActionTokenRepo(sqlDb)
|
||||||
shortlinkRepo = repos.NewShortlinkRepo(sqlDb, tracer)
|
shortlinkRepo = repos.NewShortlinkRepo(sqlDb, tracer)
|
||||||
|
eventRepo = repos.NewEventRepo(kafka)
|
||||||
|
|
||||||
userCache = cache.NewCacheInmemSharded[models.UserDTO](cache.ShardingTypeInteger)
|
userCache = cache.NewCacheInmemSharded[models.UserDTO](cache.ShardingTypeInteger)
|
||||||
jwtCache = cache.NewCacheInmemSharded[string](cache.ShardingTypeJWT)
|
jwtCache = cache.NewCacheInmemSharded[string](cache.ShardingTypeJWT)
|
||||||
@ -170,7 +172,7 @@ func (a *App) Run(p RunParams) {
|
|||||||
UserRepo: userRepo,
|
UserRepo: userRepo,
|
||||||
UserCache: userCache,
|
UserCache: userCache,
|
||||||
JwtCache: jwtCache,
|
JwtCache: jwtCache,
|
||||||
EmailRepo: emailRepo,
|
EventRepo: *eventRepo,
|
||||||
ActionTokenRepo: actionTokenRepo,
|
ActionTokenRepo: actionTokenRepo,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -4,12 +4,16 @@ type IConfig interface {
|
|||||||
GetPort() uint16
|
GetPort() uint16
|
||||||
GetPostgresUrl() string
|
GetPostgresUrl() string
|
||||||
GetJwtSigningKey() string
|
GetJwtSigningKey() string
|
||||||
|
GetKafkaUrl() string
|
||||||
|
GetKafkaTopic() string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Port uint16 `yaml:"port"`
|
Port uint16 `yaml:"port"`
|
||||||
PostgresUrl string `yaml:"postgres_url"`
|
PostgresUrl string `yaml:"postgres_url"`
|
||||||
JwtSigningKey string `yaml:"jwt_signing_key" validate:"file"`
|
JwtSigningKey string `yaml:"jwt_signing_key" validate:"file"`
|
||||||
|
KafkaUrl string `yaml:"kafka_url"`
|
||||||
|
KafkaTopic string `yaml:"kafka_topic"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) GetPort() uint16 {
|
func (c *Config) GetPort() uint16 {
|
||||||
@ -23,3 +27,11 @@ func (c *Config) GetPostgresUrl() string {
|
|||||||
func (c *Config) GetJwtSigningKey() string {
|
func (c *Config) GetJwtSigningKey() string {
|
||||||
return c.JwtSigningKey
|
return c.JwtSigningKey
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Config) GetKafkaUrl() string {
|
||||||
|
return c.KafkaUrl
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Config) GetKafkaTopic() string {
|
||||||
|
return c.KafkaTopic
|
||||||
|
}
|
||||||
|
|||||||
@ -1,48 +0,0 @@
|
|||||||
package repos
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"gopkg.in/gomail.v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
const MSG_TEXT = `
|
|
||||||
<html>
|
|
||||||
<head>
|
|
||||||
</head>
|
|
||||||
<body>
|
|
||||||
<p>This message was sent because you forgot a password</p>
|
|
||||||
<p>To change a password, use <a href="{{Link}}"/>this</a> link</p>
|
|
||||||
</body>
|
|
||||||
</html>
|
|
||||||
`
|
|
||||||
|
|
||||||
type EmailRepo interface {
|
|
||||||
SendEmailForgotPassword(email, token string)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewEmailRepo() EmailRepo {
|
|
||||||
return &emailRepo{}
|
|
||||||
}
|
|
||||||
|
|
||||||
type emailRepo struct {
|
|
||||||
// mail *gomail.Dialer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *emailRepo) SendEmailForgotPassword(email, token string) {
|
|
||||||
link := "https://nucrea.ru?token=" + token
|
|
||||||
msgText := strings.ReplaceAll(MSG_TEXT, "{{Link}}", link)
|
|
||||||
|
|
||||||
m := gomail.NewMessage()
|
|
||||||
m.SetHeader("From", "email")
|
|
||||||
m.SetHeader("To", email)
|
|
||||||
m.SetHeader("Subject", "Hello!")
|
|
||||||
m.SetBody("text/html", msgText)
|
|
||||||
|
|
||||||
d := gomail.NewDialer("smtp.yandex.ru", 587, "login", "password")
|
|
||||||
|
|
||||||
// Send the email to Bob, Cora and Dan.
|
|
||||||
if err := d.DialAndSend(m); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
33
src/core/repos/event_repo.go
Normal file
33
src/core/repos/event_repo.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package repos
|
||||||
|
|
||||||
|
import (
|
||||||
|
"backend/src/integrations"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewEventRepo(kafka *integrations.Kafka) *EventRepo {
|
||||||
|
return &EventRepo{
|
||||||
|
kafka: kafka,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventRepo struct {
|
||||||
|
kafka *integrations.Kafka
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EventRepo) SendEmailForgotPassword(ctx context.Context, email, actionToken string) error {
|
||||||
|
value := struct {
|
||||||
|
Email string `json:"email"`
|
||||||
|
Token string `json:"token"`
|
||||||
|
}{
|
||||||
|
Email: email,
|
||||||
|
Token: actionToken,
|
||||||
|
}
|
||||||
|
valueBytes, err := json.Marshal(value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.kafka.SendMessage(ctx, "email_forgot_password", valueBytes)
|
||||||
|
}
|
||||||
@ -41,7 +41,7 @@ type UserServiceDeps struct {
|
|||||||
UserRepo repos.UserRepo
|
UserRepo repos.UserRepo
|
||||||
UserCache cache.Cache[string, models.UserDTO]
|
UserCache cache.Cache[string, models.UserDTO]
|
||||||
JwtCache cache.Cache[string, string]
|
JwtCache cache.Cache[string, string]
|
||||||
EmailRepo repos.EmailRepo
|
EventRepo repos.EventRepo
|
||||||
ActionTokenRepo repos.ActionTokenRepo
|
ActionTokenRepo repos.ActionTokenRepo
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,8 +132,7 @@ func (u *userService) HelpPasswordForgot(ctx context.Context, userId string) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
u.deps.EmailRepo.SendEmailForgotPassword(user.Email, actionToken.Value)
|
return u.deps.EventRepo.SendEmailForgotPassword(ctx, user.Email, actionToken.Value)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *userService) ChangePasswordForgot(ctx context.Context, userId, newPassword, accessCode string) error {
|
func (u *userService) ChangePasswordForgot(ctx context.Context, userId, newPassword, accessCode string) error {
|
||||||
|
|||||||
@ -2,36 +2,36 @@ package integrations
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/segmentio/kafka-go"
|
"github.com/segmentio/kafka-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Kafka struct {
|
type Kafka struct {
|
||||||
conn *kafka.Conn
|
writer *kafka.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *Kafka) Connect(ctx context.Context) error {
|
func NewKafka(addr, topic string) *Kafka {
|
||||||
conn, err := kafka.DialContext(ctx, "", "")
|
w := &kafka.Writer{
|
||||||
if err != nil {
|
Addr: kafka.TCP(addr),
|
||||||
return err
|
Topic: topic,
|
||||||
}
|
Balancer: &kafka.RoundRobin{},
|
||||||
defer conn.Close()
|
AllowAutoTopicCreation: true,
|
||||||
|
BatchSize: 100,
|
||||||
// w := &kafka.Writer{
|
BatchTimeout: 100 * time.Millisecond,
|
||||||
// Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
|
|
||||||
// Topic: "topic-A",
|
|
||||||
// Balancer: &kafka.LeastBytes{},
|
|
||||||
// }
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *Kafka) SendMessage() {
|
return &Kafka{
|
||||||
k.conn.WriteMessages()
|
writer: w,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
msg := kafka.Message{
|
func (k *Kafka) SendMessage(ctx context.Context, key string, value []byte) error {
|
||||||
Topic: "event",
|
return k.writer.WriteMessages(
|
||||||
Key: []byte("send_email"),
|
context.Background(),
|
||||||
Value: []byte("value"),
|
kafka.Message{
|
||||||
}
|
Key: []byte(key),
|
||||||
|
Value: value,
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user