feat(logger): add buffering exporter with TLS support for OTLP logs

Add buffering exporter to queue OTLP logs until tracing is configured.
Support TLS configuration for OpenTelemetry log export with client
certificate authentication. Improve logfmt formatting and tracing setup.
This commit is contained in:
2025-07-27 16:36:18 -07:00
parent da13a371b4
commit 6a3bc7bab3
5 changed files with 442 additions and 50 deletions

View File

@@ -41,21 +41,24 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"log/slog"
"os"
"slices"
"time"
"go.ntppool.org/common/logger"
"go.ntppool.org/common/internal/tracerconfig"
"go.ntppool.org/common/version"
"google.golang.org/grpc/credentials"
"go.opentelemetry.io/contrib/exporters/autoexport"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
logglobal "go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/propagation"
sdklog "go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/resource"
@@ -70,10 +73,26 @@ const (
otelExporterOTLPProtoEnvKey = "OTEL_EXPORTER_OTLP_PROTOCOL"
otelExporterOTLPTracesProtoEnvKey = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"
otelExporterOTLPLogsProtoEnvKey = "OTEL_EXPORTER_OTLP_LOGS_PROTOCOL"
)
var errInvalidOTLPProtocol = errors.New("invalid OTLP protocol - should be one of ['grpc', 'http/protobuf']")
// createOTLPLogExporter creates an OTLP log exporter using the provided configuration.
// This function is used as the ExporterFactory for the tracerconfig bridge.
func createOTLPLogExporter(ctx context.Context, cfg *tracerconfig.Config) (sdklog.Exporter, error) {
// Convert bridge config to local TracerConfig
tracerCfg := &TracerConfig{
ServiceName: cfg.ServiceName,
Environment: cfg.Environment,
Endpoint: cfg.Endpoint,
EndpointURL: cfg.EndpointURL,
CertificateProvider: cfg.CertificateProvider,
RootCAs: cfg.RootCAs,
}
return newOTLPLogExporter(ctx, tracerCfg)
}
// https://github.com/open-telemetry/opentelemetry-go/blob/main/exporters/otlp/otlptrace/otlptracehttp/example_test.go
// TpShutdownFunc represents a function that gracefully shuts down telemetry providers.
@@ -98,10 +117,9 @@ func Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption)
return Tracer().Start(ctx, spanName, opts...)
}
// GetClientCertificate defines a function type for providing client certificates for mutual TLS.
// This is used when exporting telemetry data to secured OTLP endpoints that require
// client certificate authentication.
type GetClientCertificate func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
// GetClientCertificate is an alias for the type defined in tracerconfig.
// This maintains backward compatibility for existing code.
type GetClientCertificate = tracerconfig.GetClientCertificate
// TracerConfig provides configuration options for OpenTelemetry tracing setup.
// It supplements standard OpenTelemetry environment variables with additional
@@ -143,7 +161,18 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
cfg = &TracerConfig{}
}
log := logger.Setup()
// Store configuration for use by logger package via bridge
bridgeConfig := &tracerconfig.Config{
ServiceName: cfg.ServiceName,
Environment: cfg.Environment,
Endpoint: cfg.Endpoint,
EndpointURL: cfg.EndpointURL,
CertificateProvider: cfg.CertificateProvider,
RootCAs: cfg.RootCAs,
}
tracerconfig.Store(ctx, bridgeConfig, createOTLPLogExporter)
log := slog.Default()
if serviceName := os.Getenv(svcNameKey); len(serviceName) == 0 {
if len(cfg.ServiceName) > 0 {
@@ -184,13 +213,21 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
var shutdownFuncs []func(context.Context) error
shutdown = func(ctx context.Context) error {
// Force flush the global logger provider before shutting down anything else
if loggerProvider := global.GetLoggerProvider(); loggerProvider != nil {
if sdkProvider, ok := loggerProvider.(*sdklog.LoggerProvider); ok {
if flushErr := sdkProvider.ForceFlush(ctx); flushErr != nil {
log.Warn("logger provider force flush failed", "err", flushErr)
}
}
}
var err error
// need to shutdown the providers first,
// exporters after which is the opposite
// order they are setup.
slices.Reverse(shutdownFuncs)
for _, fn := range shutdownFuncs {
// log.Warn("shutting down", "fn", fn)
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
@@ -225,13 +262,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
}
shutdownFuncs = append(shutdownFuncs, spanExporter.Shutdown)
logExporter, err := autoexport.NewLogExporter(ctx)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, logExporter.Shutdown)
// Set up trace provider.
tracerProvider, err := newTraceProvider(spanExporter, res)
if err != nil {
@@ -241,19 +271,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
otel.SetTracerProvider(tracerProvider)
logProvider := sdklog.NewLoggerProvider(sdklog.WithResource(res),
sdklog.WithProcessor(
sdklog.NewBatchProcessor(logExporter, sdklog.WithExportBufferSize(10)),
),
)
logglobal.SetLoggerProvider(logProvider)
shutdownFuncs = append(shutdownFuncs, func(ctx context.Context) error {
logProvider.ForceFlush(ctx)
return logProvider.Shutdown(ctx)
},
)
if err != nil {
handleErr(err)
return
@@ -263,7 +280,7 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
}
func newOLTPExporter(ctx context.Context, cfg *TracerConfig) (sdktrace.SpanExporter, error) {
log := logger.Setup()
log := slog.Default()
var tlsConfig *tls.Config
@@ -330,6 +347,66 @@ func newOLTPExporter(ctx context.Context, cfg *TracerConfig) (sdktrace.SpanExpor
return exporter, err
}
func newOTLPLogExporter(ctx context.Context, cfg *TracerConfig) (sdklog.Exporter, error) {
log := slog.Default()
var tlsConfig *tls.Config
if cfg.CertificateProvider != nil {
tlsConfig = &tls.Config{
GetClientCertificate: cfg.CertificateProvider,
RootCAs: cfg.RootCAs,
}
}
proto := os.Getenv(otelExporterOTLPLogsProtoEnvKey)
if proto == "" {
proto = os.Getenv(otelExporterOTLPProtoEnvKey)
}
// Fallback to default, http/protobuf.
if proto == "" {
proto = "http/protobuf"
}
switch proto {
case "grpc":
opts := []otlploggrpc.Option{
otlploggrpc.WithCompressor("gzip"),
}
if tlsConfig != nil {
opts = append(opts, otlploggrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
if len(cfg.Endpoint) > 0 {
log.Info("adding log option", "Endpoint", cfg.Endpoint)
opts = append(opts, otlploggrpc.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
log.Info("adding log option", "EndpointURL", cfg.EndpointURL)
opts = append(opts, otlploggrpc.WithEndpointURL(cfg.EndpointURL))
}
return otlploggrpc.New(ctx, opts...)
case "http/protobuf", "http/json":
opts := []otlploghttp.Option{
otlploghttp.WithCompression(otlploghttp.GzipCompression),
}
if tlsConfig != nil {
opts = append(opts, otlploghttp.WithTLSClientConfig(tlsConfig))
}
if len(cfg.Endpoint) > 0 {
opts = append(opts, otlploghttp.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
opts = append(opts, otlploghttp.WithEndpointURL(cfg.EndpointURL))
}
return otlploghttp.New(ctx, opts...)
default:
return nil, errInvalidOTLPProtocol
}
}
func newTraceProvider(traceExporter sdktrace.SpanExporter, res *resource.Resource) (*sdktrace.TracerProvider, error) {
traceProvider := sdktrace.NewTracerProvider(
sdktrace.WithResource(res),