Compare commits (10 commits)

| SHA1 |
|---|
| 0b9769dc39 |
| 9dadd9edc3 |
| c6230be91e |
| 796b2a8412 |
| 6a3bc7bab3 |
| da13a371b4 |
| a1a5a6b8be |
| 96afb77844 |
| c372d79d1d |
| b5141d6a70 |
CHANGELOG.md (new file, 34 lines)
@@ -0,0 +1,34 @@
# Release Notes - v0.5.1

## Observability Enhancements

### OTLP Metrics Support

- **New `metrics/` package** - OpenTelemetry-native metrics with OTLP export support for structured metrics collection
- **Centralized OTLP configuration** - Refactored configuration to `internal/tracerconfig/` to eliminate code duplication across tracing, logging, and metrics
- **HTTP retry support** - Added consistent retry configuration for all HTTP OTLP exporters to improve reliability

### Enhanced Logging

- **Buffering exporter** - Added OTLP log buffering to queue logs until tracing configuration is available
- **TLS support for logs** - Client certificate authentication support for secure OTLP log export
- **Improved logfmt formatting** - Better structured output for log messages

### Tracing Improvements

- **HTTP retry support** - OTLP trace requests now automatically retry on failure when using HTTP transport

## Build System

### Version Package Enhancements

- **Unix epoch build time support** - Build time can now be injected as Unix timestamps (`$(date +%s)`) in addition to RFC3339 format
- **Simplified build commands** - Reduces complexity of ldflags injection while maintaining backward compatibility
- **Consistent output format** - All build times normalize to RFC3339 format regardless of input

## API Changes

### New Public Interfaces

- `metrics.NewMeterProvider()` - Create OTLP metrics provider with centralized configuration
- `metrics.Shutdown()` - Graceful shutdown for metrics exporters
- `internal/tracerconfig` - Shared OTLP configuration utilities (internal package)

### Dependencies

- Added explicit OpenTelemetry metrics dependencies to `go.mod`
- Updated tracing dependencies for retry support
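The release notes above list `metrics.NewMeterProvider()` and `metrics.Shutdown()` but this diff does not include the `metrics/` package itself, so the exact signatures are not visible. A minimal usage sketch, assuming the provider takes a context and returns an error alongside the provider, and that shutdown also takes a context; all of those details are assumptions:

```go
// Hypothetical wiring for the new metrics package described in the changelog.
// The import path, parameters, and return values are assumptions; only the
// function names come from the release notes.
package main

import (
	"context"
	"log"

	"go.ntppool.org/common/metrics" // assumed import path for the new metrics/ package
)

func main() {
	ctx := context.Background()

	// Build an OTLP meter provider from the centralized tracerconfig settings.
	mp, err := metrics.NewMeterProvider(ctx) // signature assumed
	if err != nil {
		log.Fatalf("metrics setup failed: %v", err)
	}
	_ = mp // e.g. otel.SetMeterProvider(mp), create instruments, ...

	// Flush and stop the metric exporters on shutdown.
	defer func() {
		if err := metrics.Shutdown(ctx); err != nil { // signature assumed
			log.Printf("metrics shutdown: %v", err)
		}
	}()
}
```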
database/config.go (new file, 61 lines)
@@ -0,0 +1,61 @@
package database

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// Config represents the database configuration structure
type Config struct {
    MySQL DBConfig `yaml:"mysql"`
}

// DBConfig represents the MySQL database configuration
type DBConfig struct {
    DSN    string `default:"" flag:"dsn" usage:"Database DSN"`
    User   string `default:"" flag:"user"`
    Pass   string `default:"" flag:"pass"`
    DBName string // Optional database name override
}

// ConfigOptions allows customization of database opening behavior
type ConfigOptions struct {
    // ConfigFiles is a list of config file paths to search for database configuration
    ConfigFiles []string

    // EnablePoolMonitoring enables connection pool metrics collection
    EnablePoolMonitoring bool

    // PrometheusRegisterer for metrics collection. If nil, no metrics are collected.
    PrometheusRegisterer prometheus.Registerer

    // Connection pool settings
    MaxOpenConns    int
    MaxIdleConns    int
    ConnMaxLifetime time.Duration
}

// DefaultConfigOptions returns the standard configuration options used by API package
func DefaultConfigOptions() ConfigOptions {
    return ConfigOptions{
        ConfigFiles:          []string{"database.yaml", "/vault/secrets/database.yaml"},
        EnablePoolMonitoring: true,
        PrometheusRegisterer: prometheus.DefaultRegisterer,
        MaxOpenConns:         25,
        MaxIdleConns:         10,
        ConnMaxLifetime:      3 * time.Minute,
    }
}

// MonitorConfigOptions returns configuration options optimized for Monitor package
func MonitorConfigOptions() ConfigOptions {
    return ConfigOptions{
        ConfigFiles:          []string{"database.yaml", "/vault/secrets/database.yaml"},
        EnablePoolMonitoring: false, // Monitor doesn't need metrics
        PrometheusRegisterer: nil,   // No Prometheus dependency
        MaxOpenConns:         10,
        MaxIdleConns:         5,
        ConnMaxLifetime:      3 * time.Minute,
    }
}
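Callers are expected to start from one of the preset option sets and override individual fields. A minimal sketch, using `OpenDB` from database/pool.go later in this diff; the config file path shown is hypothetical:

```go
package example

import (
	"context"
	"database/sql"

	"go.ntppool.org/common/database" // import path assumed from the module layout
)

// openAPIDB uses the API defaults but points at a custom config file and a
// larger pool. The path "/etc/myapp/database.yaml" is illustrative only.
func openAPIDB(ctx context.Context) (*sql.DB, error) {
	opts := database.DefaultConfigOptions()
	opts.ConfigFiles = []string{"/etc/myapp/database.yaml"}
	opts.MaxOpenConns = 50

	return database.OpenDB(ctx, opts)
}
```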
database/config_test.go (new file, 81 lines)
@@ -0,0 +1,81 @@
package database

import (
    "testing"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

func TestDefaultConfigOptions(t *testing.T) {
    opts := DefaultConfigOptions()

    // Verify expected defaults for API package
    if opts.MaxOpenConns != 25 {
        t.Errorf("Expected MaxOpenConns=25, got %d", opts.MaxOpenConns)
    }
    if opts.MaxIdleConns != 10 {
        t.Errorf("Expected MaxIdleConns=10, got %d", opts.MaxIdleConns)
    }
    if opts.ConnMaxLifetime != 3*time.Minute {
        t.Errorf("Expected ConnMaxLifetime=3m, got %v", opts.ConnMaxLifetime)
    }
    if !opts.EnablePoolMonitoring {
        t.Error("Expected EnablePoolMonitoring=true")
    }
    if opts.PrometheusRegisterer != prometheus.DefaultRegisterer {
        t.Error("Expected PrometheusRegisterer to be DefaultRegisterer")
    }
    if len(opts.ConfigFiles) == 0 {
        t.Error("Expected ConfigFiles to be non-empty")
    }
}

func TestMonitorConfigOptions(t *testing.T) {
    opts := MonitorConfigOptions()

    // Verify expected defaults for Monitor package
    if opts.MaxOpenConns != 10 {
        t.Errorf("Expected MaxOpenConns=10, got %d", opts.MaxOpenConns)
    }
    if opts.MaxIdleConns != 5 {
        t.Errorf("Expected MaxIdleConns=5, got %d", opts.MaxIdleConns)
    }
    if opts.ConnMaxLifetime != 3*time.Minute {
        t.Errorf("Expected ConnMaxLifetime=3m, got %v", opts.ConnMaxLifetime)
    }
    if opts.EnablePoolMonitoring {
        t.Error("Expected EnablePoolMonitoring=false")
    }
    if opts.PrometheusRegisterer != nil {
        t.Error("Expected PrometheusRegisterer to be nil")
    }
    if len(opts.ConfigFiles) == 0 {
        t.Error("Expected ConfigFiles to be non-empty")
    }
}

func TestConfigStructures(t *testing.T) {
    // Test that configuration structures can be created and populated
    config := Config{
        MySQL: DBConfig{
            DSN:    "user:pass@tcp(localhost:3306)/dbname",
            User:   "testuser",
            Pass:   "testpass",
            DBName: "testdb",
        },
    }

    if config.MySQL.DSN == "" {
        t.Error("Expected DSN to be set")
    }
    if config.MySQL.User != "testuser" {
        t.Errorf("Expected User='testuser', got '%s'", config.MySQL.User)
    }
    if config.MySQL.Pass != "testpass" {
        t.Errorf("Expected Pass='testpass', got '%s'", config.MySQL.Pass)
    }
    if config.MySQL.DBName != "testdb" {
        t.Errorf("Expected DBName='testdb', got '%s'", config.MySQL.DBName)
    }
}
database/connector.go (new file, 88 lines)
@@ -0,0 +1,88 @@
package database

import (
    "context"
    "database/sql/driver"
    "errors"
    "fmt"
    "os"

    "github.com/go-sql-driver/mysql"
    "gopkg.in/yaml.v3"
)

// from https://github.com/Boostport/dynamic-database-config

// CreateConnectorFunc is a function that creates a database connector
type CreateConnectorFunc func() (driver.Connector, error)

// Driver implements the sql/driver interface with dynamic configuration
type Driver struct {
    CreateConnectorFunc CreateConnectorFunc
}

// Driver returns the driver instance
func (d Driver) Driver() driver.Driver {
    return d
}

// Connect creates a new database connection using the dynamic connector
func (d Driver) Connect(ctx context.Context) (driver.Conn, error) {
    connector, err := d.CreateConnectorFunc()
    if err != nil {
        return nil, fmt.Errorf("error creating connector from function: %w", err)
    }

    return connector.Connect(ctx)
}

// Open is not supported for dynamic configuration
func (d Driver) Open(name string) (driver.Conn, error) {
    return nil, errors.New("open is not supported")
}

// createConnector creates a connector function that reads configuration from a file
func createConnector(configFile string) CreateConnectorFunc {
    return func() (driver.Connector, error) {
        dbFile, err := os.Open(configFile)
        if err != nil {
            return nil, err
        }
        defer dbFile.Close()

        dec := yaml.NewDecoder(dbFile)
        cfg := Config{}

        err = dec.Decode(&cfg)
        if err != nil {
            return nil, err
        }

        dsn := cfg.MySQL.DSN
        if len(dsn) == 0 {
            dsn = os.Getenv("DATABASE_DSN")
            if len(dsn) == 0 {
                return nil, fmt.Errorf("dsn config in database.yaml or DATABASE_DSN environment variable required")
            }
        }

        dbcfg, err := mysql.ParseDSN(dsn)
        if err != nil {
            return nil, err
        }

        if user := cfg.MySQL.User; len(user) > 0 {
            dbcfg.User = user
        }

        if pass := cfg.MySQL.Pass; len(pass) > 0 {
            dbcfg.Passwd = pass
        }

        if name := cfg.MySQL.DBName; len(name) > 0 {
            dbcfg.DBName = name
        }

        return mysql.NewConnector(dbcfg)
    }
}
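`createConnector` decodes a small YAML document into the `Config`/`DBConfig` structs from database/config.go and falls back to the `DATABASE_DSN` environment variable when no DSN is present. A sketch of the expected file shape, decoded the same way the connector does; the struct here mirrors the package's types and every value shown is illustrative:

```go
package main

import (
	"fmt"
	"strings"

	"gopkg.in/yaml.v3"
)

// Local mirror of database.Config / database.DBConfig for illustration only.
type dbConfig struct {
	DSN  string `yaml:"dsn"`
	User string `yaml:"user"`
	Pass string `yaml:"pass"`
}

type config struct {
	MySQL dbConfig `yaml:"mysql"`
}

func main() {
	// Illustrative database.yaml contents; user/pass, when set, override the
	// credentials embedded in the DSN, as in createConnector.
	doc := `
mysql:
  dsn: "ntpuser:placeholder@tcp(db.example.com:3306)/ntppool"
  user: "ntpuser"
  pass: "secret-from-vault"
`
	var cfg config
	if err := yaml.NewDecoder(strings.NewReader(doc)).Decode(&cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.MySQL.DSN)
}
```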
database/integration_test.go (new file, 117 lines)
@@ -0,0 +1,117 @@
package database

import (
    "context"
    "database/sql"
    "testing"
)

// Mock types for testing SQLC integration patterns
type mockQueries struct {
    db DBTX
}

type mockQueriesTx struct {
    *mockQueries
    tx *sql.Tx
}

// Mock the Begin method pattern that SQLC generates
func (q *mockQueries) Begin(ctx context.Context) (*mockQueriesTx, error) {
    // This would normally be: tx, err := q.db.(*sql.DB).BeginTx(ctx, nil)
    // For our test, we return a mock
    return &mockQueriesTx{mockQueries: q, tx: nil}, nil
}

func (qtx *mockQueriesTx) Commit(ctx context.Context) error {
    return nil // Mock implementation
}

func (qtx *mockQueriesTx) Rollback(ctx context.Context) error {
    return nil // Mock implementation
}

// This test verifies that our common database interfaces are compatible with SQLC-generated code
func TestSQLCIntegration(t *testing.T) {
    // Test that SQLC's DBTX interface matches our DBTX interface
    t.Run("DBTX Interface Compatibility", func(t *testing.T) {
        // Test interface compatibility by assignment without execution
        var ourDBTX DBTX

        // Test with sql.DB (should implement DBTX)
        var db *sql.DB
        ourDBTX = db // This will compile only if interfaces are compatible
        _ = ourDBTX  // Use the variable to avoid "unused" warning

        // Test with sql.Tx (should implement DBTX)
        var tx *sql.Tx
        ourDBTX = tx // This will compile only if interfaces are compatible
        _ = ourDBTX  // Use the variable to avoid "unused" warning

        // If we reach here, interfaces are compatible
        t.Log("DBTX interface is compatible with sql.DB and sql.Tx")
    })

    t.Run("Transaction Interface Compatibility", func(t *testing.T) {
        // This test verifies our transaction interfaces work with SQLC patterns
        // We can't define methods inside a function, so we test interface compatibility

        // Verify our DB interface is compatible with what SQLC expects
        var dbInterface DB[*mockQueriesTx]
        var mockDB *mockQueries = &mockQueries{}
        dbInterface = mockDB

        // Test that our transaction helper can work with this pattern
        err := WithTransaction(context.Background(), dbInterface, func(ctx context.Context, qtx *mockQueriesTx) error {
            // This would be where you'd call SQLC-generated query methods
            return nil
        })
        if err != nil {
            t.Errorf("Transaction helper failed: %v", err)
        }
    })
}

// Test that demonstrates how the common package would be used with real SQLC patterns
func TestRealWorldUsagePattern(t *testing.T) {
    // This test shows how a package would typically use our common database code

    t.Run("Database Opening Pattern", func(t *testing.T) {
        // Test that our configuration options work as expected
        opts := DefaultConfigOptions()

        // Modify for test environment (no actual database connection)
        opts.ConfigFiles = []string{}   // No config files for unit test
        opts.PrometheusRegisterer = nil // No metrics for unit test

        // This would normally open a database: db, err := OpenDB(ctx, opts)
        // For our unit test, we just verify the options are reasonable
        if opts.MaxOpenConns <= 0 {
            t.Error("MaxOpenConns should be positive")
        }
        if opts.MaxIdleConns <= 0 {
            t.Error("MaxIdleConns should be positive")
        }
        if opts.ConnMaxLifetime <= 0 {
            t.Error("ConnMaxLifetime should be positive")
        }
    })

    t.Run("Monitor Package Configuration", func(t *testing.T) {
        opts := MonitorConfigOptions()

        // Verify monitor-specific settings
        if opts.EnablePoolMonitoring {
            t.Error("Monitor package should not enable pool monitoring")
        }
        if opts.PrometheusRegisterer != nil {
            t.Error("Monitor package should not have Prometheus registerer")
        }
        if opts.MaxOpenConns != 10 {
            t.Errorf("Expected MaxOpenConns=10 for monitor, got %d", opts.MaxOpenConns)
        }
        if opts.MaxIdleConns != 5 {
            t.Errorf("Expected MaxIdleConns=5 for monitor, got %d", opts.MaxIdleConns)
        }
    })
}
database/interfaces.go (new file, 34 lines)
@@ -0,0 +1,34 @@
package database

import (
    "context"
    "database/sql"
)

// DBTX matches the interface expected by SQLC-generated code
// This interface is implemented by both *sql.DB and *sql.Tx
type DBTX interface {
    ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
    PrepareContext(context.Context, string) (*sql.Stmt, error)
    QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
    QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}

// BaseQuerier provides basic query functionality
// This interface should be implemented by package-specific Queries types
type BaseQuerier interface {
    WithTx(tx *sql.Tx) BaseQuerier
}

// BaseQuerierTx provides transaction functionality
// This interface should be implemented by package-specific Queries types
type BaseQuerierTx interface {
    BaseQuerier
    Begin(ctx context.Context) (BaseQuerierTx, error)
    Commit(ctx context.Context) error
    Rollback(ctx context.Context) error
}

// TransactionFunc represents a function that operates within a database transaction
// This is used by the shared transaction helpers in transaction.go
type TransactionFunc[Q any] func(ctx context.Context, q Q) error
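Because `DBTX` is satisfied by both `*sql.DB` and `*sql.Tx`, helper functions written against it run unchanged inside or outside a transaction. A minimal sketch; the `users` table and query are made up for illustration, and the import path is assumed from the module layout:

```go
package example

import (
	"context"

	"go.ntppool.org/common/database" // import path assumed from the module layout
)

// countUsers works identically with a *sql.DB or a *sql.Tx, because both
// satisfy database.DBTX. The "users" table is hypothetical.
func countUsers(ctx context.Context, db database.DBTX) (int, error) {
	var n int
	err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users").Scan(&n)
	return n, err
}
```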
database/metrics.go (new file, 93 lines)
@@ -0,0 +1,93 @@
package database

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// DatabaseMetrics holds the Prometheus metrics for database connection pool monitoring
type DatabaseMetrics struct {
    ConnectionsOpen         prometheus.Gauge
    ConnectionsIdle         prometheus.Gauge
    ConnectionsInUse        prometheus.Gauge
    ConnectionsWaitCount    prometheus.Counter
    ConnectionsWaitDuration prometheus.Histogram
}

// NewDatabaseMetrics creates a new set of database metrics and registers them
func NewDatabaseMetrics(registerer prometheus.Registerer) *DatabaseMetrics {
    metrics := &DatabaseMetrics{
        ConnectionsOpen: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "database_connections_open",
            Help: "Number of open database connections",
        }),
        ConnectionsIdle: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "database_connections_idle",
            Help: "Number of idle database connections",
        }),
        ConnectionsInUse: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "database_connections_in_use",
            Help: "Number of database connections in use",
        }),
        ConnectionsWaitCount: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "database_connections_wait_count_total",
            Help: "Total number of times a connection had to wait",
        }),
        ConnectionsWaitDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
            Name:    "database_connections_wait_duration_seconds",
            Help:    "Time spent waiting for a database connection",
            Buckets: prometheus.DefBuckets,
        }),
    }

    if registerer != nil {
        registerer.MustRegister(
            metrics.ConnectionsOpen,
            metrics.ConnectionsIdle,
            metrics.ConnectionsInUse,
            metrics.ConnectionsWaitCount,
            metrics.ConnectionsWaitDuration,
        )
    }

    return metrics
}

// monitorConnectionPool runs a background goroutine to collect connection pool metrics
func monitorConnectionPool(ctx context.Context, db *sql.DB, registerer prometheus.Registerer) {
    if registerer == nil {
        return // No metrics collection if no registerer provided
    }

    metrics := NewDatabaseMetrics(registerer)
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            stats := db.Stats()

            metrics.ConnectionsOpen.Set(float64(stats.OpenConnections))
            metrics.ConnectionsIdle.Set(float64(stats.Idle))
            metrics.ConnectionsInUse.Set(float64(stats.InUse))
            metrics.ConnectionsWaitCount.Add(float64(stats.WaitCount))

            if stats.WaitDuration > 0 {
                metrics.ConnectionsWaitDuration.Observe(stats.WaitDuration.Seconds())
            }

            // Log connection pool stats for high usage or waiting
            if stats.OpenConnections > 20 || stats.WaitCount > 0 {
                fmt.Printf("Connection pool stats: open=%d idle=%d in_use=%d wait_count=%d wait_duration=%s\n",
                    stats.OpenConnections, stats.Idle, stats.InUse, stats.WaitCount, stats.WaitDuration)
            }
        }
    }
}
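Pool monitoring is opt-in: the goroutine only starts when `EnablePoolMonitoring` is set and a registerer is supplied (see `OpenDB` in database/pool.go below). A sketch using a dedicated registry instead of the global default; the import path is assumed from the module layout and the program still needs a reachable database.yaml to actually connect:

```go
package main

import (
	"context"
	"log"

	"github.com/prometheus/client_golang/prometheus"

	"go.ntppool.org/common/database" // import path assumed from the module layout
)

func main() {
	ctx := context.Background()

	// Isolated registry instead of prometheus.DefaultRegisterer.
	reg := prometheus.NewRegistry()

	opts := database.DefaultConfigOptions()
	opts.EnablePoolMonitoring = true
	opts.PrometheusRegisterer = reg

	db, err := database.OpenDB(ctx, opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The database_connections_* gauges/counters registered in metrics.go are
	// now refreshed every 30 seconds by monitorConnectionPool.
}
```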
database/pool.go (new file, 78 lines)
@@ -0,0 +1,78 @@
package database

import (
    "context"
    "database/sql"
    "fmt"
    "os"

    "go.ntppool.org/common/logger"
)

// OpenDB opens a database connection with the specified configuration options
func OpenDB(ctx context.Context, options ConfigOptions) (*sql.DB, error) {
    log := logger.Setup()

    configFile, err := findConfigFile(options.ConfigFiles)
    if err != nil {
        return nil, err
    }

    dbconn := sql.OpenDB(Driver{
        CreateConnectorFunc: createConnector(configFile),
    })

    // Set connection pool parameters
    dbconn.SetConnMaxLifetime(options.ConnMaxLifetime)
    dbconn.SetMaxOpenConns(options.MaxOpenConns)
    dbconn.SetMaxIdleConns(options.MaxIdleConns)

    err = dbconn.Ping()
    if err != nil {
        log.Error("could not connect to database", "err", err)
        return nil, err
    }

    // Start optional connection pool monitoring
    if options.EnablePoolMonitoring && options.PrometheusRegisterer != nil {
        go monitorConnectionPool(ctx, dbconn, options.PrometheusRegisterer)
    }

    return dbconn, nil
}

// OpenDBWithConfigFile opens a database connection using an explicit config file path
// This is a convenience function for API package compatibility
func OpenDBWithConfigFile(ctx context.Context, configFile string) (*sql.DB, error) {
    options := DefaultConfigOptions()
    options.ConfigFiles = []string{configFile}
    return OpenDB(ctx, options)
}

// OpenDBMonitor opens a database connection with monitor-specific defaults
// This is a convenience function for Monitor package compatibility
func OpenDBMonitor() (*sql.DB, error) {
    options := MonitorConfigOptions()
    return OpenDB(context.Background(), options)
}

// findConfigFile searches for the first existing config file from the list
func findConfigFile(configFiles []string) (string, error) {
    var firstErr error

    for _, configFile := range configFiles {
        if configFile == "" {
            continue
        }
        if _, err := os.Stat(configFile); err == nil {
            return configFile, nil
        } else if firstErr == nil {
            firstErr = err
        }
    }

    if firstErr != nil {
        return "", fmt.Errorf("no config file found: %w", firstErr)
    }
    return "", fmt.Errorf("no valid config files provided")
}
database/transaction.go (new file, 69 lines)
@@ -0,0 +1,69 @@
package database

import (
    "context"
    "fmt"

    "go.ntppool.org/common/logger"
)

// DB interface for database operations that can begin transactions
type DB[Q any] interface {
    Begin(ctx context.Context) (Q, error)
}

// TX interface for transaction operations
type TX interface {
    Commit(ctx context.Context) error
    Rollback(ctx context.Context) error
}

// WithTransaction executes a function within a database transaction
// Handles proper rollback on error and commit on success
func WithTransaction[Q TX](ctx context.Context, db DB[Q], fn func(ctx context.Context, q Q) error) error {
    tx, err := db.Begin(ctx)
    if err != nil {
        return fmt.Errorf("failed to begin transaction: %w", err)
    }

    var committed bool
    defer func() {
        if !committed {
            if rbErr := tx.Rollback(ctx); rbErr != nil {
                // Log rollback error but don't override original error
                log := logger.FromContext(ctx)
                log.ErrorContext(ctx, "failed to rollback transaction", "error", rbErr)
            }
        }
    }()

    if err := fn(ctx, tx); err != nil {
        return err
    }

    err = tx.Commit(ctx)
    committed = true // Mark as committed regardless of commit success/failure
    if err != nil {
        return fmt.Errorf("failed to commit transaction: %w", err)
    }

    return nil
}

// WithReadOnlyTransaction executes a read-only function within a transaction
// Always rolls back at the end (for consistent read isolation)
func WithReadOnlyTransaction[Q TX](ctx context.Context, db DB[Q], fn func(ctx context.Context, q Q) error) error {
    tx, err := db.Begin(ctx)
    if err != nil {
        return fmt.Errorf("failed to begin read-only transaction: %w", err)
    }

    defer func() {
        if rbErr := tx.Rollback(ctx); rbErr != nil {
            log := logger.FromContext(ctx)
            log.ErrorContext(ctx, "failed to rollback read-only transaction", "error", rbErr)
        }
    }()

    return fn(ctx, tx)
}
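`WithTransaction` is generic over any Queries-like type whose `Begin` returns a value with context-aware `Commit`/`Rollback`, which is the SQLC pattern mocked in database/integration_test.go. A self-contained sketch of how a caller might use it; the `queries`/`queriesTx` types stand in for generated code and the import path is assumed:

```go
package main

import (
	"context"
	"fmt"

	"go.ntppool.org/common/database" // import path assumed from the module layout
)

// Minimal stand-ins for an SQLC-generated Queries type and its transaction
// wrapper; real generated code would also carry query methods.
type queries struct{}
type queriesTx struct{}

func (q *queries) Begin(ctx context.Context) (*queriesTx, error) { return &queriesTx{}, nil }
func (qtx *queriesTx) Commit(ctx context.Context) error          { return nil }
func (qtx *queriesTx) Rollback(ctx context.Context) error        { return nil }

func main() {
	ctx := context.Background()
	q := &queries{}

	// Q is inferred as *queriesTx. Returning an error from the callback rolls
	// the transaction back; returning nil commits it.
	err := database.WithTransaction(ctx, q, func(ctx context.Context, qtx *queriesTx) error {
		// call SQLC-generated query methods on qtx here
		return nil
	})
	fmt.Println(err)
}
```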
database/transaction_base.go (new file, 69 lines)
@@ -0,0 +1,69 @@
package database

import (
    "context"
    "database/sql"
    "fmt"

    "go.ntppool.org/common/logger"
)

// Shared interface definitions that both packages use identically
type BaseBeginner interface {
    Begin(context.Context) (sql.Tx, error)
}

type BaseTx interface {
    BaseBeginner
    Commit(ctx context.Context) error
    Rollback(ctx context.Context) error
}

// BeginTransactionForQuerier contains the shared Begin() logic from both packages
func BeginTransactionForQuerier(ctx context.Context, db DBTX) (DBTX, error) {
    if sqlDB, ok := db.(*sql.DB); ok {
        tx, err := sqlDB.BeginTx(ctx, &sql.TxOptions{})
        if err != nil {
            return nil, err
        }
        return tx, nil
    } else {
        // Handle transaction case
        if beginner, ok := db.(BaseBeginner); ok {
            tx, err := beginner.Begin(ctx)
            if err != nil {
                return nil, err
            }
            return &tx, nil
        }
        return nil, fmt.Errorf("database connection does not support transactions")
    }
}

// CommitTransactionForQuerier contains the shared Commit() logic from both packages
func CommitTransactionForQuerier(ctx context.Context, db DBTX) error {
    if sqlTx, ok := db.(*sql.Tx); ok {
        return sqlTx.Commit()
    }

    tx, ok := db.(BaseTx)
    if !ok {
        log := logger.FromContext(ctx)
        log.ErrorContext(ctx, "could not get a Tx", "type", fmt.Sprintf("%T", db))
        return sql.ErrTxDone
    }
    return tx.Commit(ctx)
}

// RollbackTransactionForQuerier contains the shared Rollback() logic from both packages
func RollbackTransactionForQuerier(ctx context.Context, db DBTX) error {
    if sqlTx, ok := db.(*sql.Tx); ok {
        return sqlTx.Rollback()
    }

    tx, ok := db.(BaseTx)
    if !ok {
        return sql.ErrTxDone
    }
    return tx.Rollback(ctx)
}
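These helpers centralize the `*sql.DB` versus `*sql.Tx` branching that each consuming package previously duplicated in its own `Begin`/`Commit`/`Rollback` wrappers. A sketch of how a package-specific Queries type might delegate to them; the `Queries` struct is hypothetical and the import path is assumed:

```go
package exampledb

import (
	"context"

	"go.ntppool.org/common/database" // import path assumed from the module layout
)

// Queries is a stand-in for an SQLC-generated Queries type; the real one
// would also carry the generated query methods and its own db field type.
type Queries struct {
	db database.DBTX
}

// Begin/Commit/Rollback delegate to the shared helpers instead of
// re-implementing the *sql.DB / *sql.Tx type switches in every package.
func (q *Queries) Begin(ctx context.Context) (*Queries, error) {
	tx, err := database.BeginTransactionForQuerier(ctx, q.db)
	if err != nil {
		return nil, err
	}
	return &Queries{db: tx}, nil
}

func (q *Queries) Commit(ctx context.Context) error {
	return database.CommitTransactionForQuerier(ctx, q.db)
}

func (q *Queries) Rollback(ctx context.Context) error {
	return database.RollbackTransactionForQuerier(ctx, q.db)
}
```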
database/transaction_test.go (new file, 157 lines)
@@ -0,0 +1,157 @@
package database

import (
    "context"
    "errors"
    "testing"
)

// Mock implementations for testing
type mockDB struct {
    beginError error
    txMock     *mockTX
}

func (m *mockDB) Begin(ctx context.Context) (*mockTX, error) {
    if m.beginError != nil {
        return nil, m.beginError
    }
    return m.txMock, nil
}

type mockTX struct {
    commitError    error
    rollbackError  error
    commitCalled   bool
    rollbackCalled bool
}

func (m *mockTX) Commit(ctx context.Context) error {
    m.commitCalled = true
    return m.commitError
}

func (m *mockTX) Rollback(ctx context.Context) error {
    m.rollbackCalled = true
    return m.rollbackError
}

func TestWithTransaction_Success(t *testing.T) {
    tx := &mockTX{}
    db := &mockDB{txMock: tx}

    var functionCalled bool
    err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        functionCalled = true
        if q != tx {
            t.Error("Expected transaction to be passed to function")
        }
        return nil
    })
    if err != nil {
        t.Errorf("Expected no error, got %v", err)
    }
    if !functionCalled {
        t.Error("Expected function to be called")
    }
    if !tx.commitCalled {
        t.Error("Expected commit to be called")
    }
    if tx.rollbackCalled {
        t.Error("Expected rollback NOT to be called on success")
    }
}

func TestWithTransaction_FunctionError(t *testing.T) {
    tx := &mockTX{}
    db := &mockDB{txMock: tx}

    expectedError := errors.New("function error")
    err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        return expectedError
    })

    if err != expectedError {
        t.Errorf("Expected error %v, got %v", expectedError, err)
    }
    if tx.commitCalled {
        t.Error("Expected commit NOT to be called on function error")
    }
    if !tx.rollbackCalled {
        t.Error("Expected rollback to be called on function error")
    }
}

func TestWithTransaction_BeginError(t *testing.T) {
    expectedError := errors.New("begin error")
    db := &mockDB{beginError: expectedError}

    err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        t.Error("Function should not be called when Begin fails")
        return nil
    })

    if err == nil || !errors.Is(err, expectedError) {
        t.Errorf("Expected wrapped begin error, got %v", err)
    }
}

func TestWithTransaction_CommitError(t *testing.T) {
    commitError := errors.New("commit error")
    tx := &mockTX{commitError: commitError}
    db := &mockDB{txMock: tx}

    err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        return nil
    })

    if err == nil || !errors.Is(err, commitError) {
        t.Errorf("Expected wrapped commit error, got %v", err)
    }
    if !tx.commitCalled {
        t.Error("Expected commit to be called")
    }
    if tx.rollbackCalled {
        t.Error("Expected rollback NOT to be called when commit fails")
    }
}

func TestWithReadOnlyTransaction_Success(t *testing.T) {
    tx := &mockTX{}
    db := &mockDB{txMock: tx}

    var functionCalled bool
    err := WithReadOnlyTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        functionCalled = true
        return nil
    })
    if err != nil {
        t.Errorf("Expected no error, got %v", err)
    }
    if !functionCalled {
        t.Error("Expected function to be called")
    }
    if tx.commitCalled {
        t.Error("Expected commit NOT to be called in read-only transaction")
    }
    if !tx.rollbackCalled {
        t.Error("Expected rollback to be called in read-only transaction")
    }
}

func TestWithReadOnlyTransaction_FunctionError(t *testing.T) {
    tx := &mockTX{}
    db := &mockDB{txMock: tx}

    expectedError := errors.New("function error")
    err := WithReadOnlyTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        return expectedError
    })

    if err != expectedError {
        t.Errorf("Expected error %v, got %v", expectedError, err)
    }
    if !tx.rollbackCalled {
        t.Error("Expected rollback to be called")
    }
}
go.mod (17 lines changed)
@@ -4,10 +4,12 @@ go 1.23.5

require (
    github.com/abh/certman v0.4.0
    github.com/go-sql-driver/mysql v1.9.3
    github.com/labstack/echo-contrib v0.17.2
    github.com/labstack/echo/v4 v4.13.3
    github.com/oklog/ulid/v2 v2.1.0
    github.com/prometheus/client_golang v1.20.5
    github.com/prometheus/client_model v0.6.1
    github.com/remychantenay/slog-otel v1.3.2
    github.com/samber/slog-echo v1.14.8
    github.com/samber/slog-multi v1.2.4
@@ -17,20 +19,28 @@ require (
    go.opentelemetry.io/contrib/exporters/autoexport v0.58.0
    go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.58.0
    go.opentelemetry.io/otel v1.33.0
    go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.9.0
    go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.9.0
    go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.33.0
    go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.33.0
    go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0
    go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
    go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0
    go.opentelemetry.io/otel/log v0.9.0
    go.opentelemetry.io/otel/metric v1.33.0
    go.opentelemetry.io/otel/sdk v1.33.0
    go.opentelemetry.io/otel/sdk/log v0.9.0
    go.opentelemetry.io/otel/sdk/metric v1.33.0
    go.opentelemetry.io/otel/trace v1.33.0
    golang.org/x/mod v0.22.0
    golang.org/x/net v0.33.0
    golang.org/x/sync v0.10.0
    google.golang.org/grpc v1.69.2
    gopkg.in/yaml.v3 v3.0.1
)

require (
    filippo.io/edwards25519 v1.1.0 // indirect
    github.com/beorn7/perks v1.0.1 // indirect
    github.com/cenkalti/backoff/v4 v4.3.0 // indirect
    github.com/cespare/xxhash/v2 v2.3.0 // indirect
@@ -47,7 +57,6 @@ require (
    github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
    github.com/pierrec/lz4/v4 v4.1.22 // indirect
    github.com/pkg/errors v0.9.1 // indirect
    github.com/prometheus/client_model v0.6.1 // indirect
    github.com/prometheus/common v0.61.0 // indirect
    github.com/prometheus/procfs v0.15.1 // indirect
    github.com/samber/lo v1.47.0 // indirect
@@ -56,16 +65,10 @@ require (
    github.com/valyala/fasttemplate v1.2.2 // indirect
    go.opentelemetry.io/auto/sdk v1.1.0 // indirect
    go.opentelemetry.io/contrib/bridges/prometheus v0.58.0 // indirect
    go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.9.0 // indirect
    go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.9.0 // indirect
    go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.33.0 // indirect
    go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.33.0 // indirect
    go.opentelemetry.io/otel/exporters/prometheus v0.55.0 // indirect
    go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.9.0 // indirect
    go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.33.0 // indirect
    go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.33.0 // indirect
    go.opentelemetry.io/otel/metric v1.33.0 // indirect
    go.opentelemetry.io/otel/sdk/metric v1.33.0 // indirect
    go.opentelemetry.io/proto/otlp v1.4.0 // indirect
    golang.org/x/crypto v0.31.0 // indirect
    golang.org/x/sys v0.28.0 // indirect
go.sum (12 lines changed)
@@ -1,3 +1,5 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/abh/certman v0.4.0 h1:XHoDtb0YyRQPclaHMrBDlKTVZpNjTK6vhB0S3Bd/Sbs=
github.com/abh/certman v0.4.0/go.mod h1:x8QhpKVZifmV1Hdiwdg9gLo2GMPAxezz1s3zrVnPs+I=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -17,6 +19,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -30,6 +34,10 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/labstack/echo-contrib v0.17.2 h1:K1zivqmtcC70X9VdBFdLomjPDEVHlrcAObqmuFj1c6w=
@@ -65,6 +73,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/remychantenay/slog-otel v1.3.2 h1:ZBx8qnwfLJ6e18Vba4e9Xp9B7khTmpIwFsU1sAmActw=
github.com/remychantenay/slog-otel v1.3.2/go.mod h1:gKW4tQ8cGOKoA+bi7wtYba/tcJ6Tc9XyQ/EW8gHA/2E=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
@@ -211,6 +221,8 @@ google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
internal/tracerconfig/config.go (new file, 378 lines)
@@ -0,0 +1,378 @@
// Package tracerconfig provides a bridge to eliminate circular dependencies between
// the logger and tracing packages. It stores tracer configuration and provides
// factory functions that can be used by the logger package without importing tracing.
package tracerconfig

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "errors"
    "fmt"
    "net/url"
    "os"
    "strings"
    "sync"
    "time"

    "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
    "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
    "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
    "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
    sdklog "go.opentelemetry.io/otel/sdk/log"
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    "google.golang.org/grpc/credentials"
)

const (
    otelExporterOTLPProtoEnvKey        = "OTEL_EXPORTER_OTLP_PROTOCOL"
    otelExporterOTLPTracesProtoEnvKey  = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"
    otelExporterOTLPLogsProtoEnvKey    = "OTEL_EXPORTER_OTLP_LOGS_PROTOCOL"
    otelExporterOTLPMetricsProtoEnvKey = "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL"
)

var errInvalidOTLPProtocol = errors.New("invalid OTLP protocol - should be one of ['grpc', 'http/protobuf']")

// newInvalidProtocolError creates a specific error message for invalid protocols
func newInvalidProtocolError(protocol, signalType string) error {
    return fmt.Errorf("invalid OTLP protocol '%s' for %s - should be one of ['grpc', 'http/protobuf', 'http/json']", protocol, signalType)
}

// Validate checks the configuration for common errors and inconsistencies
func (c *Config) Validate() error {
    var errs []error

    // Check that both Endpoint and EndpointURL are not specified
    if c.Endpoint != "" && c.EndpointURL != "" {
        errs = append(errs, errors.New("cannot specify both Endpoint and EndpointURL - use one or the other"))
    }

    // Validate EndpointURL format if specified
    if c.EndpointURL != "" {
        if _, err := url.Parse(c.EndpointURL); err != nil {
            errs = append(errs, fmt.Errorf("invalid EndpointURL format: %w", err))
        }
    }

    // Validate Endpoint format if specified
    if c.Endpoint != "" {
        // Basic validation - should not contain protocol scheme
        if strings.Contains(c.Endpoint, "://") {
            errs = append(errs, errors.New("Endpoint should not include protocol scheme (use EndpointURL for full URLs)"))
        }
        // Should not be empty after trimming whitespace
        if strings.TrimSpace(c.Endpoint) == "" {
            errs = append(errs, errors.New("Endpoint cannot be empty or whitespace"))
        }
    }

    // Validate TLS configuration consistency
    if c.CertificateProvider != nil && c.RootCAs == nil {
        // This is just a warning - client cert without custom CAs is valid
        // but might indicate a configuration issue
    }

    // Validate service name if specified
    if c.ServiceName != "" && strings.TrimSpace(c.ServiceName) == "" {
        errs = append(errs, errors.New("ServiceName cannot be empty or whitespace"))
    }

    // Combine all errors
    if len(errs) > 0 {
        var errMsgs []string
        for _, err := range errs {
            errMsgs = append(errMsgs, err.Error())
        }
        return fmt.Errorf("configuration validation failed: %s", strings.Join(errMsgs, "; "))
    }

    return nil
}

// ValidateAndStore validates the configuration before storing it
func ValidateAndStore(ctx context.Context, cfg *Config, logFactory LogExporterFactory, metricFactory MetricExporterFactory, traceFactory TraceExporterFactory) error {
    if cfg != nil {
        if err := cfg.Validate(); err != nil {
            return err
        }
    }
    Store(ctx, cfg, logFactory, metricFactory, traceFactory)
    return nil
}

// GetClientCertificate defines a function type for providing client certificates for mutual TLS.
// This is used when exporting telemetry data to secured OTLP endpoints that require
// client certificate authentication.
type GetClientCertificate func(*tls.CertificateRequestInfo) (*tls.Certificate, error)

// Config provides configuration options for OpenTelemetry tracing setup.
// It supplements standard OpenTelemetry environment variables with additional
// NTP Pool-specific configuration including TLS settings for secure OTLP export.
type Config struct {
    ServiceName         string               // Service name for resource identification (overrides OTEL_SERVICE_NAME)
    Environment         string               // Deployment environment (development, staging, production)
    Endpoint            string               // OTLP endpoint hostname/port (e.g., "otlp.example.com:4317")
    EndpointURL         string               // Complete OTLP endpoint URL (e.g., "https://otlp.example.com:4317/v1/traces")
    CertificateProvider GetClientCertificate // Client certificate provider for mutual TLS
    RootCAs             *x509.CertPool       // CA certificate pool for server verification
}

// LogExporterFactory creates an OTLP log exporter using the provided configuration.
// This allows the logger package to create exporters without importing the tracing package.
type LogExporterFactory func(context.Context, *Config) (sdklog.Exporter, error)

// MetricExporterFactory creates an OTLP metric exporter using the provided configuration.
// This allows the metrics package to create exporters without importing the tracing package.
type MetricExporterFactory func(context.Context, *Config) (sdkmetric.Exporter, error)

// TraceExporterFactory creates an OTLP trace exporter using the provided configuration.
// This allows for consistent trace exporter creation across packages.
type TraceExporterFactory func(context.Context, *Config) (sdktrace.SpanExporter, error)

// Global state for sharing configuration between packages
var (
    globalConfig          *Config
    globalContext         context.Context
    logExporterFactory    LogExporterFactory
    metricExporterFactory MetricExporterFactory
    traceExporterFactory  TraceExporterFactory
    configMu              sync.RWMutex
)

// Store saves the tracer configuration and exporter factories for use by other packages.
// This should be called by the tracing package during initialization.
func Store(ctx context.Context, cfg *Config, logFactory LogExporterFactory, metricFactory MetricExporterFactory, traceFactory TraceExporterFactory) {
    configMu.Lock()
    defer configMu.Unlock()
    globalConfig = cfg
    globalContext = ctx
    logExporterFactory = logFactory
    metricExporterFactory = metricFactory
    traceExporterFactory = traceFactory
}

// GetLogExporter returns the stored configuration and log exporter factory.
// Returns nil values if no configuration has been stored yet.
func GetLogExporter() (*Config, context.Context, LogExporterFactory) {
    configMu.RLock()
    defer configMu.RUnlock()
    return globalConfig, globalContext, logExporterFactory
}

// GetMetricExporter returns the stored configuration and metric exporter factory.
// Returns nil values if no configuration has been stored yet.
func GetMetricExporter() (*Config, context.Context, MetricExporterFactory) {
    configMu.RLock()
    defer configMu.RUnlock()
    return globalConfig, globalContext, metricExporterFactory
}

// GetTraceExporter returns the stored configuration and trace exporter factory.
// Returns nil values if no configuration has been stored yet.
func GetTraceExporter() (*Config, context.Context, TraceExporterFactory) {
    configMu.RLock()
    defer configMu.RUnlock()
    return globalConfig, globalContext, traceExporterFactory
}

// Get returns the stored tracer configuration, context, and log exporter factory.
// This maintains backward compatibility for the logger package.
// Returns nil values if no configuration has been stored yet.
func Get() (*Config, context.Context, LogExporterFactory) {
    return GetLogExporter()
}

// IsConfigured returns true if tracer configuration has been stored.
func IsConfigured() bool {
    configMu.RLock()
    defer configMu.RUnlock()
    return globalConfig != nil && globalContext != nil
}

// Clear removes the stored configuration. This is primarily useful for testing.
func Clear() {
    configMu.Lock()
    defer configMu.Unlock()
    globalConfig = nil
    globalContext = nil
    logExporterFactory = nil
    metricExporterFactory = nil
    traceExporterFactory = nil
}

// getTLSConfig creates a TLS configuration from the provided Config.
func getTLSConfig(cfg *Config) *tls.Config {
    if cfg.CertificateProvider == nil {
        return nil
    }
    return &tls.Config{
        GetClientCertificate: cfg.CertificateProvider,
        RootCAs:              cfg.RootCAs,
    }
}

// getProtocol determines the OTLP protocol to use for the given signal type.
// It follows OpenTelemetry environment variable precedence.
func getProtocol(signalSpecificEnv string) string {
    proto := os.Getenv(signalSpecificEnv)
    if proto == "" {
        proto = os.Getenv(otelExporterOTLPProtoEnvKey)
    }
    // Fallback to default, http/protobuf.
    if proto == "" {
        proto = "http/protobuf"
    }
    return proto
}

// CreateOTLPLogExporter creates an OTLP log exporter using the provided configuration.
func CreateOTLPLogExporter(ctx context.Context, cfg *Config) (sdklog.Exporter, error) {
    tlsConfig := getTLSConfig(cfg)
    proto := getProtocol(otelExporterOTLPLogsProtoEnvKey)

    switch proto {
    case "grpc":
        opts := []otlploggrpc.Option{
            otlploggrpc.WithCompressor("gzip"),
        }
        if tlsConfig != nil {
            opts = append(opts, otlploggrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
        }
        if len(cfg.Endpoint) > 0 {
            opts = append(opts, otlploggrpc.WithEndpoint(cfg.Endpoint))
        }
        if len(cfg.EndpointURL) > 0 {
            opts = append(opts, otlploggrpc.WithEndpointURL(cfg.EndpointURL))
        }

        return otlploggrpc.New(ctx, opts...)
    case "http/protobuf", "http/json":
        opts := []otlploghttp.Option{
            otlploghttp.WithCompression(otlploghttp.GzipCompression),
        }
        if tlsConfig != nil {
            opts = append(opts, otlploghttp.WithTLSClientConfig(tlsConfig))
        }
        if len(cfg.Endpoint) > 0 {
            opts = append(opts, otlploghttp.WithEndpoint(cfg.Endpoint))
        }
        if len(cfg.EndpointURL) > 0 {
            opts = append(opts, otlploghttp.WithEndpointURL(cfg.EndpointURL))
        }

        opts = append(opts, otlploghttp.WithRetry(otlploghttp.RetryConfig{
            Enabled:         true,
            InitialInterval: 3 * time.Second,
            MaxInterval:     60 * time.Second,
            MaxElapsedTime:  5 * time.Minute,
        }))

        return otlploghttp.New(ctx, opts...)
    default:
        return nil, newInvalidProtocolError(proto, "logs")
    }
}

// CreateOTLPMetricExporter creates an OTLP metric exporter using the provided configuration.
func CreateOTLPMetricExporter(ctx context.Context, cfg *Config) (sdkmetric.Exporter, error) {
    tlsConfig := getTLSConfig(cfg)
    proto := getProtocol(otelExporterOTLPMetricsProtoEnvKey)

    switch proto {
    case "grpc":
        opts := []otlpmetricgrpc.Option{
            otlpmetricgrpc.WithCompressor("gzip"),
        }
        if tlsConfig != nil {
            opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
        }
        if len(cfg.Endpoint) > 0 {
            opts = append(opts, otlpmetricgrpc.WithEndpoint(cfg.Endpoint))
        }
        if len(cfg.EndpointURL) > 0 {
            opts = append(opts, otlpmetricgrpc.WithEndpointURL(cfg.EndpointURL))
        }

        return otlpmetricgrpc.New(ctx, opts...)
    case "http/protobuf", "http/json":
        opts := []otlpmetrichttp.Option{
            otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
        }
        if tlsConfig != nil {
            opts = append(opts, otlpmetrichttp.WithTLSClientConfig(tlsConfig))
        }
        if len(cfg.Endpoint) > 0 {
            opts = append(opts, otlpmetrichttp.WithEndpoint(cfg.Endpoint))
        }
        if len(cfg.EndpointURL) > 0 {
            opts = append(opts, otlpmetrichttp.WithEndpointURL(cfg.EndpointURL))
        }

        opts = append(opts, otlpmetrichttp.WithRetry(otlpmetrichttp.RetryConfig{
            Enabled:         true,
            InitialInterval: 3 * time.Second,
            MaxInterval:     60 * time.Second,
            MaxElapsedTime:  5 * time.Minute,
        }))

        return otlpmetrichttp.New(ctx, opts...)
    default:
        return nil, newInvalidProtocolError(proto, "metrics")
    }
}

// CreateOTLPTraceExporter creates an OTLP trace exporter using the provided configuration.
func CreateOTLPTraceExporter(ctx context.Context, cfg *Config) (sdktrace.SpanExporter, error) {
    tlsConfig := getTLSConfig(cfg)
    proto := getProtocol(otelExporterOTLPTracesProtoEnvKey)

    var client otlptrace.Client

    switch proto {
    case "grpc":
        opts := []otlptracegrpc.Option{
            otlptracegrpc.WithCompressor("gzip"),
        }
        if tlsConfig != nil {
            opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
        }
        if len(cfg.Endpoint) > 0 {
            opts = append(opts, otlptracegrpc.WithEndpoint(cfg.Endpoint))
        }
        if len(cfg.EndpointURL) > 0 {
            opts = append(opts, otlptracegrpc.WithEndpointURL(cfg.EndpointURL))
        }

        client = otlptracegrpc.NewClient(opts...)
    case "http/protobuf", "http/json":
        opts := []otlptracehttp.Option{
            otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
        }
        if tlsConfig != nil {
            opts = append(opts, otlptracehttp.WithTLSClientConfig(tlsConfig))
        }
        if len(cfg.Endpoint) > 0 {
            opts = append(opts, otlptracehttp.WithEndpoint(cfg.Endpoint))
        }
        if len(cfg.EndpointURL) > 0 {
            opts = append(opts, otlptracehttp.WithEndpointURL(cfg.EndpointURL))
        }

        opts = append(opts, otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
            Enabled:         true,
            InitialInterval: 3 * time.Second,
            MaxInterval:     60 * time.Second,
            MaxElapsedTime:  5 * time.Minute,
        }))

        client = otlptracehttp.NewClient(opts...)
    default:
        return nil, newInvalidProtocolError(proto, "traces")
    }

    return otlptrace.New(ctx, client)
}
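The package comment says the tracing package stores the configuration once at startup so that logger and metrics can later build exporters from the same settings without an import cycle. A sketch under that assumption, using only functions defined in this file; the service name, environment, and endpoint values are illustrative, and the import path (an `internal/` package, so only importable from within `go.ntppool.org/common`) is inferred from the module layout:

```go
package main

import (
	"context"
	"log"

	"go.ntppool.org/common/internal/tracerconfig" // internal package; path inferred
)

func main() {
	ctx := context.Background()

	cfg := &tracerconfig.Config{
		ServiceName: "ntppool-api",           // illustrative values
		Environment: "production",
		Endpoint:    "otlp.example.com:4317", // hostname:port, no scheme
	}

	// Validate, then publish the shared config plus the exporter factories
	// that the logger/metrics/tracing packages will use later.
	err := tracerconfig.ValidateAndStore(ctx, cfg,
		tracerconfig.CreateOTLPLogExporter,
		tracerconfig.CreateOTLPMetricExporter,
		tracerconfig.CreateOTLPTraceExporter,
	)
	if err != nil {
		log.Fatalf("otlp config: %v", err)
	}

	// Later, e.g. inside the logger package:
	if c, cctx, newExporter := tracerconfig.GetLogExporter(); newExporter != nil {
		exp, err := newExporter(cctx, c)
		_, _ = exp, err // wire the exporter into the log provider
	}
}
```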
internal/tracerconfig/config_test.go (new file, 474 lines)
@@ -0,0 +1,474 @@
package tracerconfig

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "os"
    "strings"
    "sync"
    "testing"
    "time"

    sdklog "go.opentelemetry.io/otel/sdk/log"
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func TestStore_And_Retrieve(t *testing.T) {
    // Clear any existing configuration
    Clear()

    ctx := context.Background()
    config := &Config{
        ServiceName: "test-service",
        Environment: "test",
        Endpoint:    "localhost:4317",
    }

    // Create mock factories
    logFactory := func(context.Context, *Config) (sdklog.Exporter, error) { return nil, nil }
    metricFactory := func(context.Context, *Config) (sdkmetric.Exporter, error) { return nil, nil }
    traceFactory := func(context.Context, *Config) (sdktrace.SpanExporter, error) { return nil, nil }

    // Store configuration
    Store(ctx, config, logFactory, metricFactory, traceFactory)

    // Test IsConfigured
    if !IsConfigured() {
        t.Error("IsConfigured() should return true after Store()")
    }

    // Test GetLogExporter
    cfg, ctx2, factory := GetLogExporter()
    if cfg == nil || ctx2 == nil || factory == nil {
        t.Error("GetLogExporter() should return non-nil values")
    }
    if cfg.ServiceName != "test-service" {
        t.Errorf("Expected ServiceName 'test-service', got '%s'", cfg.ServiceName)
    }

    // Test GetMetricExporter
    cfg, ctx3, metricFact := GetMetricExporter()
    if cfg == nil || ctx3 == nil || metricFact == nil {
        t.Error("GetMetricExporter() should return non-nil values")
    }

    // Test GetTraceExporter
    cfg, ctx4, traceFact := GetTraceExporter()
    if cfg == nil || ctx4 == nil || traceFact == nil {
        t.Error("GetTraceExporter() should return non-nil values")
    }

    // Test backward compatibility Get()
    cfg, ctx5, logFact := Get()
    if cfg == nil || ctx5 == nil || logFact == nil {
        t.Error("Get() should return non-nil values for backward compatibility")
    }
}

func TestClear(t *testing.T) {
    // Store some configuration first
    ctx := context.Background()
    config := &Config{ServiceName: "test"}
    Store(ctx, config, nil, nil, nil)

    if !IsConfigured() {
        t.Error("Should be configured before Clear()")
    }

    // Clear configuration
    Clear()

    if IsConfigured() {
        t.Error("Should not be configured after Clear()")
    }

    // All getters should return nil
    cfg, ctx2, factory := GetLogExporter()
    if cfg != nil || ctx2 != nil || factory != nil {
        t.Error("GetLogExporter() should return nil values after Clear()")
    }
}

func TestConcurrentAccess(t *testing.T) {
    Clear()

    ctx := context.Background()
    config := &Config{ServiceName: "concurrent-test"}

    var wg sync.WaitGroup
    const numGoroutines = 10

    // Test concurrent Store and Get operations
    wg.Add(numGoroutines * 2)

    // Concurrent Store operations
    for i := 0; i < numGoroutines; i++ {
        go func() {
            defer wg.Done()
            Store(ctx, config, nil, nil, nil)
        }()
    }

    // Concurrent Get operations
    for i := 0; i < numGoroutines; i++ {
        go func() {
            defer wg.Done()
            IsConfigured()
            GetLogExporter()
            GetMetricExporter()
            GetTraceExporter()
        }()
    }

    wg.Wait()

    // Should be configured after all operations
    if !IsConfigured() {
        t.Error("Should be configured after concurrent operations")
    }
}

func TestGetTLSConfig(t *testing.T) {
    tests := []struct {
        name     string
        config   *Config
        expected bool // whether TLS config should be nil
    }{
        {
            name:     "nil certificate provider",
            config:   &Config{},
            expected: true, // should be nil
        },
        {
            name: "with certificate provider",
            config: &Config{
                CertificateProvider: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
                    return &tls.Certificate{}, nil
                },
            },
            expected: false, // should not be nil
        },
        {
            name: "with certificate provider and RootCAs",
            config: &Config{
                CertificateProvider: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
                    return &tls.Certificate{}, nil
                },
                RootCAs: x509.NewCertPool(),
            },
            expected: false, // should not be nil
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            tlsConfig := getTLSConfig(tt.config)
            if tt.expected && tlsConfig != nil {
                t.Errorf("Expected nil TLS config, got %v", tlsConfig)
            }
            if !tt.expected && tlsConfig == nil {
                t.Error("Expected non-nil TLS config, got nil")
            }
            if !tt.expected && tlsConfig != nil {
                if tlsConfig.GetClientCertificate == nil {
                    t.Error("Expected GetClientCertificate to be set")
                }
                if tt.config.RootCAs != nil && tlsConfig.RootCAs != tt.config.RootCAs {
|
||||
t.Error("Expected RootCAs to be set correctly")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetProtocol(t *testing.T) {
|
||||
// Save original env vars
|
||||
originalGeneral := os.Getenv(otelExporterOTLPProtoEnvKey)
|
||||
originalLogs := os.Getenv(otelExporterOTLPLogsProtoEnvKey)
|
||||
|
||||
defer func() {
|
||||
// Restore original env vars
|
||||
if originalGeneral != "" {
|
||||
os.Setenv(otelExporterOTLPProtoEnvKey, originalGeneral)
|
||||
} else {
|
||||
os.Unsetenv(otelExporterOTLPProtoEnvKey)
|
||||
}
|
||||
if originalLogs != "" {
|
||||
os.Setenv(otelExporterOTLPLogsProtoEnvKey, originalLogs)
|
||||
} else {
|
||||
os.Unsetenv(otelExporterOTLPLogsProtoEnvKey)
|
||||
}
|
||||
}()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
signalSpecific string
|
||||
generalProto string
|
||||
specificProto string
|
||||
expectedResult string
|
||||
}{
|
||||
{
|
||||
name: "no env vars set - default",
|
||||
signalSpecific: otelExporterOTLPLogsProtoEnvKey,
|
||||
expectedResult: "http/protobuf",
|
||||
},
|
||||
{
|
||||
name: "general env var set",
|
||||
signalSpecific: otelExporterOTLPLogsProtoEnvKey,
|
||||
generalProto: "grpc",
|
||||
expectedResult: "grpc",
|
||||
},
|
||||
{
|
||||
name: "specific env var overrides general",
|
||||
signalSpecific: otelExporterOTLPLogsProtoEnvKey,
|
||||
generalProto: "grpc",
|
||||
specificProto: "http/protobuf",
|
||||
expectedResult: "http/protobuf",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Clear env vars
|
||||
os.Unsetenv(otelExporterOTLPProtoEnvKey)
|
||||
os.Unsetenv(otelExporterOTLPLogsProtoEnvKey)
|
||||
|
||||
// Set test env vars
|
||||
if tt.generalProto != "" {
|
||||
os.Setenv(otelExporterOTLPProtoEnvKey, tt.generalProto)
|
||||
}
|
||||
if tt.specificProto != "" {
|
||||
os.Setenv(tt.signalSpecific, tt.specificProto)
|
||||
}
|
||||
|
||||
result := getProtocol(tt.signalSpecific)
|
||||
if result != tt.expectedResult {
|
||||
t.Errorf("Expected protocol '%s', got '%s'", tt.expectedResult, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateExporterErrors(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
config := &Config{
|
||||
ServiceName: "test-service",
|
||||
Endpoint: "invalid-endpoint",
|
||||
}
|
||||
|
||||
// Test with invalid protocol for logs
|
||||
os.Setenv(otelExporterOTLPLogsProtoEnvKey, "invalid-protocol")
|
||||
defer os.Unsetenv(otelExporterOTLPLogsProtoEnvKey)
|
||||
|
||||
_, err := CreateOTLPLogExporter(ctx, config)
|
||||
if err == nil {
|
||||
t.Error("Expected error for invalid protocol")
|
||||
}
|
||||
// Check that it's a protocol error (the specific message will be different now)
|
||||
if !strings.Contains(err.Error(), "invalid OTLP protocol") {
|
||||
t.Errorf("Expected protocol error, got %v", err)
|
||||
}
|
||||
|
||||
// Test with invalid protocol for metrics
|
||||
os.Setenv(otelExporterOTLPMetricsProtoEnvKey, "invalid-protocol")
|
||||
defer os.Unsetenv(otelExporterOTLPMetricsProtoEnvKey)
|
||||
|
||||
_, err = CreateOTLPMetricExporter(ctx, config)
|
||||
if err == nil {
|
||||
t.Error("Expected error for invalid protocol")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "invalid OTLP protocol") {
|
||||
t.Errorf("Expected protocol error, got %v", err)
|
||||
}
|
||||
|
||||
// Test with invalid protocol for traces
|
||||
os.Setenv(otelExporterOTLPTracesProtoEnvKey, "invalid-protocol")
|
||||
defer os.Unsetenv(otelExporterOTLPTracesProtoEnvKey)
|
||||
|
||||
_, err = CreateOTLPTraceExporter(ctx, config)
|
||||
if err == nil {
|
||||
t.Error("Expected error for invalid protocol")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "invalid OTLP protocol") {
|
||||
t.Errorf("Expected protocol error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateExporterValidProtocols(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
config := &Config{
|
||||
ServiceName: "test-service",
|
||||
Endpoint: "localhost:4317", // This will likely fail to connect, but should create exporter
|
||||
}
|
||||
|
||||
protocols := []string{"grpc", "http/protobuf", "http/json"}
|
||||
|
||||
for _, proto := range protocols {
|
||||
t.Run("logs_"+proto, func(t *testing.T) {
|
||||
os.Setenv(otelExporterOTLPLogsProtoEnvKey, proto)
|
||||
defer os.Unsetenv(otelExporterOTLPLogsProtoEnvKey)
|
||||
|
||||
exporter, err := CreateOTLPLogExporter(ctx, config)
|
||||
if err != nil {
|
||||
// Connection errors are expected since we're not running a real OTLP server
|
||||
// but the exporter should be created successfully
|
||||
t.Logf("Connection error expected: %v", err)
|
||||
}
|
||||
if exporter != nil {
|
||||
exporter.Shutdown(ctx)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("metrics_"+proto, func(t *testing.T) {
|
||||
os.Setenv(otelExporterOTLPMetricsProtoEnvKey, proto)
|
||||
defer os.Unsetenv(otelExporterOTLPMetricsProtoEnvKey)
|
||||
|
||||
exporter, err := CreateOTLPMetricExporter(ctx, config)
|
||||
if err != nil {
|
||||
t.Logf("Connection error expected: %v", err)
|
||||
}
|
||||
if exporter != nil {
|
||||
exporter.Shutdown(ctx)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("traces_"+proto, func(t *testing.T) {
|
||||
os.Setenv(otelExporterOTLPTracesProtoEnvKey, proto)
|
||||
defer os.Unsetenv(otelExporterOTLPTracesProtoEnvKey)
|
||||
|
||||
exporter, err := CreateOTLPTraceExporter(ctx, config)
|
||||
if err != nil {
|
||||
t.Logf("Connection error expected: %v", err)
|
||||
}
|
||||
if exporter != nil {
|
||||
exporter.Shutdown(ctx)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigValidation(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
config *Config
|
||||
shouldErr bool
|
||||
}{
|
||||
{
|
||||
name: "valid empty config",
|
||||
config: &Config{},
|
||||
shouldErr: false,
|
||||
},
|
||||
{
|
||||
name: "valid config with endpoint",
|
||||
config: &Config{
|
||||
ServiceName: "test-service",
|
||||
Endpoint: "localhost:4317",
|
||||
},
|
||||
shouldErr: false,
|
||||
},
|
||||
{
|
||||
name: "valid config with endpoint URL",
|
||||
config: &Config{
|
||||
ServiceName: "test-service",
|
||||
EndpointURL: "https://otlp.example.com:4317/v1/traces",
|
||||
},
|
||||
shouldErr: false,
|
||||
},
|
||||
{
|
||||
name: "invalid - both endpoint and endpoint URL",
|
||||
config: &Config{
|
||||
ServiceName: "test-service",
|
||||
Endpoint: "localhost:4317",
|
||||
EndpointURL: "https://otlp.example.com:4317/v1/traces",
|
||||
},
|
||||
shouldErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid - endpoint with protocol",
|
||||
config: &Config{
|
||||
ServiceName: "test-service",
|
||||
Endpoint: "https://localhost:4317",
|
||||
},
|
||||
shouldErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid - empty endpoint",
|
||||
config: &Config{
|
||||
ServiceName: "test-service",
|
||||
Endpoint: " ",
|
||||
},
|
||||
shouldErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid - malformed endpoint URL",
|
||||
config: &Config{
|
||||
ServiceName: "test-service",
|
||||
EndpointURL: "://invalid-url-missing-scheme",
|
||||
},
|
||||
shouldErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid - empty service name",
|
||||
config: &Config{
|
||||
ServiceName: " ",
|
||||
Endpoint: "localhost:4317",
|
||||
},
|
||||
shouldErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := tt.config.Validate()
|
||||
if tt.shouldErr && err == nil {
|
||||
t.Error("Expected validation error, got nil")
|
||||
}
|
||||
if !tt.shouldErr && err != nil {
|
||||
t.Errorf("Expected no validation error, got: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateAndStore(t *testing.T) {
|
||||
Clear()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Test with valid config
|
||||
validConfig := &Config{
|
||||
ServiceName: "test-service",
|
||||
Endpoint: "localhost:4317",
|
||||
}
|
||||
|
||||
err := ValidateAndStore(ctx, validConfig, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Errorf("ValidateAndStore with valid config should not error: %v", err)
|
||||
}
|
||||
|
||||
if !IsConfigured() {
|
||||
t.Error("Should be configured after ValidateAndStore")
|
||||
}
|
||||
|
||||
Clear()
|
||||
|
||||
// Test with invalid config
|
||||
invalidConfig := &Config{
|
||||
ServiceName: "test-service",
|
||||
Endpoint: "localhost:4317",
|
||||
EndpointURL: "https://example.com:4317", // both specified - invalid
|
||||
}
|
||||
|
||||
err = ValidateAndStore(ctx, invalidConfig, nil, nil, nil)
|
||||
if err == nil {
|
||||
t.Error("ValidateAndStore with invalid config should return error")
|
||||
}
|
||||
|
||||
if IsConfigured() {
|
||||
t.Error("Should not be configured after failed ValidateAndStore")
|
||||
}
|
||||
}
|
198
logger/buffering_exporter.go
Normal file
@@ -0,0 +1,198 @@
package logger

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"go.ntppool.org/common/internal/tracerconfig"
	otellog "go.opentelemetry.io/otel/sdk/log"
)

// bufferingExporter wraps an OTLP exporter and buffers logs until tracing is configured
type bufferingExporter struct {
	mu sync.RWMutex

	// Buffered records while waiting for tracing config
	buffer      [][]otellog.Record
	bufferSize  int
	maxBuffSize int

	// Real exporter (created when tracing is configured)
	exporter otellog.Exporter

	// Thread-safe initialization
	initOnce sync.Once
	initErr  error

	// Background checker
	stopChecker chan struct{}
	checkerDone chan struct{}
}

// newBufferingExporter creates a new exporter that buffers logs until tracing is configured
func newBufferingExporter() *bufferingExporter {
	e := &bufferingExporter{
		maxBuffSize: 1000, // Max number of batches to buffer
		stopChecker: make(chan struct{}),
		checkerDone: make(chan struct{}),
	}

	// Start background readiness checker
	go e.checkReadiness()

	return e
}

// Export implements otellog.Exporter
func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error {
	// Try initialization once
	e.initOnce.Do(func() {
		e.initErr = e.initialize()
	})

	// If initialization succeeded, use the exporter
	if e.initErr == nil {
		e.mu.RLock()
		exporter := e.exporter
		e.mu.RUnlock()

		if exporter != nil {
			return exporter.Export(ctx, records)
		}
	}

	// Not ready yet, buffer the records
	return e.bufferRecords(records)
}

// initialize attempts to create the real OTLP exporter using tracing config
func (e *bufferingExporter) initialize() error {
	cfg, ctx, factory := tracerconfig.Get()
	if cfg == nil || ctx == nil || factory == nil {
		return errors.New("tracer not configured yet")
	}

	// Add timeout for initialization
	initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	exporter, err := factory(initCtx, cfg)
	if err != nil {
		return fmt.Errorf("failed to create OTLP exporter: %w", err)
	}

	e.mu.Lock()
	e.exporter = exporter
	flushErr := e.flushBuffer(initCtx)
	e.mu.Unlock()

	if flushErr != nil {
		// Log but don't fail initialization
		Setup().Warn("buffer flush failed during initialization", "error", flushErr)
	}

	return nil
}

// bufferRecords adds records to the buffer for later processing
func (e *bufferingExporter) bufferRecords(records []otellog.Record) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	// Buffer the batch if we have space
	if e.bufferSize < e.maxBuffSize {
		// Clone records to avoid retention issues
		cloned := make([]otellog.Record, len(records))
		for i, r := range records {
			cloned[i] = r.Clone()
		}
		e.buffer = append(e.buffer, cloned)
		e.bufferSize++
	}

	// Always return success to BatchProcessor
	return nil
}

// checkReadiness periodically checks if tracing is configured
func (e *bufferingExporter) checkReadiness() {
	defer close(e.checkerDone)

	ticker := time.NewTicker(1 * time.Second) // Reduced frequency since OTLP handles retries
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// If initialization failed, reset sync.Once to allow retry
			// The OTLP exporter will handle its own retry logic
			if e.initErr != nil {
				e.initOnce = sync.Once{}
			} else if e.exporter != nil {
				return // Exporter ready, checker no longer needed
			}

		case <-e.stopChecker:
			return
		}
	}
}

// flushBuffer sends all buffered batches through the real exporter
func (e *bufferingExporter) flushBuffer(ctx context.Context) error {
	if len(e.buffer) == 0 {
		return nil
	}

	flushCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	var lastErr error
	for _, batch := range e.buffer {
		if err := e.exporter.Export(flushCtx, batch); err != nil {
			lastErr = err
		}
	}

	// Clear buffer after flush attempt
	e.buffer = nil
	e.bufferSize = 0

	return lastErr
}

// ForceFlush implements otellog.Exporter
func (e *bufferingExporter) ForceFlush(ctx context.Context) error {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.exporter != nil {
		return e.exporter.ForceFlush(ctx)
	}
	return nil
}

// Shutdown implements otellog.Exporter
func (e *bufferingExporter) Shutdown(ctx context.Context) error {
	// Stop the readiness checker from continuing
	close(e.stopChecker)

	// Give one final chance for TLS/tracing to become ready before fully shutting down
	e.initOnce.Do(func() {
		e.initErr = e.initialize()
	})

	// Wait for readiness checker goroutine to complete
	<-e.checkerDone

	e.mu.Lock()
	defer e.mu.Unlock()

	if e.exporter != nil {
		return e.exporter.Shutdown(ctx)
	}
	return nil
}
@@ -16,13 +16,9 @@ type logfmt struct {
 	mu sync.Mutex
 }
 
-func newLogFmtHandler(next slog.Handler) slog.Handler {
-	buf := bytes.NewBuffer([]byte{})
-
-	h := &logfmt{
-		buf:  buf,
-		next: next,
-		txt: slog.NewTextHandler(buf, &slog.HandlerOptions{
+// createTextHandlerOptions creates the common slog.HandlerOptions used by all logfmt handlers
+func createTextHandlerOptions() *slog.HandlerOptions {
+	return &slog.HandlerOptions{
 		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
 			if a.Key == slog.TimeKey && len(groups) == 0 {
 				return slog.Attr{}
@@ -32,7 +28,16 @@ func newLogFmtHandler(next slog.Handler) slog.Handler {
 			}
 			return a
 		},
-	}),
+	}
+}
+
+func newLogFmtHandler(next slog.Handler) slog.Handler {
+	buf := bytes.NewBuffer([]byte{})
+
+	h := &logfmt{
+		buf:  buf,
+		next: next,
+		txt:  slog.NewTextHandler(buf, createTextHandlerOptions()),
 	}
 
 	return h
@@ -43,10 +48,11 @@ func (h *logfmt) Enabled(ctx context.Context, lvl slog.Level) bool {
 }
 
 func (h *logfmt) WithAttrs(attrs []slog.Attr) slog.Handler {
+	buf := bytes.NewBuffer([]byte{})
 	return &logfmt{
-		buf:  bytes.NewBuffer([]byte{}),
+		buf:  buf,
 		next: h.next.WithAttrs(slices.Clone(attrs)),
-		txt:  h.txt.WithAttrs(slices.Clone(attrs)),
+		txt:  slog.NewTextHandler(buf, createTextHandlerOptions()).WithAttrs(slices.Clone(attrs)),
 	}
 }
 
@@ -54,10 +60,11 @@ func (h *logfmt) WithGroup(g string) slog.Handler {
 	if g == "" {
 		return h
 	}
+	buf := bytes.NewBuffer([]byte{})
 	return &logfmt{
-		buf:  bytes.NewBuffer([]byte{}),
+		buf:  buf,
 		next: h.next.WithGroup(g),
-		txt:  h.txt.WithGroup(g),
+		txt:  slog.NewTextHandler(buf, createTextHandlerOptions()).WithGroup(g),
 	}
 }
 
@@ -69,10 +76,22 @@ func (h *logfmt) Handle(ctx context.Context, r slog.Record) error {
 		panic("buffer wasn't empty")
 	}
 
-	h.txt.Handle(ctx, r)
-	r.Message = h.buf.String()
-	r.Message = strings.TrimSuffix(r.Message, "\n")
+	// Format using text handler to get the formatted message
+	err := h.txt.Handle(ctx, r)
+	if err != nil {
+		return err
+	}
+
+	formattedMessage := h.buf.String()
+	formattedMessage = strings.TrimSuffix(formattedMessage, "\n")
 	h.buf.Reset()
 
-	return h.next.Handle(ctx, r)
+	// Create a new record with the formatted message
+	newRecord := slog.NewRecord(r.Time, r.Level, formattedMessage, r.PC)
+	r.Attrs(func(a slog.Attr) bool {
+		newRecord.AddAttrs(a)
+		return true
+	})
+
+	return h.next.Handle(ctx, newRecord)
 }
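As a quick illustration of the shared options above: a text handler built with createTextHandlerOptions drops the top-level time attribute. A rough in-package sketch (createTextHandlerOptions is unexported, so this only compiles inside the logger package; it assumes log/slog and os are imported, and the printed line is approximate):

	func printWithoutTime() {
		h := slog.NewTextHandler(os.Stdout, createTextHandlerOptions())
		slog.New(h).Info("request handled", "status", 200)
		// Prints something like: level=INFO msg="request handled" status=200
		// (no time= attribute, because ReplaceAttr strips slog.TimeKey at the top level)
	}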
@@ -29,10 +29,13 @@ import (
 	"os"
 	"strconv"
 	"sync"
+	"time"
 
 	slogtraceid "github.com/remychantenay/slog-otel"
 	slogmulti "github.com/samber/slog-multi"
 	"go.opentelemetry.io/contrib/bridges/otelslog"
+	"go.opentelemetry.io/otel/log/global"
+	otellog "go.opentelemetry.io/otel/sdk/log"
 )
 
 // ConfigPrefix allows customizing the environment variable prefix for configuration.
@@ -85,9 +88,28 @@ func setupStdErrHandler() slog.Handler {
 
 func setupOtlpLogger() *slog.Logger {
 	setupOtlp.Do(func() {
-		otlpLogger = slog.New(
-			newLogFmtHandler(otelslog.NewHandler("common")),
-		)
+		// Create our buffering exporter
+		// It will buffer until tracing is configured
+		bufferingExp := newBufferingExporter()
+
+		// Use BatchProcessor with our custom exporter
+		processor := otellog.NewBatchProcessor(bufferingExp,
+			otellog.WithExportInterval(10*time.Second),
+			otellog.WithMaxQueueSize(2048),
+			otellog.WithExportMaxBatchSize(512),
+		)
+
+		// Create logger provider
+		provider := otellog.NewLoggerProvider(
+			otellog.WithProcessor(processor),
+		)
+
+		// Set global provider
+		global.SetLoggerProvider(provider)
+
+		// Create slog handler
+		handler := newLogFmtHandler(otelslog.NewHandler("common"))
+		otlpLogger = slog.New(handler)
 	})
 	return otlpLogger
 }
122
metrics/metrics.go
Normal file
@@ -0,0 +1,122 @@
// Package metrics provides OpenTelemetry-native metrics with OTLP export support.
//
// This package implements a metrics system using the OpenTelemetry metrics data model
// with OTLP export capabilities. It's designed for new applications that want to use
// structured metrics export to observability backends.
//
// Key features:
//   - OpenTelemetry native metric types (Counter, Histogram, Gauge, etc.)
//   - OTLP export for sending metrics to observability backends
//   - Resource detection and correlation with traces/logs
//   - Graceful handling when OTLP configuration is not available
//
// Example usage:
//
//	// Initialize metrics along with tracing
//	shutdown, err := tracing.InitTracer(ctx, cfg)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer shutdown(ctx)
//
//	// Get a meter and create instruments
//	meter := metrics.GetMeter("my-service")
//	counter, _ := meter.Int64Counter("requests_total")
//	counter.Add(ctx, 1, metric.WithAttributes(attribute.String("method", "GET")))
package metrics

import (
	"context"
	"log/slog"
	"sync"
	"time"

	"go.ntppool.org/common/internal/tracerconfig"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

var (
	meterProvider metric.MeterProvider
	setupOnce     sync.Once
	setupErr      error
)

// Setup initializes the OpenTelemetry metrics provider with OTLP export.
// This function uses the configuration stored by the tracing package and
// creates a metrics provider that exports to the same OTLP endpoint.
//
// The function is safe to call multiple times - it will only initialize once.
// If tracing configuration is not available, it returns a no-op provider that
// doesn't export metrics.
//
// Returns an error only if there's a configuration problem. Missing tracing
// configuration is handled gracefully with a warning log.
func Setup(ctx context.Context) error {
	setupOnce.Do(func() {
		setupErr = initializeMetrics(ctx)
	})
	return setupErr
}

// GetMeter returns a named meter for creating metric instruments.
// The meter uses the configured metrics provider, or the global provider
// if metrics haven't been set up yet.
//
// This is the primary entry point for creating metric instruments in your application.
func GetMeter(name string, opts ...metric.MeterOption) metric.Meter {
	if meterProvider == nil {
		// Return the global provider as fallback (no-op if not configured)
		return otel.GetMeterProvider().Meter(name, opts...)
	}
	return meterProvider.Meter(name, opts...)
}

// initializeMetrics sets up the OpenTelemetry metrics provider with OTLP export.
func initializeMetrics(ctx context.Context) error {
	log := slog.Default()

	// Check if tracing configuration is available
	cfg, configCtx, factory := tracerconfig.GetMetricExporter()
	if cfg == nil || configCtx == nil || factory == nil {
		log.Warn("metrics setup: tracing configuration not available, using no-op provider")
		// Set the global provider as fallback - metrics just won't be exported
		meterProvider = otel.GetMeterProvider()
		return nil
	}

	// Create OTLP metrics exporter
	exporter, err := factory(ctx, cfg)
	if err != nil {
		log.Error("metrics setup: failed to create OTLP exporter", "error", err)
		// Fall back to global provider
		meterProvider = otel.GetMeterProvider()
		return nil
	}

	// Create metrics provider with the exporter
	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(
			exporter,
			sdkmetric.WithInterval(15*time.Second),
		)),
	)

	// Set the global provider
	otel.SetMeterProvider(provider)
	meterProvider = provider

	log.Info("metrics setup: OTLP metrics provider initialized")
	return nil
}

// Shutdown gracefully shuts down the metrics provider.
// This should be called during application shutdown to ensure all metrics
// are properly flushed and exported.
func Shutdown(ctx context.Context) error {
	if provider, ok := meterProvider.(*sdkmetric.MeterProvider); ok {
		return provider.Shutdown(ctx)
	}
	return nil
}
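A minimal wiring sketch for the package above; the service and instrument names are illustrative, and it assumes OTLP configuration has already been stored by the tracing setup (otherwise Setup silently falls back to the no-op global provider):

	package main

	import (
		"context"
		"log"

		"go.ntppool.org/common/metrics"
		"go.opentelemetry.io/otel/attribute"
		"go.opentelemetry.io/otel/metric"
	)

	func main() {
		ctx := context.Background()

		if err := metrics.Setup(ctx); err != nil {
			log.Fatal(err)
		}
		// Flush any pending metrics on exit.
		defer metrics.Shutdown(ctx)

		meter := metrics.GetMeter("example-service")
		requests, err := meter.Int64Counter("requests_total")
		if err != nil {
			log.Fatal(err)
		}
		requests.Add(ctx, 1, metric.WithAttributes(attribute.String("method", "GET")))
	}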
296
metrics/metrics_test.go
Normal file
@@ -0,0 +1,296 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.ntppool.org/common/internal/tracerconfig"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
func TestSetup_NoConfiguration(t *testing.T) {
|
||||
// Clear any existing configuration
|
||||
tracerconfig.Clear()
|
||||
|
||||
ctx := context.Background()
|
||||
err := Setup(ctx)
|
||||
// Should not return an error even when no configuration is available
|
||||
if err != nil {
|
||||
t.Errorf("Setup() returned unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Should be able to get a meter (even if it's a no-op)
|
||||
meter := GetMeter("test-meter")
|
||||
if meter == nil {
|
||||
t.Error("GetMeter() returned nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMeter(t *testing.T) {
|
||||
// Clear any existing configuration
|
||||
tracerconfig.Clear()
|
||||
|
||||
ctx := context.Background()
|
||||
_ = Setup(ctx)
|
||||
|
||||
meter := GetMeter("test-service")
|
||||
if meter == nil {
|
||||
t.Fatal("GetMeter() returned nil")
|
||||
}
|
||||
|
||||
// Test creating a counter instrument
|
||||
counter, err := meter.Int64Counter("test_counter")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to create counter: %v", err)
|
||||
}
|
||||
|
||||
// Test using the counter (should not error even with no-op provider)
|
||||
counter.Add(ctx, 1, metric.WithAttributes(attribute.String("test", "value")))
|
||||
}
|
||||
|
||||
func TestSetup_MultipleCallsSafe(t *testing.T) {
|
||||
// Clear any existing configuration
|
||||
tracerconfig.Clear()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Call Setup multiple times
|
||||
err1 := Setup(ctx)
|
||||
err2 := Setup(ctx)
|
||||
err3 := Setup(ctx)
|
||||
|
||||
if err1 != nil {
|
||||
t.Errorf("First Setup() call returned error: %v", err1)
|
||||
}
|
||||
if err2 != nil {
|
||||
t.Errorf("Second Setup() call returned error: %v", err2)
|
||||
}
|
||||
if err3 != nil {
|
||||
t.Errorf("Third Setup() call returned error: %v", err3)
|
||||
}
|
||||
|
||||
// Should still be able to get meters
|
||||
meter := GetMeter("test-meter")
|
||||
if meter == nil {
|
||||
t.Error("GetMeter() returned nil after multiple Setup() calls")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetup_WithConfiguration(t *testing.T) {
|
||||
// Clear any existing configuration
|
||||
tracerconfig.Clear()
|
||||
|
||||
ctx := context.Background()
|
||||
config := &tracerconfig.Config{
|
||||
ServiceName: "test-metrics-service",
|
||||
Environment: "test",
|
||||
Endpoint: "localhost:4317", // Will likely fail to connect, but should set up provider
|
||||
}
|
||||
|
||||
// Create a mock exporter factory that returns a working exporter
|
||||
mockFactory := func(ctx context.Context, cfg *tracerconfig.Config) (sdkmetric.Exporter, error) {
|
||||
// Create a simple in-memory exporter for testing
|
||||
return &mockMetricExporter{}, nil
|
||||
}
|
||||
|
||||
// Store configuration with mock factory
|
||||
tracerconfig.Store(ctx, config, nil, mockFactory, nil)
|
||||
|
||||
// Setup metrics
|
||||
err := Setup(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("Setup() returned error: %v", err)
|
||||
}
|
||||
|
||||
// Should be able to get a meter
|
||||
meter := GetMeter("test-service")
|
||||
if meter == nil {
|
||||
t.Fatal("GetMeter() returned nil")
|
||||
}
|
||||
|
||||
// Test creating and using instruments
|
||||
counter, err := meter.Int64Counter("test_counter")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to create counter: %v", err)
|
||||
}
|
||||
|
||||
histogram, err := meter.Float64Histogram("test_histogram")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to create histogram: %v", err)
|
||||
}
|
||||
|
||||
gauge, err := meter.Int64UpDownCounter("test_gauge")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to create gauge: %v", err)
|
||||
}
|
||||
|
||||
// Use the instruments
|
||||
counter.Add(ctx, 1, metric.WithAttributes(attribute.String("test", "value")))
|
||||
histogram.Record(ctx, 1.5, metric.WithAttributes(attribute.String("test", "value")))
|
||||
gauge.Add(ctx, 10, metric.WithAttributes(attribute.String("test", "value")))
|
||||
|
||||
// Test shutdown
|
||||
err = Shutdown(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("Shutdown() returned error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetup_WithRealOTLPConfig(t *testing.T) {
|
||||
// Skip this test in short mode since it may try to make network connections
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
// Clear any existing configuration
|
||||
tracerconfig.Clear()
|
||||
|
||||
// Set environment variables for OTLP configuration
|
||||
originalEndpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
|
||||
originalProtocol := os.Getenv("OTEL_EXPORTER_OTLP_PROTOCOL")
|
||||
|
||||
defer func() {
|
||||
if originalEndpoint != "" {
|
||||
os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", originalEndpoint)
|
||||
} else {
|
||||
os.Unsetenv("OTEL_EXPORTER_OTLP_ENDPOINT")
|
||||
}
|
||||
if originalProtocol != "" {
|
||||
os.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", originalProtocol)
|
||||
} else {
|
||||
os.Unsetenv("OTEL_EXPORTER_OTLP_PROTOCOL")
|
||||
}
|
||||
}()
|
||||
|
||||
os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") // HTTP endpoint
|
||||
os.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
config := &tracerconfig.Config{
|
||||
ServiceName: "test-metrics-e2e",
|
||||
Environment: "test",
|
||||
Endpoint: "localhost:4318",
|
||||
}
|
||||
|
||||
// Store configuration with real factory
|
||||
tracerconfig.Store(ctx, config, nil, tracerconfig.CreateOTLPMetricExporter, nil)
|
||||
|
||||
// Setup metrics - this may fail if no OTLP collector is running, which is okay
|
||||
err := Setup(ctx)
|
||||
if err != nil {
|
||||
t.Logf("Setup() returned error (expected if no OTLP collector): %v", err)
|
||||
}
|
||||
|
||||
// Should still be able to get a meter
|
||||
meter := GetMeter("test-service-e2e")
|
||||
if meter == nil {
|
||||
t.Fatal("GetMeter() returned nil")
|
||||
}
|
||||
|
||||
// Create and use instruments
|
||||
counter, err := meter.Int64Counter("e2e_test_counter")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to create counter: %v", err)
|
||||
}
|
||||
|
||||
// Add some metrics
|
||||
for i := 0; i < 5; i++ {
|
||||
counter.Add(ctx, 1, metric.WithAttributes(
|
||||
attribute.String("iteration", string(rune('0'+i))),
|
||||
attribute.String("test_type", "e2e"),
|
||||
))
|
||||
}
|
||||
|
||||
// Give some time for export (if collector is running)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Test shutdown
|
||||
err = Shutdown(ctx)
|
||||
if err != nil {
|
||||
t.Logf("Shutdown() returned error (may be expected): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentMetricUsage(t *testing.T) {
|
||||
// Clear any existing configuration
|
||||
tracerconfig.Clear()
|
||||
|
||||
ctx := context.Background()
|
||||
config := &tracerconfig.Config{
|
||||
ServiceName: "concurrent-test",
|
||||
}
|
||||
|
||||
// Use mock factory
|
||||
mockFactory := func(ctx context.Context, cfg *tracerconfig.Config) (sdkmetric.Exporter, error) {
|
||||
return &mockMetricExporter{}, nil
|
||||
}
|
||||
|
||||
tracerconfig.Store(ctx, config, nil, mockFactory, nil)
|
||||
Setup(ctx)
|
||||
|
||||
meter := GetMeter("concurrent-test")
|
||||
counter, err := meter.Int64Counter("concurrent_counter")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create counter: %v", err)
|
||||
}
|
||||
|
||||
// Test concurrent metric usage
|
||||
const numGoroutines = 10
|
||||
const metricsPerGoroutine = 100
|
||||
|
||||
done := make(chan bool, numGoroutines)
|
||||
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
go func(goroutineID int) {
|
||||
for j := 0; j < metricsPerGoroutine; j++ {
|
||||
counter.Add(ctx, 1, metric.WithAttributes(
|
||||
attribute.Int("goroutine", goroutineID),
|
||||
attribute.Int("iteration", j),
|
||||
))
|
||||
}
|
||||
done <- true
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to complete
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
<-done
|
||||
}
|
||||
|
||||
// Shutdown
|
||||
err = Shutdown(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("Shutdown() returned error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// mockMetricExporter is a simple mock exporter for testing
|
||||
type mockMetricExporter struct{}
|
||||
|
||||
func (m *mockMetricExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
|
||||
// Just pretend to export
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockMetricExporter) ForceFlush(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockMetricExporter) Shutdown(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockMetricExporter) Temporality(kind sdkmetric.InstrumentKind) metricdata.Temporality {
|
||||
return metricdata.CumulativeTemporality
|
||||
}
|
||||
|
||||
func (m *mockMetricExporter) Aggregation(kind sdkmetric.InstrumentKind) sdkmetric.Aggregation {
|
||||
return sdkmetric.DefaultAggregationSelector(kind)
|
||||
}
|
@@ -15,7 +15,7 @@ mkdir -p $DIR
 
 BASE=https://geodns.bitnames.com/${BASE}/builds/${BUILD}
 
-files=`curl -sSf ${BASE}/checksums.txt | awk '{print $2}'`
+files=`curl -sSf ${BASE}/checksums.txt | sed 's/^[a-f0-9]*[[:space:]]*//'`
 metafiles="checksums.txt metadata.json CHANGELOG.md artifacts.json"
 
 for f in $metafiles; do
@@ -2,7 +2,7 @@
 
 set -euo pipefail
 
-go install github.com/goreleaser/goreleaser/v2@v2.8.2
+go install github.com/goreleaser/goreleaser/v2@v2.11.0
 
 if [ ! -z "${harbor_username:-}" ]; then
 	DOCKER_FILE=~/.docker/config.json
@@ -38,26 +38,23 @@ package tracing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"os"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"go.ntppool.org/common/logger"
|
||||
"go.ntppool.org/common/internal/tracerconfig"
|
||||
"go.ntppool.org/common/version"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"go.opentelemetry.io/contrib/exporters/autoexport"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
logglobal "go.opentelemetry.io/otel/log/global"
|
||||
"go.opentelemetry.io/otel/log/global"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
sdklog "go.opentelemetry.io/otel/sdk/log"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
||||
@@ -67,12 +64,25 @@ import (
|
||||
const (
|
||||
// svcNameKey is the environment variable name that Service Name information will be read from.
|
||||
svcNameKey = "OTEL_SERVICE_NAME"
|
||||
|
||||
otelExporterOTLPProtoEnvKey = "OTEL_EXPORTER_OTLP_PROTOCOL"
|
||||
otelExporterOTLPTracesProtoEnvKey = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"
|
||||
)
|
||||
|
||||
var errInvalidOTLPProtocol = errors.New("invalid OTLP protocol - should be one of ['grpc', 'http/protobuf']")
|
||||
// createOTLPLogExporter creates an OTLP log exporter using the provided configuration.
|
||||
// This function is used as the LogExporterFactory for the tracerconfig bridge.
|
||||
func createOTLPLogExporter(ctx context.Context, cfg *tracerconfig.Config) (sdklog.Exporter, error) {
|
||||
return tracerconfig.CreateOTLPLogExporter(ctx, cfg)
|
||||
}
|
||||
|
||||
// createOTLPMetricExporter creates an OTLP metric exporter using the provided configuration.
|
||||
// This function is used as the MetricExporterFactory for the tracerconfig bridge.
|
||||
func createOTLPMetricExporter(ctx context.Context, cfg *tracerconfig.Config) (sdkmetric.Exporter, error) {
|
||||
return tracerconfig.CreateOTLPMetricExporter(ctx, cfg)
|
||||
}
|
||||
|
||||
// createOTLPTraceExporter creates an OTLP trace exporter using the provided configuration.
|
||||
// This function is used as the TraceExporterFactory for the tracerconfig bridge.
|
||||
func createOTLPTraceExporter(ctx context.Context, cfg *tracerconfig.Config) (sdktrace.SpanExporter, error) {
|
||||
return tracerconfig.CreateOTLPTraceExporter(ctx, cfg)
|
||||
}
|
||||
|
||||
// https://github.com/open-telemetry/opentelemetry-go/blob/main/exporters/otlp/otlptrace/otlptracehttp/example_test.go
|
||||
|
||||
@@ -98,10 +108,9 @@ func Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption)
|
||||
return Tracer().Start(ctx, spanName, opts...)
|
||||
}
|
||||
|
||||
// GetClientCertificate defines a function type for providing client certificates for mutual TLS.
|
||||
// This is used when exporting telemetry data to secured OTLP endpoints that require
|
||||
// client certificate authentication.
|
||||
type GetClientCertificate func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
|
||||
// GetClientCertificate is an alias for the type defined in tracerconfig.
|
||||
// This maintains backward compatibility for existing code.
|
||||
type GetClientCertificate = tracerconfig.GetClientCertificate
|
||||
|
||||
// TracerConfig provides configuration options for OpenTelemetry tracing setup.
|
||||
// It supplements standard OpenTelemetry environment variables with additional
|
||||
@@ -143,7 +152,18 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
|
||||
cfg = &TracerConfig{}
|
||||
}
|
||||
|
||||
log := logger.Setup()
|
||||
// Store configuration for use by logger and metrics packages via bridge
|
||||
bridgeConfig := &tracerconfig.Config{
|
||||
ServiceName: cfg.ServiceName,
|
||||
Environment: cfg.Environment,
|
||||
Endpoint: cfg.Endpoint,
|
||||
EndpointURL: cfg.EndpointURL,
|
||||
CertificateProvider: cfg.CertificateProvider,
|
||||
RootCAs: cfg.RootCAs,
|
||||
}
|
||||
tracerconfig.Store(ctx, bridgeConfig, createOTLPLogExporter, createOTLPMetricExporter, createOTLPTraceExporter)
|
||||
|
||||
log := slog.Default()
|
||||
|
||||
if serviceName := os.Getenv(svcNameKey); len(serviceName) == 0 {
|
||||
if len(cfg.ServiceName) > 0 {
|
||||
@@ -184,13 +204,21 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
|
||||
|
||||
var shutdownFuncs []func(context.Context) error
|
||||
shutdown = func(ctx context.Context) error {
|
||||
// Force flush the global logger provider before shutting down anything else
|
||||
if loggerProvider := global.GetLoggerProvider(); loggerProvider != nil {
|
||||
if sdkProvider, ok := loggerProvider.(*sdklog.LoggerProvider); ok {
|
||||
if flushErr := sdkProvider.ForceFlush(ctx); flushErr != nil {
|
||||
log.Warn("logger provider force flush failed", "err", flushErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
// need to shutdown the providers first,
|
||||
// exporters after which is the opposite
|
||||
// order they are setup.
|
||||
slices.Reverse(shutdownFuncs)
|
||||
for _, fn := range shutdownFuncs {
|
||||
// log.Warn("shutting down", "fn", fn)
|
||||
err = errors.Join(err, fn(ctx))
|
||||
}
|
||||
shutdownFuncs = nil
|
||||
@@ -212,9 +240,9 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
|
||||
|
||||
switch os.Getenv("OTEL_TRACES_EXPORTER") {
|
||||
case "":
|
||||
spanExporter, err = newOLTPExporter(ctx, cfg)
|
||||
spanExporter, err = createOTLPTraceExporter(ctx, bridgeConfig)
|
||||
case "otlp":
|
||||
spanExporter, err = newOLTPExporter(ctx, cfg)
|
||||
spanExporter, err = createOTLPTraceExporter(ctx, bridgeConfig)
|
||||
default:
|
||||
// log.Debug("OTEL_TRACES_EXPORTER", "fallback", os.Getenv("OTEL_TRACES_EXPORTER"))
|
||||
spanExporter, err = autoexport.NewSpanExporter(ctx)
|
||||
@@ -225,13 +253,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
|
||||
}
|
||||
shutdownFuncs = append(shutdownFuncs, spanExporter.Shutdown)
|
||||
|
||||
logExporter, err := autoexport.NewLogExporter(ctx)
|
||||
if err != nil {
|
||||
handleErr(err)
|
||||
return
|
||||
}
|
||||
shutdownFuncs = append(shutdownFuncs, logExporter.Shutdown)
|
||||
|
||||
// Set up trace provider.
|
||||
tracerProvider, err := newTraceProvider(spanExporter, res)
|
||||
if err != nil {
|
||||
@@ -241,19 +262,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
|
||||
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
|
||||
otel.SetTracerProvider(tracerProvider)
|
||||
|
||||
logProvider := sdklog.NewLoggerProvider(sdklog.WithResource(res),
|
||||
sdklog.WithProcessor(
|
||||
sdklog.NewBatchProcessor(logExporter, sdklog.WithExportBufferSize(10)),
|
||||
),
|
||||
)
|
||||
|
||||
logglobal.SetLoggerProvider(logProvider)
|
||||
shutdownFuncs = append(shutdownFuncs, func(ctx context.Context) error {
|
||||
logProvider.ForceFlush(ctx)
|
||||
return logProvider.Shutdown(ctx)
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
handleErr(err)
|
||||
return
|
||||
@@ -262,74 +270,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
|
||||
return
|
||||
}
|
||||
|
||||
func newOLTPExporter(ctx context.Context, cfg *TracerConfig) (sdktrace.SpanExporter, error) {
|
||||
log := logger.Setup()
|
||||
|
||||
var tlsConfig *tls.Config
|
||||
|
||||
if cfg.CertificateProvider != nil {
|
||||
tlsConfig = &tls.Config{
|
||||
GetClientCertificate: cfg.CertificateProvider,
|
||||
RootCAs: cfg.RootCAs,
|
||||
}
|
||||
}
|
||||
|
||||
proto := os.Getenv(otelExporterOTLPTracesProtoEnvKey)
|
||||
if proto == "" {
|
||||
proto = os.Getenv(otelExporterOTLPProtoEnvKey)
|
||||
}
|
||||
|
||||
// Fallback to default, http/protobuf.
|
||||
if proto == "" {
|
||||
proto = "http/protobuf"
|
||||
}
|
||||
|
||||
var client otlptrace.Client
|
||||
|
||||
switch proto {
|
||||
case "grpc":
|
||||
opts := []otlptracegrpc.Option{
|
||||
otlptracegrpc.WithCompressor("gzip"),
|
||||
}
|
||||
if tlsConfig != nil {
|
||||
opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
|
||||
}
|
||||
if len(cfg.Endpoint) > 0 {
|
||||
log.Info("adding option", "Endpoint", cfg.Endpoint)
|
||||
opts = append(opts, otlptracegrpc.WithEndpoint(cfg.Endpoint))
|
||||
}
|
||||
if len(cfg.EndpointURL) > 0 {
|
||||
log.Info("adding option", "EndpointURL", cfg.EndpointURL)
|
||||
opts = append(opts, otlptracegrpc.WithEndpointURL(cfg.EndpointURL))
|
||||
}
|
||||
|
||||
client = otlptracegrpc.NewClient(opts...)
|
||||
case "http/protobuf", "http/json":
|
||||
opts := []otlptracehttp.Option{
|
||||
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
|
||||
}
|
||||
if tlsConfig != nil {
|
||||
opts = append(opts, otlptracehttp.WithTLSClientConfig(tlsConfig))
|
||||
}
|
||||
if len(cfg.Endpoint) > 0 {
|
||||
opts = append(opts, otlptracehttp.WithEndpoint(cfg.Endpoint))
|
||||
}
|
||||
if len(cfg.EndpointURL) > 0 {
|
||||
opts = append(opts, otlptracehttp.WithEndpointURL(cfg.EndpointURL))
|
||||
}
|
||||
|
||||
client = otlptracehttp.NewClient(opts...)
|
||||
default:
|
||||
return nil, errInvalidOTLPProtocol
|
||||
}
|
||||
|
||||
exporter, err := otlptrace.New(ctx, client)
|
||||
if err != nil {
|
||||
log.ErrorContext(ctx, "creating OTLP trace exporter", "err", err)
|
||||
}
|
||||
return exporter, err
|
||||
}
|
||||
|
||||
func newTraceProvider(traceExporter sdktrace.SpanExporter, res *resource.Resource) (*sdktrace.TracerProvider, error) {
|
||||
traceProvider := sdktrace.NewTracerProvider(
|
||||
sdktrace.WithResource(res),
|
||||
|
@@ -10,8 +10,18 @@
 // -X go.ntppool.org/common/version.buildTime=2023-01-01T00:00:00Z \
 // -X go.ntppool.org/common/version.gitVersion=abc123"
 //
-// The package also automatically extracts build information from Go's debug.BuildInfo
-// when available, providing fallback values for VCS time and revision.
+// Build time supports both Unix epoch timestamps and RFC3339 format:
+//
+//	# Unix epoch (simpler, recommended)
+//	go build -ldflags "-X go.ntppool.org/common/version.buildTime=$(date +%s)"
+//
+//	# RFC3339 format
+//	go build -ldflags "-X go.ntppool.org/common/version.buildTime=$(date -u +%Y-%m-%dT%H:%M:%SZ)"
+//
+// Both formats are automatically converted to RFC3339 for consistent output. The buildTime
+// parameter takes priority over Git commit time. If buildTime is not specified, the package
+// automatically extracts build information from Go's debug.BuildInfo when available,
+// providing fallback values for VCS time and revision.
 package version
 
 import (
@@ -19,7 +29,9 @@ import (
 	"log/slog"
 	"runtime"
 	"runtime/debug"
+	"strconv"
 	"strings"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/cobra"
@@ -30,7 +42,7 @@ import (
 // If not set, defaults to "dev-snapshot". The version should follow semantic versioning.
 var (
 	VERSION     string // Semantic version (e.g., "1.0.0" or "v1.0.0")
-	buildTime   string // Build timestamp (RFC3339 format)
+	buildTime   string // Build timestamp (Unix epoch or RFC3339, normalized to RFC3339)
 	gitVersion  string // Git commit hash
 	gitModified bool   // Whether the working tree was modified during build
 )
@@ -38,6 +50,28 @@
 // info holds the consolidated version information extracted from build variables and debug.BuildInfo.
 var info Info
 
+// parseBuildTime converts a build time string to RFC3339 format.
+// Supports both Unix epoch timestamps (numeric strings) and RFC3339 format.
+// Returns the input unchanged if it cannot be parsed as either format.
+func parseBuildTime(s string) string {
+	if s == "" {
+		return s
+	}
+
+	// Try parsing as Unix epoch timestamp (numeric string)
+	if epoch, err := strconv.ParseInt(s, 10, 64); err == nil {
+		return time.Unix(epoch, 0).UTC().Format(time.RFC3339)
+	}
+
+	// Try parsing as RFC3339 to validate format
+	if _, err := time.Parse(time.RFC3339, s); err == nil {
+		return s // Already in RFC3339 format
+	}
+
+	// Return original string if neither format works (graceful fallback)
+	return s
+}
+
 // Info represents structured version and build information.
 // This struct is used for JSON serialization and programmatic access to build metadata.
 type Info struct {
@@ -48,6 +82,7 @@
 }
 
 func init() {
+	buildTime = parseBuildTime(buildTime)
 	info.BuildTime = buildTime
 	info.GitRev = gitVersion
 
@@ -67,9 +102,9 @@
 		switch h.Key {
 		case "vcs.time":
 			if len(buildTime) == 0 {
-				buildTime = h.Value
+				buildTime = parseBuildTime(h.Value)
+				info.BuildTime = buildTime
 			}
-			info.BuildTime = h.Value
 		case "vcs.revision":
 			// https://blog.carlmjohnson.net/post/2023/golang-git-hash-how-to/
 			// todo: use BuildInfo.Main.Version if revision is empty
|
@@ -309,3 +309,106 @@ func BenchmarkCheckVersionDevSnapshot(b *testing.B) {
|
||||
_ = CheckVersion(version, minimum)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseBuildTime(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "Unix epoch timestamp",
|
||||
input: "1672531200", // 2023-01-01T00:00:00Z
|
||||
expected: "2023-01-01T00:00:00Z",
|
||||
},
|
||||
{
|
||||
name: "Unix epoch zero",
|
||||
input: "0",
|
||||
expected: "1970-01-01T00:00:00Z",
|
||||
},
|
||||
{
|
||||
name: "Valid RFC3339 format",
|
||||
input: "2023-12-25T15:30:45Z",
|
||||
expected: "2023-12-25T15:30:45Z",
|
||||
},
|
||||
{
|
||||
name: "RFC3339 with timezone",
|
||||
input: "2023-12-25T10:30:45-05:00",
|
||||
expected: "2023-12-25T10:30:45-05:00",
|
||||
},
|
||||
{
|
||||
name: "Empty string",
|
||||
input: "",
|
||||
expected: "",
|
||||
},
|
||||
{
|
||||
name: "Invalid format - return unchanged",
|
||||
input: "not-a-date",
|
||||
expected: "not-a-date",
|
||||
},
|
||||
{
|
||||
name: "Invalid timestamp - return unchanged",
|
||||
input: "invalid-timestamp",
|
||||
expected: "invalid-timestamp",
|
||||
},
|
||||
{
|
||||
name: "Partial date - return unchanged",
|
||||
input: "2023-01-01",
|
||||
expected: "2023-01-01",
|
||||
},
|
||||
{
|
||||
name: "Negative epoch - should work",
|
||||
input: "-1",
|
||||
expected: "1969-12-31T23:59:59Z",
|
||||
},
|
||||
{
|
||||
name: "Large epoch timestamp",
|
||||
input: "4102444800", // 2100-01-01T00:00:00Z
|
||||
expected: "2100-01-01T00:00:00Z",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := parseBuildTime(tt.input)
|
||||
if result != tt.expected {
|
||||
t.Errorf("parseBuildTime(%q) = %q, expected %q", tt.input, result, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseBuildTimeConsistency(t *testing.T) {
|
||||
// Test that calling parseBuildTime multiple times with the same input returns the same result
|
||||
testInputs := []string{
|
||||
"1672531200",
|
||||
"2023-01-01T00:00:00Z",
|
||||
"invalid-date",
|
||||
"",
|
||||
}
|
||||
|
||||
for _, input := range testInputs {
|
||||
result1 := parseBuildTime(input)
|
||||
result2 := parseBuildTime(input)
|
||||
if result1 != result2 {
|
||||
t.Errorf("parseBuildTime(%q) not consistent: %q != %q", input, result1, result2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkParseBuildTime(b *testing.B) {
|
||||
inputs := []string{
|
||||
"1672531200", // Unix epoch
|
||||
"2023-01-01T00:00:00Z", // RFC3339
|
||||
"invalid-timestamp", // Invalid
|
||||
"", // Empty
|
||||
}
|
||||
|
||||
for _, input := range inputs {
|
||||
b.Run(input, func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = parseBuildTime(input)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|