tracing: refactor code, support more exporters with default environment configuration
This commit is contained in:
@@ -7,14 +7,19 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"go.ntppool.org/common/logger"
|
||||
"go.ntppool.org/common/version"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"go.opentelemetry.io/contrib/exporters/autoexport"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
@@ -23,6 +28,16 @@ import (
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const (
|
||||
// svcNameKey is the environment variable name that Service Name information will be read from.
|
||||
svcNameKey = "OTEL_SERVICE_NAME"
|
||||
|
||||
otelExporterOTLPProtoEnvKey = "OTEL_EXPORTER_OTLP_PROTOCOL"
|
||||
otelExporterOTLPTracesProtoEnvKey = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"
|
||||
)
|
||||
|
||||
var errInvalidOTLPProtocol = errors.New("invalid OTLP protocol - should be one of ['grpc', 'http/protobuf']")
|
||||
|
||||
// https://github.com/open-telemetry/opentelemetry-go/blob/main/exporters/otlp/otlptrace/otlptracehttp/example_test.go
|
||||
|
||||
type TpShutdownFunc func(ctx context.Context) error
|
||||
@@ -48,80 +63,170 @@ type TracerConfig struct {
|
||||
RootCAs *x509.CertPool
|
||||
}
|
||||
|
||||
var emptyTpShutdownFunc = func(_ context.Context) error {
|
||||
return nil
|
||||
func InitTracer(ctx context.Context, cfg *TracerConfig) (TpShutdownFunc, error) {
|
||||
// todo: setup environment from cfg
|
||||
return SetupSDK(ctx, cfg)
|
||||
}
|
||||
|
||||
func InitTracer(ctx context.Context, cfg *TracerConfig) (TpShutdownFunc, error) {
|
||||
func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc, err error) {
|
||||
|
||||
log := logger.Setup()
|
||||
|
||||
// exporter, err := srv.newStdoutExporter(os.Stdout)
|
||||
if serviceName := os.Getenv(svcNameKey); len(serviceName) == 0 {
|
||||
if len(cfg.ServiceName) > 0 {
|
||||
os.Setenv(svcNameKey, cfg.ServiceName)
|
||||
}
|
||||
}
|
||||
|
||||
resources := []resource.Option{
|
||||
resource.WithFromEnv(), // Discover and provide attributes from OTEL_RESOURCE_ATTRIBUTES and OTEL_SERVICE_NAME environment variables.
|
||||
resource.WithTelemetrySDK(), // Discover and provide information about the OpenTelemetry SDK used.
|
||||
resource.WithProcess(), // Discover and provide process information.
|
||||
resource.WithOS(), // Discover and provide OS information.
|
||||
resource.WithContainer(), // Discover and provide container information.
|
||||
resource.WithHost(), // Discover and provide host information.
|
||||
|
||||
// set above via os.Setenv() for WithFromEnv to find
|
||||
// resource.WithAttributes(semconv.ServiceNameKey.String(cfg.ServiceName)),
|
||||
|
||||
resource.WithAttributes(semconv.ServiceVersionKey.String(version.Version())),
|
||||
}
|
||||
|
||||
if len(cfg.Environment) > 0 {
|
||||
resources = append(resources,
|
||||
resource.WithAttributes(attribute.String("environment", cfg.Environment)),
|
||||
)
|
||||
}
|
||||
|
||||
res, err := resource.New(
|
||||
context.Background(),
|
||||
resources...,
|
||||
)
|
||||
if errors.Is(err, resource.ErrPartialResource) || errors.Is(err, resource.ErrSchemaURLConflict) {
|
||||
log.Warn("otel resource setup", "err", err) // Log non-fatal issues.
|
||||
} else if err != nil {
|
||||
log.Error("otel resource setup", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
var shutdownFuncs []func(context.Context) error
|
||||
shutdown = func(ctx context.Context) error {
|
||||
var err error
|
||||
for _, fn := range shutdownFuncs {
|
||||
err = errors.Join(err, fn(ctx))
|
||||
}
|
||||
shutdownFuncs = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// handleErr calls shutdown for cleanup and makes sure that all errors are returned.
|
||||
handleErr := func(inErr error) {
|
||||
err = errors.Join(inErr, shutdown(ctx))
|
||||
}
|
||||
|
||||
prop := newPropagator()
|
||||
otel.SetTextMapPropagator(prop)
|
||||
|
||||
var err error
|
||||
var exporter otelsdktrace.SpanExporter
|
||||
|
||||
if otlpEndPoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); len(otlpEndPoint) > 0 || len(cfg.Endpoint) > 0 {
|
||||
switch os.Getenv("OTEL_TRACES_EXPORTER") {
|
||||
case "":
|
||||
exporter, err = newOLTPExporter(ctx, cfg)
|
||||
case "otlp":
|
||||
exporter, err = newOLTPExporter(ctx, cfg)
|
||||
default:
|
||||
exporter, err = autoexport.NewSpanExporter(ctx)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return emptyTpShutdownFunc, err
|
||||
handleErr(err)
|
||||
return
|
||||
}
|
||||
shutdownFuncs = append(shutdownFuncs, exporter.Shutdown)
|
||||
|
||||
if exporter == nil {
|
||||
log.Warn("tracing not configured")
|
||||
return emptyTpShutdownFunc, nil
|
||||
return
|
||||
}
|
||||
|
||||
resource, err := newResource(cfg)
|
||||
// Set up trace provider.
|
||||
tracerProvider, err := newTraceProvider(exporter, res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
handleErr(err)
|
||||
return
|
||||
}
|
||||
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
|
||||
otel.SetTracerProvider(tracerProvider)
|
||||
|
||||
if err != nil {
|
||||
handleErr(err)
|
||||
return
|
||||
}
|
||||
|
||||
tp := otelsdktrace.NewTracerProvider(
|
||||
otelsdktrace.WithSampler(otelsdktrace.AlwaysSample()),
|
||||
otelsdktrace.WithBatcher(exporter),
|
||||
otelsdktrace.WithResource(resource),
|
||||
)
|
||||
|
||||
otel.SetTracerProvider(tp)
|
||||
|
||||
otel.SetTextMapPropagator(
|
||||
propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{}, // W3C Trace Context format; https://www.w3.org/TR/trace-context/
|
||||
propagation.Baggage{},
|
||||
),
|
||||
)
|
||||
|
||||
return tp.Shutdown, nil
|
||||
return
|
||||
}
|
||||
|
||||
func newOLTPExporter(ctx context.Context, cfg *TracerConfig) (otelsdktrace.SpanExporter, error) {
|
||||
|
||||
log := logger.Setup()
|
||||
|
||||
opts := []otlptracehttp.Option{
|
||||
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
|
||||
}
|
||||
log.Info("configuring", "config", cfg)
|
||||
|
||||
var tlsConfig *tls.Config
|
||||
|
||||
if cfg.CertificateProvider != nil {
|
||||
log.InfoContext(ctx, "setting up cert provider")
|
||||
opts = append(opts, otlptracehttp.WithTLSClientConfig(&tls.Config{
|
||||
tlsConfig = &tls.Config{
|
||||
GetClientCertificate: cfg.CertificateProvider,
|
||||
RootCAs: cfg.RootCAs,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
if len(cfg.Endpoint) > 0 {
|
||||
opts = append(opts, otlptracehttp.WithEndpoint(cfg.Endpoint))
|
||||
proto := os.Getenv(otelExporterOTLPTracesProtoEnvKey)
|
||||
if proto == "" {
|
||||
proto = os.Getenv(otelExporterOTLPProtoEnvKey)
|
||||
}
|
||||
|
||||
if len(cfg.EndpointURL) > 0 {
|
||||
opts = append(opts, otlptracehttp.WithEndpointURL(cfg.EndpointURL))
|
||||
// Fallback to default, http/protobuf.
|
||||
if proto == "" {
|
||||
proto = "http/protobuf"
|
||||
}
|
||||
|
||||
var client otlptrace.Client
|
||||
|
||||
switch proto {
|
||||
case "grpc":
|
||||
opts := []otlptracegrpc.Option{
|
||||
otlptracegrpc.WithCompressor("gzip"),
|
||||
}
|
||||
if tlsConfig != nil {
|
||||
opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
|
||||
}
|
||||
if len(cfg.Endpoint) > 0 {
|
||||
opts = append(opts, otlptracegrpc.WithEndpoint(cfg.Endpoint))
|
||||
}
|
||||
if len(cfg.EndpointURL) > 0 {
|
||||
opts = append(opts, otlptracegrpc.WithEndpointURL(cfg.EndpointURL))
|
||||
}
|
||||
|
||||
client = otlptracegrpc.NewClient(opts...)
|
||||
case "http/protobuf":
|
||||
opts := []otlptracehttp.Option{
|
||||
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
|
||||
}
|
||||
if tlsConfig != nil {
|
||||
opts = append(opts, otlptracehttp.WithTLSClientConfig(tlsConfig))
|
||||
}
|
||||
if len(cfg.Endpoint) > 0 {
|
||||
opts = append(opts, otlptracehttp.WithEndpoint(cfg.Endpoint))
|
||||
}
|
||||
if len(cfg.EndpointURL) > 0 {
|
||||
opts = append(opts, otlptracehttp.WithEndpointURL(cfg.EndpointURL))
|
||||
}
|
||||
|
||||
client = otlptracehttp.NewClient(opts...)
|
||||
default:
|
||||
return nil, errInvalidOTLPProtocol
|
||||
}
|
||||
|
||||
client := otlptracehttp.NewClient(opts...)
|
||||
exporter, err := otlptrace.New(ctx, client)
|
||||
if err != nil {
|
||||
log.ErrorContext(ctx, "creating OTLP trace exporter", "err", err)
|
||||
@@ -129,44 +234,19 @@ func newOLTPExporter(ctx context.Context, cfg *TracerConfig) (otelsdktrace.SpanE
|
||||
return exporter, err
|
||||
}
|
||||
|
||||
// func (srv *Server) newStdoutExporter(w io.Writer) (sdktrace.SpanExporter, error) {
|
||||
// return stdouttrace.New(
|
||||
// stdouttrace.WithWriter(w),
|
||||
// // Use human-readable output.
|
||||
// stdouttrace.WithPrettyPrint(),
|
||||
// // Do not print timestamps for the demo.
|
||||
// stdouttrace.WithoutTimestamps(),
|
||||
// )
|
||||
// }
|
||||
|
||||
// newResource returns a resource describing this application.
|
||||
func newResource(cfg *TracerConfig) (*resource.Resource, error) {
|
||||
|
||||
log := logger.Setup()
|
||||
|
||||
defaultResource := resource.Default()
|
||||
log.Debug("default semconv", "url", defaultResource.SchemaURL())
|
||||
|
||||
newResource := resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceNameKey.String(cfg.ServiceName),
|
||||
semconv.ServiceVersionKey.String(version.Version()),
|
||||
attribute.String("environment", cfg.Environment),
|
||||
func newTraceProvider(traceExporter otelsdktrace.SpanExporter, res *resource.Resource) (*otelsdktrace.TracerProvider, error) {
|
||||
traceProvider := otelsdktrace.NewTracerProvider(
|
||||
otelsdktrace.WithResource(res),
|
||||
otelsdktrace.WithBatcher(traceExporter,
|
||||
otelsdktrace.WithBatchTimeout(time.Second*3),
|
||||
),
|
||||
)
|
||||
return traceProvider, nil
|
||||
}
|
||||
|
||||
func newPropagator() propagation.TextMapPropagator {
|
||||
return propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{},
|
||||
propagation.Baggage{},
|
||||
)
|
||||
|
||||
log.Debug("new resource semconv", "url", newResource.SchemaURL())
|
||||
|
||||
r, err := resource.Merge(
|
||||
defaultResource,
|
||||
newResource,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("could not setup otel resource",
|
||||
"err", err,
|
||||
"default", defaultResource.SchemaURL(),
|
||||
"local", newResource.SchemaURL(),
|
||||
)
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user