common/logger/buffering_exporter.go
Ask Bjørn Hansen a774f92bf7 fix(logger): prevent mutex crash in bufferingExporter
Remove sync.Once reset that caused "unlock of unlocked mutex" panic.
Redesign initialization to use only checkReadiness goroutine for
retry attempts, eliminating race condition while preserving retry
functionality for TLS/tracing setup delays.
2025-08-02 22:55:57 -07:00

205 lines
4.5 KiB
Go

package logger
import (
"context"
"errors"
"fmt"
"sync"
"time"
"go.ntppool.org/common/internal/tracerconfig"
otellog "go.opentelemetry.io/otel/sdk/log"
)
// bufferingExporter is an otellog.Exporter that keeps log batches in memory
// until the real exporter can be built from the tracing configuration, and
// hands batches straight to that exporter afterwards.
type bufferingExporter struct {
	// mu guards exporter, initErr, buffer and bufferSize.
	mu sync.RWMutex

	// exporter is the real exporter; it stays nil until initialize succeeds.
	exporter otellog.Exporter
	// initErr is the result of the most recent initialize attempt, stored by
	// checkReadiness and by Shutdown.
	initErr error

	// buffer holds cloned batches received while exporter is still nil.
	buffer [][]otellog.Record
	// bufferSize is the number of batches currently held in buffer.
	bufferSize int
	// maxBuffSize caps bufferSize; batches arriving beyond it are dropped.
	maxBuffSize int

	// stopChecker is closed by Shutdown to stop the checkReadiness goroutine.
	stopChecker chan struct{}
	// checkerDone is closed by checkReadiness when it returns.
	checkerDone chan struct{}
}
// newBufferingExporter returns a bufferingExporter with room for 1000 batches
// and launches its checkReadiness goroutine, which runs until the real
// exporter is available or stopChecker is closed.
func newBufferingExporter() *bufferingExporter {
	// Upper bound on the number of batches held while waiting for tracing.
	const maxBatches = 1000

	exp := new(bufferingExporter)
	exp.maxBuffSize = maxBatches
	exp.stopChecker = make(chan struct{})
	exp.checkerDone = make(chan struct{})

	// Initialization is attempted only from this background goroutine
	// (and once more from Shutdown), never from Export.
	go exp.checkReadiness()

	return exp
}
// Export implements otellog.Exporter.
//
// When the real exporter is available the batch is forwarded to it and its
// error is returned. Otherwise the batch is stored via bufferRecords.
//
// initialize may install the exporter and flush the buffer between the
// readiness check below and the call to bufferRecords. A batch buffered in
// that window would never be delivered, because checkReadiness stops once the
// exporter exists. To close that window, Export re-checks under the write
// lock after buffering and, if the exporter has appeared, drains whatever is
// in the buffer itself. In that case the last export error (if any) is
// returned; in the plain buffering case the result is nil.
func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error {
	e.mu.RLock()
	exporter := e.exporter
	e.mu.RUnlock()

	if exporter != nil {
		return exporter.Export(ctx, records)
	}

	// Not ready yet, buffer the records.
	if err := e.bufferRecords(records); err != nil {
		return err
	}

	// Re-check: take ownership of the buffer only if the exporter is now set,
	// so the normal "still waiting" path leaves the buffer for initialize.
	var stranded [][]otellog.Record
	e.mu.Lock()
	exporter = e.exporter
	if exporter != nil {
		stranded = e.buffer
		e.buffer = nil
		e.bufferSize = 0
	}
	e.mu.Unlock()

	// Export outside the lock so other callers are not blocked on network I/O.
	var lastErr error
	for _, batch := range stranded {
		if err := exporter.Export(ctx, batch); err != nil {
			lastErr = err
		}
	}
	return lastErr
}
// initialize tries to build the real exporter from the values returned by
// tracerconfig.Get and, on success, installs it and flushes the buffer.
//
// It returns an error when the tracer configuration is not available yet or
// when the factory fails. A failed buffer flush is logged as a warning and
// does not fail initialization.
func (e *bufferingExporter) initialize() error {
	cfg, ctx, factory := tracerconfig.Get()
	if cfg == nil || ctx == nil || factory == nil {
		return errors.New("tracer not configured yet")
	}

	// Bound exporter creation to 10 seconds.
	initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	exporter, err := factory(initCtx, cfg)
	if err != nil {
		return fmt.Errorf("failed to create OTLP exporter: %w", err)
	}

	// Install the exporter and flush while holding the write lock, so
	// concurrent Export calls wait until the buffered batches have been sent.
	//
	// The flush is given the base context rather than initCtx: a context
	// derived from initCtx can never outlive its 10-second deadline (part of
	// which the factory call has already used), which would make flushBuffer's
	// own timeout unreachable.
	e.mu.Lock()
	e.exporter = exporter
	flushErr := e.flushBuffer(ctx)
	e.mu.Unlock()

	if flushErr != nil {
		// Log but don't fail initialization
		Setup().Warn("buffer flush failed during initialization", "error", flushErr)
	}
	return nil
}
// bufferRecords stores a cloned copy of records as one batch in the buffer.
// When the buffer already holds maxBuffSize batches the new batch is dropped.
// The result is always nil, so the caller never sees a failure.
func (e *bufferingExporter) bufferRecords(records []otellog.Record) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	// No room left: drop the batch but still report success.
	if e.bufferSize >= e.maxBuffSize {
		return nil
	}

	// Clone each record so the buffer does not retain the caller's records.
	batch := make([]otellog.Record, 0, len(records))
	for i := range records {
		batch = append(batch, records[i].Clone())
	}

	e.buffer = append(e.buffer, batch)
	e.bufferSize++
	return nil
}
// checkReadiness runs in its own goroutine and calls initialize once per
// second until the exporter exists or stopChecker is closed. It closes
// checkerDone on return and records each attempt's result in initErr.
func (e *bufferingExporter) checkReadiness() {
	defer close(e.checkerDone)

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		// Wait for the next tick, or exit when asked to stop.
		select {
		case <-e.stopChecker:
			return
		case <-tick.C:
		}

		// An exporter installed by an earlier attempt means the work is done.
		e.mu.RLock()
		ready := e.exporter != nil
		e.mu.RUnlock()
		if ready {
			return
		}

		// Attempt initialization and remember how it went.
		attemptErr := e.initialize()

		e.mu.Lock()
		e.initErr = attemptErr
		e.mu.Unlock()
	}
}
// flushBuffer exports every buffered batch through e.exporter under a
// 30-second timeout derived from ctx, then empties the buffer whether or not
// the exports succeeded. It returns the error from the last failing batch, or
// nil when nothing failed or the buffer was empty.
//
// It does no locking itself; initialize calls it with e.mu held.
func (e *bufferingExporter) flushBuffer(ctx context.Context) error {
	pending := e.buffer
	if len(pending) == 0 {
		return nil
	}

	flushCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	var lastErr error
	for i := range pending {
		// Keep going after a failure so the remaining batches are still tried.
		err := e.exporter.Export(flushCtx, pending[i])
		if err != nil {
			lastErr = err
		}
	}

	// Batches are discarded after a single attempt, failed or not.
	e.buffer, e.bufferSize = nil, 0
	return lastErr
}
// ForceFlush implements otellog.Exporter. It delegates to the real exporter
// when one exists and returns nil otherwise; buffered batches are not touched.
func (e *bufferingExporter) ForceFlush(ctx context.Context) error {
	// The read lock is held for the whole delegated call.
	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.exporter == nil {
		return nil
	}
	return e.exporter.ForceFlush(ctx)
}
// Shutdown implements otellog.Exporter.
//
// It stops the checkReadiness goroutine and waits for it to finish. If no
// exporter exists yet, it makes one last initialize attempt so buffered
// batches get a chance to be flushed. It then shuts down the real exporter if
// there is one, returning that exporter's Shutdown error, or nil when no
// exporter exists.
//
// Shutdown is safe to call more than once: only the first call does any work,
// and later calls return nil instead of panicking on a second close of
// stopChecker.
func (e *bufferingExporter) Shutdown(ctx context.Context) error {
	// Close stopChecker at most once. The check-and-close is done under the
	// mutex so concurrent Shutdown calls cannot both reach close.
	e.mu.Lock()
	select {
	case <-e.stopChecker:
		e.mu.Unlock()
		return nil
	default:
		close(e.stopChecker)
	}
	e.mu.Unlock()

	// Wait for the readiness checker goroutine to complete, so the initialize
	// call below cannot run concurrently with one made by the checker.
	<-e.checkerDone

	// Give one final chance for TLS/tracing to become ready for buffer flushing.
	e.mu.RLock()
	hasExporter := e.exporter != nil
	e.mu.RUnlock()

	if !hasExporter {
		err := e.initialize()
		e.mu.Lock()
		e.initErr = err
		e.mu.Unlock()
	}

	e.mu.Lock()
	defer e.mu.Unlock()

	if e.exporter != nil {
		return e.exporter.Shutdown(ctx)
	}
	return nil
}