diff --git a/src/logger/bufio_wrapper.go b/src/logger/bufio_wrapper.go index a87f63d..feca55d 100644 --- a/src/logger/bufio_wrapper.go +++ b/src/logger/bufio_wrapper.go @@ -2,32 +2,68 @@ package logger import ( "bufio" + "context" + "io" "sync" + "time" ) +const FlushInterval = 500 * time.Millisecond + type bufioWrapper struct { - *bufio.Writer - m *sync.RWMutex + writer *bufio.Writer + ticker *time.Ticker + mutex *sync.RWMutex +} + +func newWrapper(writer io.Writer) *bufioWrapper { + ticker := time.NewTicker(FlushInterval) + ticker.Stop() + + return &bufioWrapper{ + writer: bufio.NewWriterSize(writer, 128*1024), + mutex: &sync.RWMutex{}, + ticker: ticker, + } +} + +func (b *bufioWrapper) FlushRoutine(ctx context.Context) { + go func() { + b.ticker.Reset(FlushInterval) + defer b.ticker.Stop() + + for { + b.flush() + + select { + case <-ctx.Done(): + b.flush() + return + case <-b.ticker.C: + } + } + }() } func (b *bufioWrapper) Write(p []byte) (nn int, err error) { // TODO: try replace mutex, improve logging perfomance - b.m.RLock() - defer b.m.RUnlock() + b.mutex.RLock() + defer b.mutex.RUnlock() - return b.Writer.Write(p) + if len(p) > b.writer.Available() { + b.ticker.Reset(FlushInterval) + } + + return b.writer.Write(p) } -func (b *bufioWrapper) Flush() error { - b.m.Lock() - defer b.m.Unlock() +func (b *bufioWrapper) flush() error { + b.mutex.Lock() + defer b.mutex.Unlock() - return b.Writer.Flush() + return b.writer.Flush() } func (b *bufioWrapper) Close() error { - b.m.Lock() - defer b.m.Unlock() - - return b.Writer.Flush() + return b.flush() } diff --git a/src/logger/new.go b/src/logger/new.go index 2840358..3fddd3f 100644 --- a/src/logger/new.go +++ b/src/logger/new.go @@ -1,12 +1,9 @@ package logger import ( - "bufio" "context" "io" "os" - "sync" - "time" "github.com/rs/zerolog" ) @@ -25,7 +22,7 @@ type NewLoggerOpts struct { // OutputStream OutputStream } -func New(opts NewLoggerOpts) (Logger, error) { +func New(ctx context.Context, opts NewLoggerOpts) (Logger, error) { // TODO: pass output streams from opts writers := []io.Writer{} writers = append(writers, os.Stderr) @@ -45,26 +42,9 @@ func New(opts NewLoggerOpts) (Logger, error) { } // TODO: move to wrapper, determine optimal buffer size - writer := bufio.NewWriterSize(io.MultiWriter(writers...), 8*1024) - wrapper := &bufioWrapper{writer, &sync.RWMutex{}} - - // Periodically flush buffer - go func() { - // TODO: add cooldown if flush was triggered by overfow - tmr := time.NewTicker(500 * time.Millisecond) - defer tmr.Stop() - - for { - wrapper.Flush() - - select { - case <-context.Background().Done(): - wrapper.Flush() - return - case <-tmr.C: - } - } - }() + writer := io.MultiWriter(writers...) + wrapper := newWrapper(writer) + wrapper.FlushRoutine(ctx) l := zerolog.New(wrapper).Level(level).With().Timestamp().Logger() return &logger{