Files
common/logger/buffering_exporter.go
Ask Bjørn Hansen 1df4b0d4b4 feat(tracing): add bearer token authentication for OTLP exporters
Add BearerTokenFunc to support dynamic bearer token authentication
for OTLP exporters. Tokens are injected per-request via gRPC
PerRPCCredentials and HTTP custom RoundTripper.

- Add BearerTokenFunc type and Config field in tracerconfig
- Implement bearerCredentials (gRPC) and bearerRoundTripper (HTTP)
- Wire bearer auth into all exporter creation functions
- Add getHTTPClient helper for DRY HTTP client configuration
- Upgrade OpenTelemetry SDK to v1.39.0 for WithHTTPClient support
2026-01-01 03:39:01 -08:00

273 lines
6.6 KiB
Go

package logger
import (
"context"
"errors"
"fmt"
"sync"
"time"
"go.ntppool.org/common/internal/tracerconfig"
otellog "go.opentelemetry.io/otel/sdk/log"
)
// bufferingExporter is an otellog.Exporter that keeps log batches in memory
// until tracing has been configured, then hands them (and everything that
// follows) to a real OTLP exporter built from the tracing config.
//
// mu guards exporter, buffer, bufferSize, bufferFlushed and initErr.
// stopChecker and checkerDone coordinate the checkReadiness goroutine; in
// this file they are only ever closed, never sent on.
type bufferingExporter struct {
	mu sync.RWMutex

	// exporter is the real exporter; nil until initialize creates it.
	exporter otellog.Exporter

	// buffer holds cloned batches received while waiting for the tracing
	// config. bufferSize counts those batches and is capped at maxBuffSize.
	buffer      [][]otellog.Record
	bufferSize  int
	maxBuffSize int

	// bufferFlushed is tracked separately from exporter creation because
	// the flush may be deferred after the exporter exists (canFlush).
	bufferFlushed bool

	// initErr is the result of the latest initialization attempt, written
	// by checkReadiness and Shutdown.
	initErr error

	// stopChecker is closed to stop checkReadiness; checkReadiness closes
	// checkerDone when it returns.
	stopChecker chan struct{}
	checkerDone chan struct{}
}
// newBufferingExporter returns a bufferingExporter that buffers up to 1000
// batches and launches the background goroutine that keeps trying to build
// the real exporter. Shutdown stops that goroutine.
func newBufferingExporter() *bufferingExporter {
	be := new(bufferingExporter)
	be.maxBuffSize = 1000 // limit is in batches, not individual records
	be.stopChecker = make(chan struct{})
	be.checkerDone = make(chan struct{})

	go be.checkReadiness()
	return be
}
// Export implements otellog.Exporter.
//
// A batch goes directly to the real exporter only when that exporter exists
// AND the backlog has been flushed. Until then the batch is buffered, so:
//   - newer records never overtake older buffered ones, and
//   - nothing is sent before canFlush has allowed it (for example while a
//     configured bearer token is not yet available).
//
// Buffering always reports success. As in bufferRecords, batches beyond
// maxBuffSize are dropped silently.
func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error {
	// Fast path for the steady state: a read lock is sufficient.
	e.mu.RLock()
	exporter := e.exporter
	ready := exporter != nil && e.bufferFlushed
	e.mu.RUnlock()
	if ready {
		return exporter.Export(ctx, records)
	}

	// Slow path: re-check and append under one write lock. If the check and
	// the append took separate locks, a flush could finish in between and
	// strand these records in a buffer that is never flushed again.
	e.mu.Lock()
	if e.exporter != nil && e.bufferFlushed {
		exporter = e.exporter
		e.mu.Unlock()
		return exporter.Export(ctx, records)
	}
	defer e.mu.Unlock()

	if e.bufferSize >= e.maxBuffSize {
		return nil // buffer full: drop this batch
	}
	cloned := make([]otellog.Record, len(records))
	for i := range records {
		// Clone so the buffered copy does not alias the caller's records.
		cloned[i] = records[i].Clone()
	}
	e.buffer = append(e.buffer, cloned)
	e.bufferSize++
	return nil
}
// initialize makes one attempt to bring the exporter fully online:
//  1. read the tracing config, failing if tracing is not configured yet;
//  2. create the real exporter via the configured factory (only once);
//  3. if canFlush allows it, send the buffered batches and mark the buffer
//     flushed.
//
// It returns nil once step 3 has run. A failed flush is logged, not
// returned, because initialization itself succeeded. It is called
// periodically by checkReadiness and once more by Shutdown.
func (e *bufferingExporter) initialize() error {
	cfg, ctx, factory := tracerconfig.Get()
	if cfg == nil || ctx == nil || factory == nil {
		return errors.New("tracer not configured yet")
	}

	// A single 10s budget covers exporter creation, the token check and the
	// flush. flushBuffer derives its own 30s timeout from this context, so
	// the flush effectively gets whatever remains of these 10s.
	initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	e.mu.RLock()
	hasExporter := e.exporter != nil
	e.mu.RUnlock()

	if !hasExporter {
		// Create outside the lock; the factory may perform I/O.
		exporter, err := factory(initCtx, cfg)
		if err != nil {
			return fmt.Errorf("failed to create OTLP exporter: %w", err)
		}
		e.mu.Lock()
		// Double-check: another caller may have installed one meanwhile.
		lost := e.exporter != nil
		if !lost {
			e.exporter = exporter
		}
		e.mu.Unlock()
		if lost {
			// Discard ours, without holding the lock across the shutdown.
			_ = exporter.Shutdown(context.Background())
		}
	}

	// Hold back the backlog until token verification (if configured) passes.
	if !e.canFlush(initCtx, cfg, false) {
		return errors.New("waiting for token authentication")
	}

	var flushErr error
	e.mu.Lock()
	if !e.bufferFlushed {
		// mu stays held for the whole flush, which also stalls other users
		// of the lock (including Export) until it completes.
		flushErr = e.flushBuffer(initCtx)
		// flushBuffer empties the buffer even when exports fail, so there is
		// nothing left to retry; mark it flushed, as Shutdown does.
		e.bufferFlushed = true
	}
	e.mu.Unlock()

	if flushErr != nil {
		// Best effort: log (after unlocking) but don't fail initialization.
		Setup().Warn("buffer flush failed during initialization", "error", flushErr)
	}
	return nil
}
// canFlush reports whether the buffered batches may be sent now.
//
// With no BearerTokenFunc configured there is nothing to wait for. With one,
// the buffer is released as soon as the func returns a nil error.
// forceFlush skips that check so that Shutdown can make a last
// best-effort attempt even with a cancelled context.
func (e *bufferingExporter) canFlush(ctx context.Context, cfg *tracerconfig.Config, forceFlush bool) bool {
	switch {
	case cfg.BearerTokenFunc == nil, forceFlush:
		return true
	default:
		// Only the error matters; the token itself is injected per request.
		_, tokenErr := cfg.BearerTokenFunc(ctx)
		return tokenErr == nil
	}
}
// bufferRecords stores a cloned copy of records for a later flush.
//
// At most maxBuffSize batches are kept; further batches are dropped. It
// always returns nil so the BatchProcessor treats the batch as handled.
func (e *bufferingExporter) bufferRecords(records []otellog.Record) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.bufferSize >= e.maxBuffSize {
		return nil // buffer full: drop this batch
	}

	// Clone each record so the buffer does not retain the caller's data.
	batch := make([]otellog.Record, 0, len(records))
	for i := range records {
		batch = append(batch, records[i].Clone())
	}
	e.buffer = append(e.buffer, batch)
	e.bufferSize++
	return nil
}
// checkReadiness is the background loop started by newBufferingExporter.
//
// Once per second it calls initialize and records the outcome in initErr.
// It exits when the exporter exists and the buffer has been flushed, or
// when stopChecker is closed. In both cases it closes checkerDone.
func (e *bufferingExporter) checkReadiness() {
	defer close(e.checkerDone)

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case <-e.stopChecker:
			return
		case <-tick.C:
		}

		e.mu.RLock()
		done := e.exporter != nil && e.bufferFlushed
		e.mu.RUnlock()
		if done {
			return // fully initialized; nothing left for this goroutine
		}

		// Creates the exporter if needed and flushes once canFlush allows.
		attemptErr := e.initialize()
		e.mu.Lock()
		e.initErr = attemptErr
		e.mu.Unlock()
	}
}
// flushBuffer exports every buffered batch through e.exporter, then empties
// the buffer whether or not the exports succeeded. Failed batches are not
// retried. The caller must hold e.mu for writing.
//
// The flush gets at most 30s, or less if ctx has an earlier deadline. Once
// that context is done, the remaining batches are not attempted, because
// each attempt could only fail with the same context error. A non-nil
// result reports how many batches were not exported and wraps the last
// failure.
func (e *bufferingExporter) flushBuffer(ctx context.Context) error {
	if len(e.buffer) == 0 {
		return nil
	}
	if e.exporter == nil {
		// Should not happen (callers create the exporter first). Keep the
		// buffer intact rather than dereferencing nil.
		return errors.New("flush buffer: no exporter")
	}
	// Clear the backlog on every return path below.
	defer func() {
		e.buffer = nil
		e.bufferSize = 0
	}()

	flushCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	var (
		failed  int
		lastErr error
	)
	for i, batch := range e.buffer {
		if err := flushCtx.Err(); err != nil {
			// Out of time: count every untried batch as failed.
			failed += len(e.buffer) - i
			lastErr = err
			break
		}
		if err := e.exporter.Export(flushCtx, batch); err != nil {
			failed++
			lastErr = err
		}
	}
	if lastErr != nil {
		return fmt.Errorf("%d of %d buffered log batches not exported: %w", failed, len(e.buffer), lastErr)
	}
	return nil
}
// ForceFlush implements otellog.Exporter.
//
// It forwards to the real exporter once that exists and otherwise returns
// nil. Buffered batches are not flushed here; initialize and Shutdown
// handle the buffer.
func (e *bufferingExporter) ForceFlush(ctx context.Context) error {
	e.mu.RLock()
	exporter := e.exporter
	e.mu.RUnlock()

	if exporter == nil {
		return nil
	}
	// Call without holding mu, as Export does. The flush may block on the
	// network, and a held read lock would stall any writer. Because a
	// waiting writer blocks new readers, that would stall every Export too.
	return exporter.ForceFlush(ctx)
}
// Shutdown implements otellog.Exporter.
//
// It stops the readiness checker and makes a final attempt to create the
// real exporter and flush the buffer. It then shuts down the real exporter,
// if one exists. If ctx is already done, the bearer-token check is skipped
// and the flush is still attempted (best effort).
//
// Calling Shutdown more than once is safe: later calls do not re-close the
// checker channel, which would panic.
func (e *bufferingExporter) Shutdown(ctx context.Context) error {
	// Stop the readiness checker exactly once.
	e.mu.Lock()
	select {
	case <-e.stopChecker:
		// Already closed by an earlier Shutdown call.
	default:
		close(e.stopChecker)
	}
	e.mu.Unlock()
	// Wait for the checker so that no initialize attempt runs concurrently.
	<-e.checkerDone

	cfg, _, _ := tracerconfig.Get()
	// A done context means best-effort: skip the token check below.
	forceFlush := ctx.Err() != nil

	e.mu.RLock()
	hasExporter := e.exporter != nil
	bufferFlushed := e.bufferFlushed
	e.mu.RUnlock()

	// Give tracing/TLS one last chance to become ready so the buffer can go out.
	if !hasExporter {
		err := e.initialize()
		e.mu.Lock()
		e.initErr = err
		hasExporter = e.exporter != nil
		bufferFlushed = e.bufferFlushed
		e.mu.Unlock()
	}

	// The exporter exists but the backlog is still pending: try to flush now.
	if hasExporter && !bufferFlushed && (cfg == nil || e.canFlush(ctx, cfg, forceFlush)) {
		var flushErr error
		e.mu.Lock()
		if !e.bufferFlushed {
			flushErr = e.flushBuffer(ctx)
			e.bufferFlushed = true
		}
		e.mu.Unlock()
		if flushErr != nil {
			// Logged after unlocking, as initialize does. The package logger
			// may presumably route back into this exporter.
			Setup().Warn("buffer flush failed during shutdown", "error", flushErr)
		}
	}

	e.mu.RLock()
	exporter := e.exporter
	e.mu.RUnlock()
	if exporter == nil {
		return nil
	}
	// Shut down without holding mu so that concurrent lock users don't stall.
	return exporter.Shutdown(ctx)
}