Add buffering exporter to queue OTLP logs until tracing is configured. Support TLS configuration for OpenTelemetry log export with client certificate authentication. Improve logfmt formatting and tracing setup.
199 lines
4.5 KiB
Go
199 lines
4.5 KiB
Go
package logger
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.ntppool.org/common/internal/tracerconfig"
|
|
otellog "go.opentelemetry.io/otel/sdk/log"
|
|
)
|
|
|
|
// bufferingExporter wraps an OTLP exporter and buffers logs until tracing is configured
|
|
type bufferingExporter struct {
|
|
mu sync.RWMutex
|
|
|
|
// Buffered records while waiting for tracing config
|
|
buffer [][]otellog.Record
|
|
bufferSize int
|
|
maxBuffSize int
|
|
|
|
// Real exporter (created when tracing is configured)
|
|
exporter otellog.Exporter
|
|
|
|
// Thread-safe initialization
|
|
initOnce sync.Once
|
|
initErr error
|
|
|
|
// Background checker
|
|
stopChecker chan struct{}
|
|
checkerDone chan struct{}
|
|
}
|
|
|
|
// newBufferingExporter creates a new exporter that buffers logs until tracing is configured
|
|
func newBufferingExporter() *bufferingExporter {
|
|
e := &bufferingExporter{
|
|
maxBuffSize: 1000, // Max number of batches to buffer
|
|
stopChecker: make(chan struct{}),
|
|
checkerDone: make(chan struct{}),
|
|
}
|
|
|
|
// Start background readiness checker
|
|
go e.checkReadiness()
|
|
|
|
return e
|
|
}
|
|
|
|
// Export implements otellog.Exporter
|
|
func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error {
|
|
// Try initialization once
|
|
e.initOnce.Do(func() {
|
|
e.initErr = e.initialize()
|
|
})
|
|
|
|
// If initialization succeeded, use the exporter
|
|
if e.initErr == nil {
|
|
e.mu.RLock()
|
|
exporter := e.exporter
|
|
e.mu.RUnlock()
|
|
|
|
if exporter != nil {
|
|
return exporter.Export(ctx, records)
|
|
}
|
|
}
|
|
|
|
// Not ready yet, buffer the records
|
|
return e.bufferRecords(records)
|
|
}
|
|
|
|
// initialize attempts to create the real OTLP exporter using tracing config
|
|
func (e *bufferingExporter) initialize() error {
|
|
cfg, ctx, factory := tracerconfig.Get()
|
|
if cfg == nil || ctx == nil || factory == nil {
|
|
return errors.New("tracer not configured yet")
|
|
}
|
|
|
|
// Add timeout for initialization
|
|
initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
exporter, err := factory(initCtx, cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create OTLP exporter: %w", err)
|
|
}
|
|
|
|
e.mu.Lock()
|
|
e.exporter = exporter
|
|
flushErr := e.flushBuffer(initCtx)
|
|
e.mu.Unlock()
|
|
|
|
if flushErr != nil {
|
|
// Log but don't fail initialization
|
|
Setup().Warn("buffer flush failed during initialization", "error", flushErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// bufferRecords adds records to the buffer for later processing
|
|
func (e *bufferingExporter) bufferRecords(records []otellog.Record) error {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
// Buffer the batch if we have space
|
|
if e.bufferSize < e.maxBuffSize {
|
|
// Clone records to avoid retention issues
|
|
cloned := make([]otellog.Record, len(records))
|
|
for i, r := range records {
|
|
cloned[i] = r.Clone()
|
|
}
|
|
e.buffer = append(e.buffer, cloned)
|
|
e.bufferSize++
|
|
}
|
|
|
|
// Always return success to BatchProcessor
|
|
return nil
|
|
}
|
|
|
|
// checkReadiness periodically checks if tracing is configured
|
|
func (e *bufferingExporter) checkReadiness() {
|
|
defer close(e.checkerDone)
|
|
|
|
ticker := time.NewTicker(1 * time.Second) // Reduced frequency since OTLP handles retries
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// If initialization failed, reset sync.Once to allow retry
|
|
// The OTLP exporter will handle its own retry logic
|
|
if e.initErr != nil {
|
|
e.initOnce = sync.Once{}
|
|
} else if e.exporter != nil {
|
|
return // Exporter ready, checker no longer needed
|
|
}
|
|
|
|
case <-e.stopChecker:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// flushBuffer sends all buffered batches through the real exporter
|
|
func (e *bufferingExporter) flushBuffer(ctx context.Context) error {
|
|
if len(e.buffer) == 0 {
|
|
return nil
|
|
}
|
|
|
|
flushCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
var lastErr error
|
|
for _, batch := range e.buffer {
|
|
if err := e.exporter.Export(flushCtx, batch); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
|
|
// Clear buffer after flush attempt
|
|
e.buffer = nil
|
|
e.bufferSize = 0
|
|
|
|
return lastErr
|
|
}
|
|
|
|
// ForceFlush implements otellog.Exporter
|
|
func (e *bufferingExporter) ForceFlush(ctx context.Context) error {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
|
|
if e.exporter != nil {
|
|
return e.exporter.ForceFlush(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Shutdown implements otellog.Exporter
|
|
func (e *bufferingExporter) Shutdown(ctx context.Context) error {
|
|
// Stop the readiness checker from continuing
|
|
close(e.stopChecker)
|
|
|
|
// Give one final chance for TLS/tracing to become ready before fully shutting down
|
|
e.initOnce.Do(func() {
|
|
e.initErr = e.initialize()
|
|
})
|
|
|
|
// Wait for readiness checker goroutine to complete
|
|
<-e.checkerDone
|
|
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
if e.exporter != nil {
|
|
return e.exporter.Shutdown(ctx)
|
|
}
|
|
return nil
|
|
}
|