Add BearerTokenFunc to support dynamic bearer token authentication for OTLP gRPC exporters. Tokens are injected via gRPC PerRPCCredentials on each export request. - Add BearerTokenFunc type and Config field in tracerconfig - Implement bearerCredentials (gRPC) and bearerRoundTripper (HTTP) - Wire bearer auth into all three gRPC exporter creation functions - Add token verification before flushing buffered logs - Fix race condition in buffering exporter initialization Note: HTTP exporters don't support dynamic bearer tokens due to OpenTelemetry SDK limitations (no WithHTTPClient option). Use gRPC protocol for dynamic tokens.
273 lines
6.6 KiB
Go
273 lines
6.6 KiB
Go
package logger
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.ntppool.org/common/internal/tracerconfig"
|
|
otellog "go.opentelemetry.io/otel/sdk/log"
|
|
)
|
|
|
|
// bufferingExporter wraps an OTLP exporter and buffers logs until tracing is configured
|
|
type bufferingExporter struct {
|
|
mu sync.RWMutex
|
|
|
|
// Buffered records while waiting for tracing config
|
|
buffer [][]otellog.Record
|
|
bufferSize int
|
|
maxBuffSize int
|
|
|
|
// Real exporter (created when tracing is configured)
|
|
exporter otellog.Exporter
|
|
|
|
// Track whether buffer has been flushed (separate from exporter creation)
|
|
bufferFlushed bool
|
|
|
|
// Thread-safe initialization state (managed only by checkReadiness)
|
|
initErr error
|
|
|
|
// Background checker
|
|
stopChecker chan struct{}
|
|
checkerDone chan struct{}
|
|
}
|
|
|
|
// newBufferingExporter creates a new exporter that buffers logs until tracing is configured
|
|
func newBufferingExporter() *bufferingExporter {
|
|
e := &bufferingExporter{
|
|
maxBuffSize: 1000, // Max number of batches to buffer
|
|
stopChecker: make(chan struct{}),
|
|
checkerDone: make(chan struct{}),
|
|
}
|
|
|
|
// Start background readiness checker
|
|
go e.checkReadiness()
|
|
|
|
return e
|
|
}
|
|
|
|
// Export implements otellog.Exporter
|
|
func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error {
|
|
// Check if exporter is ready (initialization handled by checkReadiness goroutine)
|
|
e.mu.RLock()
|
|
exporter := e.exporter
|
|
e.mu.RUnlock()
|
|
|
|
if exporter != nil {
|
|
return exporter.Export(ctx, records)
|
|
}
|
|
|
|
// Not ready yet, buffer the records
|
|
return e.bufferRecords(records)
|
|
}
|
|
|
|
// initialize attempts to create the real OTLP exporter using tracing config
|
|
func (e *bufferingExporter) initialize() error {
|
|
cfg, ctx, factory := tracerconfig.Get()
|
|
if cfg == nil || ctx == nil || factory == nil {
|
|
return errors.New("tracer not configured yet")
|
|
}
|
|
|
|
// Add timeout for initialization
|
|
initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
e.mu.RLock()
|
|
hasExporter := e.exporter != nil
|
|
e.mu.RUnlock()
|
|
|
|
// Create exporter if not already created
|
|
if !hasExporter {
|
|
exporter, err := factory(initCtx, cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create OTLP exporter: %w", err)
|
|
}
|
|
e.mu.Lock()
|
|
// Double-check: another goroutine may have created it while we were waiting
|
|
if e.exporter == nil {
|
|
e.exporter = exporter
|
|
} else {
|
|
// Another goroutine beat us, close the one we created
|
|
_ = exporter.Shutdown(context.Background())
|
|
}
|
|
e.mu.Unlock()
|
|
}
|
|
|
|
// Check if we can flush (token verification if configured)
|
|
if !e.canFlush(initCtx, cfg, false) {
|
|
return errors.New("waiting for token authentication")
|
|
}
|
|
|
|
e.mu.Lock()
|
|
if !e.bufferFlushed {
|
|
flushErr := e.flushBuffer(initCtx)
|
|
if flushErr != nil {
|
|
e.mu.Unlock()
|
|
// Log but don't fail initialization
|
|
Setup().Warn("buffer flush failed during initialization", "error", flushErr)
|
|
return nil
|
|
}
|
|
e.bufferFlushed = true
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// canFlush checks if we're ready to flush buffered logs.
|
|
// If BearerTokenFunc is configured, it must return without error.
|
|
// If forceFlush is true (during shutdown with cancelled context), skip token check.
|
|
func (e *bufferingExporter) canFlush(ctx context.Context, cfg *tracerconfig.Config, forceFlush bool) bool {
|
|
if cfg.BearerTokenFunc == nil {
|
|
return true // No token auth configured, can flush immediately
|
|
}
|
|
|
|
if forceFlush {
|
|
return true // During shutdown, proceed with best-effort flush
|
|
}
|
|
|
|
// Check if token is available (call returns without error)
|
|
_, err := cfg.BearerTokenFunc(ctx)
|
|
return err == nil
|
|
}
|
|
|
|
// bufferRecords adds records to the buffer for later processing
|
|
func (e *bufferingExporter) bufferRecords(records []otellog.Record) error {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
// Buffer the batch if we have space
|
|
if e.bufferSize < e.maxBuffSize {
|
|
// Clone records to avoid retention issues
|
|
cloned := make([]otellog.Record, len(records))
|
|
for i, r := range records {
|
|
cloned[i] = r.Clone()
|
|
}
|
|
e.buffer = append(e.buffer, cloned)
|
|
e.bufferSize++
|
|
}
|
|
|
|
// Always return success to BatchProcessor
|
|
return nil
|
|
}
|
|
|
|
// checkReadiness periodically attempts initialization until successful
|
|
func (e *bufferingExporter) checkReadiness() {
|
|
defer close(e.checkerDone)
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// Check if we're fully ready (exporter created AND buffer flushed)
|
|
e.mu.RLock()
|
|
fullyReady := e.exporter != nil && e.bufferFlushed
|
|
e.mu.RUnlock()
|
|
|
|
if fullyReady {
|
|
return // Fully initialized, checker no longer needed
|
|
}
|
|
|
|
// Try to initialize (creates exporter and flushes if token ready)
|
|
err := e.initialize()
|
|
e.mu.Lock()
|
|
e.initErr = err
|
|
e.mu.Unlock()
|
|
|
|
case <-e.stopChecker:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// flushBuffer sends all buffered batches through the real exporter
|
|
func (e *bufferingExporter) flushBuffer(ctx context.Context) error {
|
|
if len(e.buffer) == 0 {
|
|
return nil
|
|
}
|
|
|
|
flushCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
var lastErr error
|
|
for _, batch := range e.buffer {
|
|
if err := e.exporter.Export(flushCtx, batch); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
|
|
// Clear buffer after flush attempt
|
|
e.buffer = nil
|
|
e.bufferSize = 0
|
|
|
|
return lastErr
|
|
}
|
|
|
|
// ForceFlush implements otellog.Exporter
|
|
func (e *bufferingExporter) ForceFlush(ctx context.Context) error {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
|
|
if e.exporter != nil {
|
|
return e.exporter.ForceFlush(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Shutdown implements otellog.Exporter
|
|
func (e *bufferingExporter) Shutdown(ctx context.Context) error {
|
|
// Stop the readiness checker from continuing
|
|
close(e.stopChecker)
|
|
|
|
// Wait for readiness checker goroutine to complete
|
|
<-e.checkerDone
|
|
|
|
cfg, _, _ := tracerconfig.Get()
|
|
|
|
// Check if context is cancelled for best-effort flush
|
|
forceFlush := ctx.Err() != nil
|
|
|
|
// Give one final chance for TLS/tracing to become ready for buffer flushing
|
|
e.mu.RLock()
|
|
hasExporter := e.exporter != nil
|
|
bufferFlushed := e.bufferFlushed
|
|
e.mu.RUnlock()
|
|
|
|
if !hasExporter {
|
|
err := e.initialize()
|
|
e.mu.Lock()
|
|
e.initErr = err
|
|
hasExporter = e.exporter != nil
|
|
bufferFlushed = e.bufferFlushed
|
|
e.mu.Unlock()
|
|
}
|
|
|
|
// If exporter exists but buffer not flushed, try to flush now
|
|
if hasExporter && !bufferFlushed {
|
|
canFlushNow := cfg == nil || e.canFlush(ctx, cfg, forceFlush)
|
|
if canFlushNow {
|
|
e.mu.Lock()
|
|
if !e.bufferFlushed {
|
|
flushErr := e.flushBuffer(ctx)
|
|
if flushErr != nil {
|
|
Setup().Warn("buffer flush failed during shutdown", "error", flushErr)
|
|
}
|
|
e.bufferFlushed = true
|
|
}
|
|
e.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
if e.exporter != nil {
|
|
return e.exporter.Shutdown(ctx)
|
|
}
|
|
return nil
|
|
}
|