diff --git a/internal/tracerconfig/config.go b/internal/tracerconfig/config.go new file mode 100644 index 0000000..16fd5a0 --- /dev/null +++ b/internal/tracerconfig/config.go @@ -0,0 +1,76 @@ +// Package tracerconfig provides a bridge to eliminate circular dependencies between +// the logger and tracing packages. It stores tracer configuration and provides +// factory functions that can be used by the logger package without importing tracing. +package tracerconfig + +import ( + "context" + "crypto/tls" + "crypto/x509" + "sync" + + sdklog "go.opentelemetry.io/otel/sdk/log" +) + +// GetClientCertificate defines a function type for providing client certificates for mutual TLS. +// This is used when exporting telemetry data to secured OTLP endpoints that require +// client certificate authentication. +type GetClientCertificate func(*tls.CertificateRequestInfo) (*tls.Certificate, error) + +// Config provides configuration options for OpenTelemetry tracing setup. +// It supplements standard OpenTelemetry environment variables with additional +// NTP Pool-specific configuration including TLS settings for secure OTLP export. +type Config struct { + ServiceName string // Service name for resource identification (overrides OTEL_SERVICE_NAME) + Environment string // Deployment environment (development, staging, production) + Endpoint string // OTLP endpoint hostname/port (e.g., "otlp.example.com:4317") + EndpointURL string // Complete OTLP endpoint URL (e.g., "https://otlp.example.com:4317/v1/traces") + CertificateProvider GetClientCertificate // Client certificate provider for mutual TLS + RootCAs *x509.CertPool // CA certificate pool for server verification +} + +// ExporterFactory creates an OTLP log exporter using the provided configuration. +// This allows the logger package to create exporters without importing the tracing package. +type ExporterFactory func(context.Context, *Config) (sdklog.Exporter, error) + +// Global state for sharing configuration between packages +var ( + globalConfig *Config + globalContext context.Context + exporterFactory ExporterFactory + configMu sync.RWMutex +) + +// Store saves the tracer configuration and exporter factory for use by other packages. +// This should be called by the tracing package during initialization. +func Store(ctx context.Context, cfg *Config, factory ExporterFactory) { + configMu.Lock() + defer configMu.Unlock() + globalConfig = cfg + globalContext = ctx + exporterFactory = factory +} + +// Get returns the stored tracer configuration, context, and exporter factory. +// Returns nil values if no configuration has been stored yet. +func Get() (*Config, context.Context, ExporterFactory) { + configMu.RLock() + defer configMu.RUnlock() + return globalConfig, globalContext, exporterFactory +} + +// IsConfigured returns true if tracer configuration has been stored. +func IsConfigured() bool { + configMu.RLock() + defer configMu.RUnlock() + return globalConfig != nil && globalContext != nil && exporterFactory != nil +} + +// Clear removes the stored configuration. This is primarily useful for testing. +func Clear() { + configMu.Lock() + defer configMu.Unlock() + globalConfig = nil + globalContext = nil + exporterFactory = nil +} diff --git a/logger/buffering_exporter.go b/logger/buffering_exporter.go new file mode 100644 index 0000000..6d7bdb1 --- /dev/null +++ b/logger/buffering_exporter.go @@ -0,0 +1,198 @@ +package logger + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "go.ntppool.org/common/internal/tracerconfig" + otellog "go.opentelemetry.io/otel/sdk/log" +) + +// bufferingExporter wraps an OTLP exporter and buffers logs until tracing is configured +type bufferingExporter struct { + mu sync.RWMutex + + // Buffered records while waiting for tracing config + buffer [][]otellog.Record + bufferSize int + maxBuffSize int + + // Real exporter (created when tracing is configured) + exporter otellog.Exporter + + // Thread-safe initialization + initOnce sync.Once + initErr error + + // Background checker + stopChecker chan struct{} + checkerDone chan struct{} +} + +// newBufferingExporter creates a new exporter that buffers logs until tracing is configured +func newBufferingExporter() *bufferingExporter { + e := &bufferingExporter{ + maxBuffSize: 1000, // Max number of batches to buffer + stopChecker: make(chan struct{}), + checkerDone: make(chan struct{}), + } + + // Start background readiness checker + go e.checkReadiness() + + return e +} + +// Export implements otellog.Exporter +func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error { + // Try initialization once + e.initOnce.Do(func() { + e.initErr = e.initialize() + }) + + // If initialization succeeded, use the exporter + if e.initErr == nil { + e.mu.RLock() + exporter := e.exporter + e.mu.RUnlock() + + if exporter != nil { + return exporter.Export(ctx, records) + } + } + + // Not ready yet, buffer the records + return e.bufferRecords(records) +} + +// initialize attempts to create the real OTLP exporter using tracing config +func (e *bufferingExporter) initialize() error { + cfg, ctx, factory := tracerconfig.Get() + if cfg == nil || ctx == nil || factory == nil { + return errors.New("tracer not configured yet") + } + + // Add timeout for initialization + initCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + exporter, err := factory(initCtx, cfg) + if err != nil { + return fmt.Errorf("failed to create OTLP exporter: %w", err) + } + + e.mu.Lock() + e.exporter = exporter + flushErr := e.flushBuffer(initCtx) + e.mu.Unlock() + + if flushErr != nil { + // Log but don't fail initialization + Setup().Warn("buffer flush failed during initialization", "error", flushErr) + } + + return nil +} + +// bufferRecords adds records to the buffer for later processing +func (e *bufferingExporter) bufferRecords(records []otellog.Record) error { + e.mu.Lock() + defer e.mu.Unlock() + + // Buffer the batch if we have space + if e.bufferSize < e.maxBuffSize { + // Clone records to avoid retention issues + cloned := make([]otellog.Record, len(records)) + for i, r := range records { + cloned[i] = r.Clone() + } + e.buffer = append(e.buffer, cloned) + e.bufferSize++ + } + + // Always return success to BatchProcessor + return nil +} + +// checkReadiness periodically checks if tracing is configured +func (e *bufferingExporter) checkReadiness() { + defer close(e.checkerDone) + + ticker := time.NewTicker(1 * time.Second) // Reduced frequency since OTLP handles retries + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // If initialization failed, reset sync.Once to allow retry + // The OTLP exporter will handle its own retry logic + if e.initErr != nil { + e.initOnce = sync.Once{} + } else if e.exporter != nil { + return // Exporter ready, checker no longer needed + } + + case <-e.stopChecker: + return + } + } +} + +// flushBuffer sends all buffered batches through the real exporter +func (e *bufferingExporter) flushBuffer(ctx context.Context) error { + if len(e.buffer) == 0 { + return nil + } + + flushCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + var lastErr error + for _, batch := range e.buffer { + if err := e.exporter.Export(flushCtx, batch); err != nil { + lastErr = err + } + } + + // Clear buffer after flush attempt + e.buffer = nil + e.bufferSize = 0 + + return lastErr +} + +// ForceFlush implements otellog.Exporter +func (e *bufferingExporter) ForceFlush(ctx context.Context) error { + e.mu.RLock() + defer e.mu.RUnlock() + + if e.exporter != nil { + return e.exporter.ForceFlush(ctx) + } + return nil +} + +// Shutdown implements otellog.Exporter +func (e *bufferingExporter) Shutdown(ctx context.Context) error { + // Stop the readiness checker from continuing + close(e.stopChecker) + + // Give one final chance for TLS/tracing to become ready before fully shutting down + e.initOnce.Do(func() { + e.initErr = e.initialize() + }) + + // Wait for readiness checker goroutine to complete + <-e.checkerDone + + e.mu.Lock() + defer e.mu.Unlock() + + if e.exporter != nil { + return e.exporter.Shutdown(ctx) + } + return nil +} diff --git a/logger/logfmt.go b/logger/logfmt.go index 9a81b51..67e54f3 100644 --- a/logger/logfmt.go +++ b/logger/logfmt.go @@ -16,23 +16,28 @@ type logfmt struct { mu sync.Mutex } +// createTextHandlerOptions creates the common slog.HandlerOptions used by all logfmt handlers +func createTextHandlerOptions() *slog.HandlerOptions { + return &slog.HandlerOptions{ + ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { + if a.Key == slog.TimeKey && len(groups) == 0 { + return slog.Attr{} + } + if a.Key == slog.LevelKey && len(groups) == 0 { + return slog.Attr{} + } + return a + }, + } +} + func newLogFmtHandler(next slog.Handler) slog.Handler { buf := bytes.NewBuffer([]byte{}) h := &logfmt{ buf: buf, next: next, - txt: slog.NewTextHandler(buf, &slog.HandlerOptions{ - ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { - if a.Key == slog.TimeKey && len(groups) == 0 { - return slog.Attr{} - } - if a.Key == slog.LevelKey && len(groups) == 0 { - return slog.Attr{} - } - return a - }, - }), + txt: slog.NewTextHandler(buf, createTextHandlerOptions()), } return h @@ -43,10 +48,11 @@ func (h *logfmt) Enabled(ctx context.Context, lvl slog.Level) bool { } func (h *logfmt) WithAttrs(attrs []slog.Attr) slog.Handler { + buf := bytes.NewBuffer([]byte{}) return &logfmt{ - buf: bytes.NewBuffer([]byte{}), + buf: buf, next: h.next.WithAttrs(slices.Clone(attrs)), - txt: h.txt.WithAttrs(slices.Clone(attrs)), + txt: slog.NewTextHandler(buf, createTextHandlerOptions()).WithAttrs(slices.Clone(attrs)), } } @@ -54,10 +60,11 @@ func (h *logfmt) WithGroup(g string) slog.Handler { if g == "" { return h } + buf := bytes.NewBuffer([]byte{}) return &logfmt{ - buf: bytes.NewBuffer([]byte{}), + buf: buf, next: h.next.WithGroup(g), - txt: h.txt.WithGroup(g), + txt: slog.NewTextHandler(buf, createTextHandlerOptions()).WithGroup(g), } } @@ -69,10 +76,22 @@ func (h *logfmt) Handle(ctx context.Context, r slog.Record) error { panic("buffer wasn't empty") } - h.txt.Handle(ctx, r) - r.Message = h.buf.String() - r.Message = strings.TrimSuffix(r.Message, "\n") + // Format using text handler to get the formatted message + err := h.txt.Handle(ctx, r) + if err != nil { + return err + } + + formattedMessage := h.buf.String() + formattedMessage = strings.TrimSuffix(formattedMessage, "\n") h.buf.Reset() - return h.next.Handle(ctx, r) + // Create a new record with the formatted message + newRecord := slog.NewRecord(r.Time, r.Level, formattedMessage, r.PC) + r.Attrs(func(a slog.Attr) bool { + newRecord.AddAttrs(a) + return true + }) + + return h.next.Handle(ctx, newRecord) } diff --git a/logger/logger.go b/logger/logger.go index f18ac0a..99961dd 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -29,10 +29,13 @@ import ( "os" "strconv" "sync" + "time" slogtraceid "github.com/remychantenay/slog-otel" slogmulti "github.com/samber/slog-multi" "go.opentelemetry.io/contrib/bridges/otelslog" + "go.opentelemetry.io/otel/log/global" + otellog "go.opentelemetry.io/otel/sdk/log" ) // ConfigPrefix allows customizing the environment variable prefix for configuration. @@ -85,9 +88,28 @@ func setupStdErrHandler() slog.Handler { func setupOtlpLogger() *slog.Logger { setupOtlp.Do(func() { - otlpLogger = slog.New( - newLogFmtHandler(otelslog.NewHandler("common")), + // Create our buffering exporter + // It will buffer until tracing is configured + bufferingExp := newBufferingExporter() + + // Use BatchProcessor with our custom exporter + processor := otellog.NewBatchProcessor(bufferingExp, + otellog.WithExportInterval(10*time.Second), + otellog.WithMaxQueueSize(2048), + otellog.WithExportMaxBatchSize(512), ) + + // Create logger provider + provider := otellog.NewLoggerProvider( + otellog.WithProcessor(processor), + ) + + // Set global provider + global.SetLoggerProvider(provider) + + // Create slog handler + handler := newLogFmtHandler(otelslog.NewHandler("common")) + otlpLogger = slog.New(handler) }) return otlpLogger } diff --git a/tracing/tracing.go b/tracing/tracing.go index 1fae4e0..d389879 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -41,21 +41,24 @@ import ( "crypto/tls" "crypto/x509" "errors" + "log/slog" "os" "slices" "time" - "go.ntppool.org/common/logger" + "go.ntppool.org/common/internal/tracerconfig" "go.ntppool.org/common/version" "google.golang.org/grpc/credentials" "go.opentelemetry.io/contrib/exporters/autoexport" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - logglobal "go.opentelemetry.io/otel/log/global" + "go.opentelemetry.io/otel/log/global" "go.opentelemetry.io/otel/propagation" sdklog "go.opentelemetry.io/otel/sdk/log" "go.opentelemetry.io/otel/sdk/resource" @@ -70,10 +73,26 @@ const ( otelExporterOTLPProtoEnvKey = "OTEL_EXPORTER_OTLP_PROTOCOL" otelExporterOTLPTracesProtoEnvKey = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL" + otelExporterOTLPLogsProtoEnvKey = "OTEL_EXPORTER_OTLP_LOGS_PROTOCOL" ) var errInvalidOTLPProtocol = errors.New("invalid OTLP protocol - should be one of ['grpc', 'http/protobuf']") +// createOTLPLogExporter creates an OTLP log exporter using the provided configuration. +// This function is used as the ExporterFactory for the tracerconfig bridge. +func createOTLPLogExporter(ctx context.Context, cfg *tracerconfig.Config) (sdklog.Exporter, error) { + // Convert bridge config to local TracerConfig + tracerCfg := &TracerConfig{ + ServiceName: cfg.ServiceName, + Environment: cfg.Environment, + Endpoint: cfg.Endpoint, + EndpointURL: cfg.EndpointURL, + CertificateProvider: cfg.CertificateProvider, + RootCAs: cfg.RootCAs, + } + return newOTLPLogExporter(ctx, tracerCfg) +} + // https://github.com/open-telemetry/opentelemetry-go/blob/main/exporters/otlp/otlptrace/otlptracehttp/example_test.go // TpShutdownFunc represents a function that gracefully shuts down telemetry providers. @@ -98,10 +117,9 @@ func Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) return Tracer().Start(ctx, spanName, opts...) } -// GetClientCertificate defines a function type for providing client certificates for mutual TLS. -// This is used when exporting telemetry data to secured OTLP endpoints that require -// client certificate authentication. -type GetClientCertificate func(*tls.CertificateRequestInfo) (*tls.Certificate, error) +// GetClientCertificate is an alias for the type defined in tracerconfig. +// This maintains backward compatibility for existing code. +type GetClientCertificate = tracerconfig.GetClientCertificate // TracerConfig provides configuration options for OpenTelemetry tracing setup. // It supplements standard OpenTelemetry environment variables with additional @@ -143,7 +161,18 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc, cfg = &TracerConfig{} } - log := logger.Setup() + // Store configuration for use by logger package via bridge + bridgeConfig := &tracerconfig.Config{ + ServiceName: cfg.ServiceName, + Environment: cfg.Environment, + Endpoint: cfg.Endpoint, + EndpointURL: cfg.EndpointURL, + CertificateProvider: cfg.CertificateProvider, + RootCAs: cfg.RootCAs, + } + tracerconfig.Store(ctx, bridgeConfig, createOTLPLogExporter) + + log := slog.Default() if serviceName := os.Getenv(svcNameKey); len(serviceName) == 0 { if len(cfg.ServiceName) > 0 { @@ -184,13 +213,21 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc, var shutdownFuncs []func(context.Context) error shutdown = func(ctx context.Context) error { + // Force flush the global logger provider before shutting down anything else + if loggerProvider := global.GetLoggerProvider(); loggerProvider != nil { + if sdkProvider, ok := loggerProvider.(*sdklog.LoggerProvider); ok { + if flushErr := sdkProvider.ForceFlush(ctx); flushErr != nil { + log.Warn("logger provider force flush failed", "err", flushErr) + } + } + } + var err error // need to shutdown the providers first, // exporters after which is the opposite // order they are setup. slices.Reverse(shutdownFuncs) for _, fn := range shutdownFuncs { - // log.Warn("shutting down", "fn", fn) err = errors.Join(err, fn(ctx)) } shutdownFuncs = nil @@ -225,13 +262,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc, } shutdownFuncs = append(shutdownFuncs, spanExporter.Shutdown) - logExporter, err := autoexport.NewLogExporter(ctx) - if err != nil { - handleErr(err) - return - } - shutdownFuncs = append(shutdownFuncs, logExporter.Shutdown) - // Set up trace provider. tracerProvider, err := newTraceProvider(spanExporter, res) if err != nil { @@ -241,19 +271,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc, shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) otel.SetTracerProvider(tracerProvider) - logProvider := sdklog.NewLoggerProvider(sdklog.WithResource(res), - sdklog.WithProcessor( - sdklog.NewBatchProcessor(logExporter, sdklog.WithExportBufferSize(10)), - ), - ) - - logglobal.SetLoggerProvider(logProvider) - shutdownFuncs = append(shutdownFuncs, func(ctx context.Context) error { - logProvider.ForceFlush(ctx) - return logProvider.Shutdown(ctx) - }, - ) - if err != nil { handleErr(err) return @@ -263,7 +280,7 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc, } func newOLTPExporter(ctx context.Context, cfg *TracerConfig) (sdktrace.SpanExporter, error) { - log := logger.Setup() + log := slog.Default() var tlsConfig *tls.Config @@ -330,6 +347,66 @@ func newOLTPExporter(ctx context.Context, cfg *TracerConfig) (sdktrace.SpanExpor return exporter, err } +func newOTLPLogExporter(ctx context.Context, cfg *TracerConfig) (sdklog.Exporter, error) { + log := slog.Default() + + var tlsConfig *tls.Config + + if cfg.CertificateProvider != nil { + tlsConfig = &tls.Config{ + GetClientCertificate: cfg.CertificateProvider, + RootCAs: cfg.RootCAs, + } + } + + proto := os.Getenv(otelExporterOTLPLogsProtoEnvKey) + if proto == "" { + proto = os.Getenv(otelExporterOTLPProtoEnvKey) + } + + // Fallback to default, http/protobuf. + if proto == "" { + proto = "http/protobuf" + } + + switch proto { + case "grpc": + opts := []otlploggrpc.Option{ + otlploggrpc.WithCompressor("gzip"), + } + if tlsConfig != nil { + opts = append(opts, otlploggrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig))) + } + if len(cfg.Endpoint) > 0 { + log.Info("adding log option", "Endpoint", cfg.Endpoint) + opts = append(opts, otlploggrpc.WithEndpoint(cfg.Endpoint)) + } + if len(cfg.EndpointURL) > 0 { + log.Info("adding log option", "EndpointURL", cfg.EndpointURL) + opts = append(opts, otlploggrpc.WithEndpointURL(cfg.EndpointURL)) + } + + return otlploggrpc.New(ctx, opts...) + case "http/protobuf", "http/json": + opts := []otlploghttp.Option{ + otlploghttp.WithCompression(otlploghttp.GzipCompression), + } + if tlsConfig != nil { + opts = append(opts, otlploghttp.WithTLSClientConfig(tlsConfig)) + } + if len(cfg.Endpoint) > 0 { + opts = append(opts, otlploghttp.WithEndpoint(cfg.Endpoint)) + } + if len(cfg.EndpointURL) > 0 { + opts = append(opts, otlploghttp.WithEndpointURL(cfg.EndpointURL)) + } + + return otlploghttp.New(ctx, opts...) + default: + return nil, errInvalidOTLPProtocol + } +} + func newTraceProvider(traceExporter sdktrace.SpanExporter, res *resource.Resource) (*sdktrace.TracerProvider, error) { traceProvider := sdktrace.NewTracerProvider( sdktrace.WithResource(res),