From f490d45300f0319f025c9ea93d538d21dfd9e61d Mon Sep 17 00:00:00 2001
From: Sergey Chubaryan
Date: Sun, 23 Feb 2025 16:34:24 +0300
Subject: [PATCH 1/5] shortlinks fix
---
internal/core/repos/shortlink_repo.go | 6 +++---
sql/01_user.sql | 2 --
sql/02_shortlinks.sql | 17 ++---------------
3 files changed, 5 insertions(+), 20 deletions(-)
diff --git a/internal/core/repos/shortlink_repo.go b/internal/core/repos/shortlink_repo.go
index 2db06ce..565e4db 100644
--- a/internal/core/repos/shortlink_repo.go
+++ b/internal/core/repos/shortlink_repo.go
@@ -35,8 +35,8 @@ func (u *shortlinkRepo) AddShortlink(ctx context.Context, dto ShortlinkDTO) erro
_, span := u.tracer.Start(ctx, "postgres::AddShortlink")
defer span.End()
- query := `insert into shortlinks (url, expires_at) values ($1, $2);`
- _, err := u.db.ExecContext(ctx, query, dto.Url, dto.ExpiresAt)
+ query := `insert into shortlinks (id, url, expires_at) values ($1, $2, $3);`
+ _, err := u.db.ExecContext(ctx, query, dto.Id, dto.Url, dto.ExpiresAt)
return err
}
@@ -72,7 +72,7 @@ func (u *shortlinkRepo) DeleteExpiredShortlinks(ctx context.Context, limit int)
where id in (
select id
from shortlinks
- where current_date > expiration
+ where current_date > expires_at
limit $1
)
returning *
diff --git a/sql/01_user.sql b/sql/01_user.sql
index 1b38857..fd39f6a 100644
--- a/sql/01_user.sql
+++ b/sql/01_user.sql
@@ -9,8 +9,6 @@ create table if not exists users (
updated_at timestamp
);
-alter table users alter column active set default true;
-
create index if not exists idx_users_email on users(email);
create or replace trigger trg_user_created
diff --git a/sql/02_shortlinks.sql b/sql/02_shortlinks.sql
index bb0611a..e85199b 100644
--- a/sql/02_shortlinks.sql
+++ b/sql/02_shortlinks.sql
@@ -1,18 +1,5 @@
create table if not exists shortlinks (
- id int generated always as identity,
+ id text primary key,
url text not null,
- expires_at timestamp not null,
- created_at timestamp,
- updated_at timestamp
+ expires_at timestamp not null
);
-
-create or replace trigger trg_shortlink_created
- before insert on shortlinks
- for each row
- execute function trg_proc_row_created();
-
-create or replace trigger trg_shortlink_updated
- before update on shortlinks
- for each row
- when (new is distinct from old)
- execute function trg_proc_row_updated();
\ No newline at end of file
From 16260ecedb89d4398f9b7ba64aa2802a9f471097 Mon Sep 17 00:00:00 2001
From: Sergey Chubaryan
Date: Sun, 23 Feb 2025 20:09:55 +0300
Subject: [PATCH 2/5] add wrapper for prometheus
---
cmd/backend/server/server.go | 12 +--
cmd/notifyer/config.go | 1 +
cmd/notifyer/config.yaml | 1 +
cmd/notifyer/event_handler.go | 90 +++++++++++++++++++++
cmd/notifyer/main.go | 98 +++++++++--------------
cmd/shortlinks/main.go | 9 ++-
internal/http_server/metrics.go | 49 ++++++++++++
internal/http_server/recovery.go | 5 +-
internal/http_server/request_log.go | 16 ++--
internal/integrations/prometheus.go | 117 +++++++++++-----------------
10 files changed, 245 insertions(+), 153 deletions(-)
create mode 100644 cmd/notifyer/event_handler.go
create mode 100644 internal/http_server/metrics.go
diff --git a/cmd/backend/server/server.go b/cmd/backend/server/server.go
index e76b7d4..028ee83 100644
--- a/cmd/backend/server/server.go
+++ b/cmd/backend/server/server.go
@@ -28,14 +28,14 @@ func NewServer(opts NewServerOpts) *httpserver.Server {
r := gin.New()
r.ContextWithFallback = true // Use it to allow getting values from c.Request.Context()
- // r.Static("/webapp", "./webapp")
+ metrics := integrations.NewMetrics("backend")
+ serverMetrics := httpserver.NewServerMetrics(metrics)
+
r.GET("/health", handlers.New200OkHandler())
+ r.Any("/metrics", gin.WrapH(metrics.HttpHandler()))
- prometheus := integrations.NewPrometheus()
- r.Any("/metrics", gin.WrapH(prometheus.GetRequestHandler()))
-
- r.Use(httpserver.NewRecoveryMiddleware(opts.Logger, prometheus, opts.DebugMode))
- r.Use(httpserver.NewRequestLogMiddleware(opts.Logger, opts.Tracer, prometheus))
+ r.Use(httpserver.NewRecoveryMiddleware(opts.Logger, serverMetrics, opts.DebugMode))
+ r.Use(httpserver.NewRequestLogMiddleware(opts.Logger, opts.Tracer, serverMetrics))
r.Use(httpserver.NewTracingMiddleware(opts.Tracer))
r.GET("/verify-user", handlers.NewUserVerifyEmailHandler(opts.Logger, opts.UserService))
diff --git a/cmd/notifyer/config.go b/cmd/notifyer/config.go
index e40f597..f0c0437 100644
--- a/cmd/notifyer/config.go
+++ b/cmd/notifyer/config.go
@@ -8,6 +8,7 @@ func LoadConfig(filePath string) (Config, error) {
type Config struct {
App struct {
+ Port uint16 `yaml:"port"`
LogFile string `yaml:"logFile"`
ServiceUrl string `yaml:"serviceUrl"`
}
diff --git a/cmd/notifyer/config.yaml b/cmd/notifyer/config.yaml
index 4de10a5..1762b99 100644
--- a/cmd/notifyer/config.yaml
+++ b/cmd/notifyer/config.yaml
@@ -1,4 +1,5 @@
app:
+ port: 8082
serviceUrl: "http://localhost:8080"
kafka:
brokers:
diff --git a/cmd/notifyer/event_handler.go b/cmd/notifyer/event_handler.go
new file mode 100644
index 0000000..9d2b58e
--- /dev/null
+++ b/cmd/notifyer/event_handler.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+ "backend/internal/integrations"
+ "backend/pkg/logger"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+
+ "github.com/segmentio/kafka-go"
+)
+
+type SendEmailEvent struct {
+ Email string `json:"email"`
+ Token string `json:"token"`
+}
+
+func NewEventHandler(
+ config Config,
+ logger logger.Logger,
+ metrics *integrations.Metrics,
+ emailer *Emailer,
+) *EventHandler {
+ eventsCounter := metrics.NewCounter("events_counter", "total events handled")
+ return &EventHandler{
+ config: config,
+ logger: logger,
+ emailer: emailer,
+ eventsCounter: eventsCounter,
+ }
+}
+
+type EventHandler struct {
+ config Config
+ logger logger.Logger
+ emailer *Emailer
+ eventsCounter integrations.Counter
+}
+
+func (e *EventHandler) eventLoop(ctx context.Context, kafkaReader *kafka.Reader) {
+ for {
+ msg, err := kafkaReader.FetchMessage(ctx)
+ if err == io.EOF {
+ e.logger.Fatal().Err(err)
+ }
+ if err != nil {
+ e.logger.Fatal().Err(err)
+ }
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ e.logger.Log().Msgf("event: %s", msg.Key)
+ e.eventsCounter.Inc()
+
+ if err := kafkaReader.CommitMessages(ctx, msg); err != nil {
+ e.logger.Error().Err(err).Msg("failed to commit offset")
+ continue
+ }
+
+ if err := e.handleEvent(ctx, msg); err != nil {
+ e.logger.Error().Err(err).Msg("failed to handle event")
+ continue
+ }
+ }
+}
+
+func (e *EventHandler) handleEvent(ctx context.Context, msg kafka.Message) error {
+ event := SendEmailEvent{}
+ if err := json.Unmarshal(msg.Value, &event); err != nil {
+ return err
+ }
+
+ // TODO: add context somehow
+ switch string(msg.Key) {
+ case "email_forgot_password":
+ return e.emailer.SendRestorePassword(event.Email, event.Token)
+ case "email_password_changed":
+ return e.emailer.SendPasswordChanged(event.Email)
+ case "email_verify_user":
+ link := fmt.Sprintf("%s/verify-user?token=%s", e.config.App.ServiceUrl, event.Token)
+ return e.emailer.SendVerifyUser(event.Email, link)
+ }
+
+ return fmt.Errorf("unknown event type")
+}
diff --git a/cmd/notifyer/main.go b/cmd/notifyer/main.go
index c328b89..71fb044 100644
--- a/cmd/notifyer/main.go
+++ b/cmd/notifyer/main.go
@@ -1,21 +1,16 @@
package main
import (
+ httpserver "backend/internal/http_server"
+ "backend/internal/integrations"
"backend/pkg/logger"
"context"
- "encoding/json"
- "fmt"
- "io"
"log"
+ "github.com/gin-gonic/gin"
"github.com/segmentio/kafka-go"
)
-type SendEmailEvent struct {
- Email string `json:"email"`
- Token string `json:"token"`
-}
-
func main() {
ctx := context.Background()
@@ -24,67 +19,44 @@ func main() {
log.Fatal(err.Error())
}
- emailer, err := NewEmailer(config.SMTP)
- if err != nil {
- log.Fatal(err.Error())
- }
-
- r := kafka.NewReader(kafka.ReaderConfig{
- Brokers: config.Kafka.Brokers,
- Topic: config.Kafka.Topic,
- GroupID: config.Kafka.ConsumerGroupId,
- })
-
logger, err := logger.New(ctx, logger.NewLoggerOpts{
Debug: true,
OutputFile: config.App.LogFile,
})
if err != nil {
- log.Fatal(err.Error())
+ logger.Fatal().Err(err)
}
- logger.Printf("notifyer service started\n")
-
- for {
- msg, err := r.FetchMessage(ctx)
- if err == io.EOF {
- log.Fatal("EOF")
- return
- }
- if err != nil {
- log.Fatal(err.Error())
- return
- }
-
- log.Printf("offset: %d, partition: %d, key: %s, value: %s\n", msg.Offset, msg.Partition, string(msg.Key), string(msg.Value))
-
- if err := r.CommitMessages(ctx, msg); err != nil {
- log.Fatalf("failed to commit: %s\n", err.Error())
- continue
- }
-
- if err := handleEvent(config, emailer, msg); err != nil {
- log.Printf("failed to handle event: %s\n", err.Error())
- continue
- }
+ emailer, err := NewEmailer(config.SMTP)
+ if err != nil {
+ logger.Fatal().Err(err)
}
-}
-
-func handleEvent(config Config, emailer *Emailer, msg kafka.Message) error {
- event := SendEmailEvent{}
- if err := json.Unmarshal(msg.Value, &event); err != nil {
- return err
- }
-
- switch string(msg.Key) {
- case "email_forgot_password":
- return emailer.SendRestorePassword(event.Email, event.Token)
- case "email_password_changed":
- return emailer.SendPasswordChanged(event.Email)
- case "email_verify_user":
- link := fmt.Sprintf("%s/verify-user?token=%s", config.App.ServiceUrl, event.Token)
- return emailer.SendVerifyUser(event.Email, link)
- }
-
- return fmt.Errorf("unknown event type")
+
+ metrics := integrations.NewMetrics("notifyer")
+
+ ginRouter := gin.New()
+ ginRouter.GET("/metrics", gin.WrapH(metrics.HttpHandler()))
+ ginRouter.GET("/health", func(ctx *gin.Context) {
+ ctx.Status(200)
+ })
+
+ kafkaReader := kafka.NewReader(kafka.ReaderConfig{
+ Brokers: config.Kafka.Brokers,
+ Topic: config.Kafka.Topic,
+ GroupID: config.Kafka.ConsumerGroupId,
+ })
+ kafkaReader.SetOffset(kafka.LastOffset)
+
+ eventHandler := NewEventHandler(config, logger, metrics, emailer)
+ go eventHandler.eventLoop(ctx, kafkaReader)
+
+ logger.Log().Msg("notifyer service started")
+
+ srv := httpserver.New(
+ httpserver.NewServerOpts{
+ Logger: logger,
+ HttpServer: ginRouter,
+ },
+ )
+ srv.Run(ctx, config.App.Port)
}
diff --git a/cmd/shortlinks/main.go b/cmd/shortlinks/main.go
index 2adbf3d..f9765ca 100644
--- a/cmd/shortlinks/main.go
+++ b/cmd/shortlinks/main.go
@@ -85,16 +85,17 @@ func RunServer(ctx context.Context, log logger.Logger, tracer trace.Tracer, conf
gin.SetMode(gin.ReleaseMode)
}
- prometheus := integrations.NewPrometheus()
+ metrics := integrations.NewMetrics("shortlinks")
+ serverMetrics := httpserver.NewServerMetrics(metrics)
r := gin.New()
- r.Any("/metrics", gin.WrapH(prometheus.GetRequestHandler()))
+ r.Any("/metrics", gin.WrapH(metrics.HttpHandler()))
r.GET("/health", func(ctx *gin.Context) {
ctx.Status(200)
})
- r.Use(httpserver.NewRecoveryMiddleware(log, prometheus, debugMode))
- r.Use(httpserver.NewRequestLogMiddleware(log, tracer, prometheus))
+ r.Use(httpserver.NewRecoveryMiddleware(log, serverMetrics, debugMode))
+ r.Use(httpserver.NewRequestLogMiddleware(log, tracer, serverMetrics))
r.Use(httpserver.NewTracingMiddleware(tracer))
linkGroup := r.Group("/s")
diff --git a/internal/http_server/metrics.go b/internal/http_server/metrics.go
new file mode 100644
index 0000000..e024d94
--- /dev/null
+++ b/internal/http_server/metrics.go
@@ -0,0 +1,49 @@
+package httpserver
+
+import (
+ "backend/internal/integrations"
+)
+
+func NewServerMetrics(p *integrations.Metrics) *ServerMetrics {
+ errors5xxCounter := p.NewCounter("server_responses_5xx", "5xx responses counter")
+ errors4xxCounter := p.NewCounter("server_responses_4xx", "4xx responses count")
+ requestsCounter := p.NewCounter("server_requests_total", "requests counter")
+ avgReqTimeHist := p.NewHistogram("server_requests_time", "requests time histogram")
+ panicsHist := p.NewHistogram("server_panics", "panics histogram metric")
+
+ return &ServerMetrics{
+ rpsCounter: requestsCounter,
+ avgReqTimeHist: avgReqTimeHist,
+ panicsHist: panicsHist,
+ errors4xxCounter: errors4xxCounter,
+ errors5xxCounter: errors5xxCounter,
+ }
+}
+
+type ServerMetrics struct {
+ rpsCounter integrations.Counter
+ avgReqTimeHist integrations.Histogram
+ panicsHist integrations.Histogram
+ errors4xxCounter integrations.Counter
+ errors5xxCounter integrations.Counter
+}
+
+func (b *ServerMetrics) AddRequest() {
+ b.rpsCounter.Inc()
+}
+
+func (b *ServerMetrics) AddRequestTime(reqTime float64) {
+ b.avgReqTimeHist.Observe(reqTime)
+}
+
+func (b *ServerMetrics) AddPanic() {
+ b.panicsHist.Observe(1)
+}
+
+func (b *ServerMetrics) Add4xxError() {
+ b.errors4xxCounter.Inc()
+}
+
+func (b *ServerMetrics) Add5xxError() {
+ b.errors5xxCounter.Inc()
+}
diff --git a/internal/http_server/recovery.go b/internal/http_server/recovery.go
index 4611dab..6c5a409 100644
--- a/internal/http_server/recovery.go
+++ b/internal/http_server/recovery.go
@@ -3,7 +3,6 @@ package httpserver
// Modified recovery from gin, use own logger
import (
- "backend/internal/integrations"
"backend/pkg/logger"
"bytes"
"errors"
@@ -30,12 +29,12 @@ var (
slash = []byte("/")
)
-func NewRecoveryMiddleware(logger logger.Logger, prometheus *integrations.Prometheus, debugMode bool) gin.HandlerFunc {
+func NewRecoveryMiddleware(logger logger.Logger, serverMetrics *ServerMetrics, debugMode bool) gin.HandlerFunc {
handle := defaultHandleRecovery
return func(c *gin.Context) {
defer func() {
if err := recover(); err != nil {
- prometheus.AddPanic()
+ serverMetrics.AddPanic()
// Check for a broken connection, as it is not really a
// condition that warrants a panic stack trace.
diff --git a/internal/http_server/request_log.go b/internal/http_server/request_log.go
index f3c7e5b..24c9c7a 100644
--- a/internal/http_server/request_log.go
+++ b/internal/http_server/request_log.go
@@ -1,7 +1,6 @@
package httpserver
import (
- "backend/internal/integrations"
log "backend/pkg/logger"
"fmt"
"time"
@@ -11,10 +10,13 @@ import (
"go.opentelemetry.io/otel/trace"
)
-func NewRequestLogMiddleware(logger log.Logger, tracer trace.Tracer, prometheus *integrations.Prometheus) gin.HandlerFunc {
+func NewRequestLogMiddleware(
+ logger log.Logger,
+ tracer trace.Tracer,
+ serverMetrics *ServerMetrics,
+) gin.HandlerFunc {
return func(c *gin.Context) {
- prometheus.RequestInc()
- defer prometheus.RequestDec()
+ serverMetrics.AddRequest()
requestId := c.GetHeader("X-Request-Id")
if requestId == "" {
@@ -34,7 +36,7 @@ func NewRequestLogMiddleware(logger log.Logger, tracer trace.Tracer, prometheus
c.Next()
latency := time.Since(start)
- prometheus.AddRequestTime(float64(latency.Microseconds()))
+ serverMetrics.AddRequestTime(float64(latency.Microseconds()))
method := c.Request.Method
statusCode := c.Writer.Status()
@@ -49,12 +51,12 @@ func NewRequestLogMiddleware(logger log.Logger, tracer trace.Tracer, prometheus
}
if statusCode >= 400 && statusCode < 500 {
- prometheus.Add4xxError()
+ serverMetrics.Add4xxError()
ctxLogger.Warning().Msg(msg)
return
}
- prometheus.Add5xxError()
+ serverMetrics.Add5xxError()
ctxLogger.Error().Msg(msg)
}
}
diff --git a/internal/integrations/prometheus.go b/internal/integrations/prometheus.go
index 9fecfba..31533c6 100644
--- a/internal/integrations/prometheus.go
+++ b/internal/integrations/prometheus.go
@@ -8,90 +8,67 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)
-type Prometheus struct {
- reg *prometheus.Registry
- rpsCounter prometheus.Counter
- avgReqTimeHist prometheus.Histogram
- panicsHist prometheus.Histogram
- errors4xxCounter prometheus.Counter
- errors5xxCounter prometheus.Counter
+type Counter interface {
+ Inc()
}
-func NewPrometheus() *Prometheus {
- reg := prometheus.NewRegistry()
+type Gauge interface {
+ Set(float64)
+ Inc()
+ Dec()
+}
- // Add go runtime metrics and process collectors.
- reg.MustRegister(
+type Histogram interface {
+ Observe(float64)
+}
+
+type Metrics struct {
+ registry *prometheus.Registry
+ registerer prometheus.Registerer
+}
+
+func NewMetrics(prefix string) *Metrics {
+ registry := prometheus.NewRegistry()
+ registerer := prometheus.WrapRegistererWithPrefix(prefix, registry)
+
+ registerer.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
- errors5xxCounter := prometheus.NewCounter(
- prometheus.CounterOpts{
- Name: "backend_errors_count_5xx",
- Help: "5xx errors count",
- },
- )
- errors4xxCounter := prometheus.NewCounter(
- prometheus.CounterOpts{
- Name: "backend_errors_count_4xx",
- Help: "4xx errors count",
- },
- )
- rpsCounter := prometheus.NewCounter(
- prometheus.CounterOpts{
- Name: "backend_requests_per_second",
- Help: "Requests per second metric",
- },
- )
- avgReqTimeHist := prometheus.NewHistogram(
- prometheus.HistogramOpts{
- Name: "backend_requests_average_time",
- Help: "Average time of requests",
- },
- )
- panicsHist := prometheus.NewHistogram(
- prometheus.HistogramOpts{
- Name: "backend_panics",
- Help: "Panics histogram metric",
- },
- )
- reg.MustRegister(rpsCounter, avgReqTimeHist, panicsHist, errors4xxCounter, errors5xxCounter)
-
- return &Prometheus{
- panicsHist: panicsHist,
- avgReqTimeHist: avgReqTimeHist,
- rpsCounter: rpsCounter,
- errors4xxCounter: errors4xxCounter,
- errors5xxCounter: errors5xxCounter,
- reg: reg,
+ return &Metrics{
+ registry: registry,
+ registerer: registerer,
}
}
-func (p *Prometheus) GetRequestHandler() http.Handler {
- return promhttp.HandlerFor(p.reg, promhttp.HandlerOpts{Registry: p.reg})
+func (m *Metrics) NewCounter(name, description string) Counter {
+ collector := prometheus.NewCounter(prometheus.CounterOpts{
+ Name: name,
+ Help: description,
+ })
+ m.registerer.MustRegister(collector)
+ return collector
}
-func (p *Prometheus) RequestInc() {
- p.rpsCounter.Inc()
+func (m *Metrics) NewGauge(name, description string) Gauge {
+ collector := prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: name,
+ Help: description,
+ })
+ m.registerer.MustRegister(collector)
+ return collector
}
-func (p *Prometheus) RequestDec() {
- // p.rpsGauge.Dec()
+func (m *Metrics) NewHistogram(name, description string) Histogram {
+ collector := prometheus.NewHistogram(prometheus.HistogramOpts{
+ Name: name,
+ Help: description,
+ })
+ m.registerer.MustRegister(collector)
+ return collector
}
-func (p *Prometheus) AddRequestTime(reqTime float64) {
- p.avgReqTimeHist.Observe(reqTime)
-}
-
-func (p *Prometheus) AddPanic() {
- p.panicsHist.Observe(1)
-}
-
-func (p *Prometheus) Add4xxError() {
- p.errors4xxCounter.Inc()
-}
-
-func (p *Prometheus) Add5xxError() {
- p.errors5xxCounter.Inc()
+func (m *Metrics) HttpHandler() http.Handler {
+ return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{Registry: m.registerer})
}
From 2e70087f63dd24155c2e0445754bfa5330d0f6f4 Mon Sep 17 00:00:00 2001
From: Sergey Chubaryan
Date: Sun, 23 Feb 2025 20:29:39 +0300
Subject: [PATCH 3/5] improvements for notifyer microservice
---
cmd/notifyer/emailer.go | 13 +++++++------
cmd/notifyer/event_handler.go | 3 ++-
internal/core/repos/event_repo.go | 2 +-
internal/http_server/{wrapper.go => gin_wrapper.go} | 0
internal/integrations/kafka.go | 2 +-
5 files changed, 11 insertions(+), 9 deletions(-)
rename internal/http_server/{wrapper.go => gin_wrapper.go} (100%)
diff --git a/cmd/notifyer/emailer.go b/cmd/notifyer/emailer.go
index 7c496c6..71cc508 100644
--- a/cmd/notifyer/emailer.go
+++ b/cmd/notifyer/emailer.go
@@ -14,7 +14,7 @@ const MSG_TEXT = `
{{.Text}}
{{if .Link}}
- Clicklink
+ link
{{end}}