11 Commits

Author SHA1 Message Date
0b9769dc39 Prepare v0.5.1 2025-08-02 11:04:13 -07:00
9dadd9edc3 feat(version): add Unix epoch support for buildTime
Support both Unix epoch timestamps and RFC3339 format for build time
injection via ldflags. Unix epoch format provides simpler build
commands: $(date +%s) vs $(date -u +%Y-%m-%dT%H:%M:%SZ).

- Add parseBuildTime() to convert epoch to RFC3339
- Maintain backward compatibility with existing RFC3339 format
- Ensure consistent RFC3339 output regardless of input format
- Give the injected build date priority over the git commit time
2025-08-02 10:16:41 -07:00
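For reference, a minimal sketch of the normalization this commit describes; the real parseBuildTime signature is not shown in this diff, so the shape below is an assumption:

package version

import (
	"fmt"
	"strconv"
	"time"
)

// parseBuildTime (hypothetical shape) accepts either a Unix epoch string
// such as "1722614653" or an RFC3339 timestamp and normalizes both to
// RFC3339, matching the behavior the commit message describes.
func parseBuildTime(s string) (string, error) {
	if secs, err := strconv.ParseInt(s, 10, 64); err == nil {
		return time.Unix(secs, 0).UTC().Format(time.RFC3339), nil
	}
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		return "", fmt.Errorf("unrecognized build time %q: %w", s, err)
	}
	return t.UTC().Format(time.RFC3339), nil
}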
c6230be91e feat(metrics): add OTLP metrics support with centralized config
- Create new metrics/ package for OpenTelemetry-native metrics with OTLP export
- Refactor OTLP configuration to internal/tracerconfig/ to eliminate code duplication
- Add consistent retry configuration across all HTTP OTLP exporters
- Add configuration validation and improved error messages
- Include test coverage for all new functionality
- Make OpenTelemetry metrics dependencies explicit in go.mod

Designed for new applications requiring structured metrics export to
observability backends via OTLP protocol.
2025-08-02 09:29:27 -07:00
796b2a8412 tracing: enable retrying otlp requests when using http 2025-07-27 17:13:06 -07:00
6a3bc7bab3 feat(logger): add buffering exporter with TLS support for OTLP logs
Add buffering exporter to queue OTLP logs until tracing is configured.
Support TLS configuration for OpenTelemetry log export with client
certificate authentication. Improve logfmt formatting and tracing setup.
2025-07-27 16:36:18 -07:00
da13a371b4 feat(database): add shared transaction helpers
Add transaction base utilities with Begin, Commit, and Rollback
functions supporting both sql.DB and sql.Tx interfaces.
2025-07-12 23:52:48 -07:00
a1a5a6b8be database: create shared database package
Extract common database functionality from api/ntpdb and monitor/ntpdb
into shared common/database package:

- Dynamic connector pattern with configuration loading
- Configurable connection pool management (API: 25/10, Monitor: 10/5)
- Optional Prometheus metrics integration
- Generic transaction helpers with proper error handling
- Unified interfaces compatible with SQLC-generated code

Foundation for migration to eliminate ~200 lines of duplicate code.
2025-07-12 17:59:28 -07:00
96afb77844 database: create shared database package with configurable patterns
Extract ~200 lines of duplicate database connection code from api/ntpdb/
and monitor/ntpdb/ into the common/database/ package. This creates a
foundation for database consolidation while introducing no breaking changes.

Files added:
- config.go: Unified configuration with package-specific defaults
- connector.go: Dynamic connector pattern from Boostport
- pool.go: Configurable connection pool management
- metrics.go: Optional Prometheus metrics integration
- interfaces.go: Shared database interfaces for consistent patterns

Key features:
- Configuration-driven approach (API: 25/10 connections + metrics,
  Monitor: 10/5 connections, no metrics)
- Optional Prometheus metrics when registerer provided
- Backward compatibility via convenience functions
- Flexible config file loading (explicit paths + search-based)

Dependencies: Added mysql driver and yaml parsing for database configuration.
2025-07-12 16:54:24 -07:00
c372d79d1d build: goreleaser 2.11.0 and download script tweaks 2025-07-12 16:51:10 -07:00
b5141d6a70 Add database transaction helpers 2025-07-12 13:57:27 -07:00
694f8ba1d3 Add comprehensive godoc documentation to all packages
- Add package-level documentation with usage examples and architecture details
- Document all public types, functions, and methods following godoc conventions
- Remove unused logger.Error type and NewError function
- Apply consistent documentation style across all packages

Packages updated:
- apitls: TLS certificate management with automatic renewal
- config: Environment-based configuration system
- config/depenv: Deployment environment handling
- ekko: Enhanced Echo web framework wrapper
- kafka: Kafka client wrapper with TLS support
- logger: Structured logging with OpenTelemetry integration
- tracing: OpenTelemetry distributed tracing setup
- types: Shared data structures for NTP Pool project
- xff/fastlyxff: Fastly CDN IP range management

All tests pass after documentation changes.
2025-06-19 23:52:03 -07:00
36 changed files with 3215 additions and 249 deletions

CHANGELOG.md

@@ -0,0 +1,34 @@
# Release Notes - v0.5.1
## Observability Enhancements
### OTLP Metrics Support
- **New `metrics/` package** - OpenTelemetry-native metrics with OTLP export support for structured metrics collection
- **Centralized OTLP configuration** - Refactored configuration to `internal/tracerconfig/` to eliminate code duplication across tracing, logging, and metrics
- **HTTP retry support** - Added consistent retry configuration for all HTTP OTLP exporters to improve reliability
### Enhanced Logging
- **Buffering exporter** - Added OTLP log buffering to queue logs until tracing configuration is available
- **TLS support for logs** - Client certificate authentication support for secure OTLP log export
- **Improved logfmt formatting** - Better structured output for log messages
### Tracing Improvements
- **HTTP retry support** - OTLP trace requests now automatically retry on failure when using HTTP transport
## Build System
### Version Package Enhancements
- **Unix epoch build time support** - Build time can now be injected as Unix timestamps (`$(date +%s)`) in addition to RFC3339 format
- **Simplified build commands** - Reduces complexity of ldflags injection while maintaining backward compatibility
- **Consistent output format** - All build times normalize to RFC3339 format regardless of input
## API Changes
### New Public Interfaces
- `metrics.NewMeterProvider()` - Create OTLP metrics provider with centralized configuration
- `metrics.Shutdown()` - Graceful shutdown for metrics exporters
- `internal/tracerconfig` - Shared OTLP configuration utilities (internal package)
### Dependencies
- Added explicit OpenTelemetry metrics dependencies to `go.mod`
- Updated tracing dependencies for retry support
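A hedged usage sketch of the new interfaces; the changelog does not show their signatures, so the context argument and return values below are assumptions:

package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"

	"go.ntppool.org/common/metrics" // import path assumed from the repo module
)

func main() {
	ctx := context.Background()
	mp, err := metrics.NewMeterProvider(ctx) // assumed signature
	if err != nil {
		log.Fatal(err)
	}
	otel.SetMeterProvider(mp)
	defer metrics.Shutdown(ctx) // assumed to flush and stop the exporters
}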


@@ -1,3 +1,14 @@
// Package apitls provides TLS certificate management with automatic renewal support.
//
// This package handles TLS certificate provisioning and management for secure
// inter-service communication within the NTP Pool project infrastructure.
// It provides both server and client certificate management through the
// CertificateProvider interface and includes a trusted CA certificate pool
// for validating certificates.
//
// The package integrates with certman for automatic certificate renewal
// and includes embedded CA certificates for establishing trust relationships
// between services.
package apitls
import (
@@ -13,11 +24,32 @@ import (
//go:embed ca.pem
var caBytes []byte
// CertificateProvider defines the interface for providing TLS certificates
// for both server and client connections. Implementations should handle
// certificate retrieval, caching, and renewal as needed.
//
// This interface supports both server-side certificate provisioning
// (via GetCertificate) and client-side certificate authentication
// (via GetClientCertificate).
type CertificateProvider interface {
// GetCertificate retrieves a server certificate based on the client hello information.
// This method is typically used in tls.Config.GetCertificate for server-side TLS.
GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate, error)
// GetClientCertificate retrieves a client certificate for mutual TLS authentication.
// This method is used in tls.Config.GetClientCertificate for client-side TLS.
GetClientCertificate(certRequestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error)
}
// CAPool returns a certificate pool containing trusted CA certificates
// for validating TLS connections within the NTP Pool infrastructure.
//
// The CA certificates are embedded in the binary and include the trusted
// certificate authorities used for inter-service communication.
// This pool should be used in tls.Config.RootCAs for client connections
// or tls.Config.ClientCAs for server connections requiring client certificates.
//
// Returns an error if the embedded CA certificates cannot be parsed or loaded.
func CAPool() (*x509.CertPool, error) {
capool := x509.NewCertPool()
if !capool.AppendCertsFromPEM(caBytes) {

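As the doc comment suggests, a client connection would use the pool roughly like this (a sketch; the apitls import path is inferred from the repo module):

package main

import (
	"crypto/tls"
	"log"

	"go.ntppool.org/common/apitls"
)

func main() {
	pool, err := apitls.CAPool()
	if err != nil {
		log.Fatal(err)
	}
	// Trust the embedded CAs when dialing other NTP Pool services.
	cfg := &tls.Config{RootCAs: pool}
	_ = cfg // pass to tls.Dial or an http.Transport in real code
}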

@@ -1,5 +1,18 @@
// Package config provides NTP Pool specific
// configuration tools.
// Package config provides environment-based configuration management for NTP Pool services.
//
// This package handles configuration loading from environment variables and provides
// utilities for constructing URLs for web and management interfaces. It supports
// deployment-specific settings including hostname configuration, TLS settings,
// and deployment modes.
//
// Configuration is loaded automatically from environment variables:
// - deployment_mode: The deployment environment (devel, production, etc.)
// - manage_hostname: Hostname for management interface
// - web_hostname: Comma-separated list of web hostnames (first is primary)
// - manage_tls: Enable TLS for management interface (yes, no, true, false)
// - web_tls: Enable TLS for web interface (yes, no, true, false)
//
// The package includes code generation for accessor methods using the accessory tool.
package config
import (
@@ -13,6 +26,9 @@ import (
//go:generate go tool github.com/masaushi/accessory -type Config
// Config holds environment-based configuration for NTP Pool services.
// It manages hostnames, TLS settings, and deployment modes loaded from
// environment variables. The struct includes code-generated accessor methods.
type Config struct {
deploymentMode string `accessor:"getter"`
@@ -26,6 +42,16 @@ type Config struct {
valid bool `accessor:"getter"`
}
// New creates a new Config instance by loading configuration from environment variables.
// It automatically parses hostnames, TLS settings, and deployment mode from the environment.
// The configuration is considered valid if at least one web hostname is provided.
//
// Environment variables used:
// - deployment_mode: Deployment environment identifier
// - manage_hostname: Management interface hostname
// - web_hostname: Comma-separated web hostnames (first becomes primary)
// - manage_tls: Management interface TLS setting
// - web_tls: Web interface TLS setting
func New() *Config {
c := Config{}
c.deploymentMode = os.Getenv("deployment_mode")
@@ -46,10 +72,26 @@ func New() *Config {
return &c
}
// WebURL constructs a complete URL for the web interface using the primary web hostname.
// It automatically selects HTTP or HTTPS based on the web_tls configuration setting.
//
// Parameters:
// - path: URL path component (should start with "/")
// - query: Optional URL query parameters (can be nil)
//
// Returns a complete URL string suitable for web interface requests.
func (c *Config) WebURL(path string, query *url.Values) string {
return baseURL(c.webHostname, c.webTLS, path, query)
}
// ManageURL constructs a complete URL for the management interface using the management hostname.
// It automatically selects HTTP or HTTPS based on the manage_tls configuration setting.
//
// Parameters:
// - path: URL path component (should start with "/")
// - query: Optional URL query parameters (can be nil)
//
// Returns a complete URL string suitable for management interface requests.
func (c *Config) ManageURL(path string, query *url.Values) string {
return baseURL(c.manageHostname, c.manageTLS, path, query)
}
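A short usage sketch based on the documented signatures; hostnames and schemes come from the environment variables listed above:

package main

import (
	"fmt"
	"net/url"

	"go.ntppool.org/common/config" // import path inferred from the repo module
)

func main() {
	cfg := config.New()
	q := url.Values{}
	q.Set("page", "2")
	fmt.Println(cfg.WebURL("/scores", &q))     // scheme follows web_tls
	fmt.Println(cfg.ManageURL("/manage", nil)) // scheme follows manage_tls
}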


@@ -1,3 +1,19 @@
// Package depenv provides deployment environment management for NTP Pool services.
//
// This package handles different deployment environments (development, test, production)
// and provides environment-specific configuration including API endpoints, management URLs,
// and monitoring domains. It supports string-based environment identification and
// automatic URL construction for various service endpoints.
//
// The package defines three main deployment environments:
// - DeployDevel: Development environment with dev-specific endpoints
// - DeployTest: Test/beta environment for staging
// - DeployProd: Production environment with live endpoints
//
// Environment detection supports both short and long forms:
// - "dev" or "devel" → DeployDevel
// - "test" or "beta" → DeployTest
// - "prod" → DeployProd
package depenv
import (
@@ -24,14 +40,27 @@ var apiServers = map[DeploymentEnvironment]string{
// }
const (
// DeployUndefined represents an unrecognized or unset deployment environment.
DeployUndefined DeploymentEnvironment = iota
// DeployDevel represents the development environment.
DeployDevel
// DeployTest represents the test/beta environment.
DeployTest
// DeployProd represents the production environment.
DeployProd
)
// DeploymentEnvironment represents a deployment environment type.
// It provides methods for environment-specific URL construction and
// supports text marshaling/unmarshaling for configuration files.
type DeploymentEnvironment uint8
// DeploymentEnvironmentFromString parses a string into a DeploymentEnvironment.
// It supports both short and long forms of environment names:
// - "dev" or "devel" → DeployDevel
// - "test" or "beta" → DeployTest
// - "prod" → DeployProd
// - any other value → DeployUndefined
func DeploymentEnvironmentFromString(s string) DeploymentEnvironment {
switch s {
case "devel", "dev":
@@ -45,6 +74,8 @@ func DeploymentEnvironmentFromString(s string) DeploymentEnvironment {
}
}
// String returns the canonical string representation of the deployment environment.
// Returns "prod", "test", "devel", or panics for invalid environments.
func (d DeploymentEnvironment) String() string {
switch d {
case DeployProd:
@@ -58,6 +89,9 @@ func (d DeploymentEnvironment) String() string {
}
}
// APIHost returns the API server URL for this deployment environment.
// It first checks the API_HOST environment variable for overrides,
// then falls back to the environment-specific default API endpoint.
func (d DeploymentEnvironment) APIHost() string {
if apiHost := os.Getenv("API_HOST"); apiHost != "" {
return apiHost
@@ -65,14 +99,26 @@ func (d DeploymentEnvironment) APIHost() string {
return apiServers[d]
}
// ManageURL constructs a management interface URL for this deployment environment.
// It combines the environment-specific management server base URL with the provided path.
//
// The path parameter should start with "/" for proper URL construction.
func (d DeploymentEnvironment) ManageURL(path string) string {
return manageServers[d] + path
}
// MonitorDomain returns the monitoring domain for this deployment environment.
// The domain follows the pattern: {environment}.mon.ntppool.dev
// For example: "devel.mon.ntppool.dev" for the development environment.
func (d DeploymentEnvironment) MonitorDomain() string {
return d.String() + ".mon.ntppool.dev"
}
// UnmarshalText implements the encoding.TextUnmarshaler interface.
// It allows DeploymentEnvironment to be unmarshaled from configuration files
// and other text-based formats. Empty strings are treated as valid (no-op).
//
// Returns an error if the text represents an invalid deployment environment.
func (d *DeploymentEnvironment) UnmarshalText(text []byte) error {
s := string(text)
if s == "" {

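Putting the helpers above together (a sketch; the import path is inferred from the package list earlier in this changeset):

package main

import (
	"fmt"

	"go.ntppool.org/common/config/depenv"
)

func main() {
	env := depenv.DeploymentEnvironmentFromString("beta")
	fmt.Println(env)                 // "test"
	fmt.Println(env.MonitorDomain()) // "test.mon.ntppool.dev"
	fmt.Println(env.ManageURL("/manage"))
}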
database/config.go

@@ -0,0 +1,61 @@
package database
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
// Config represents the database configuration structure
type Config struct {
MySQL DBConfig `yaml:"mysql"`
}
// DBConfig represents the MySQL database configuration
type DBConfig struct {
DSN string `default:"" flag:"dsn" usage:"Database DSN"`
User string `default:"" flag:"user"`
Pass string `default:"" flag:"pass"`
DBName string // Optional database name override
}
// ConfigOptions allows customization of database opening behavior
type ConfigOptions struct {
// ConfigFiles is a list of config file paths to search for database configuration
ConfigFiles []string
// EnablePoolMonitoring enables connection pool metrics collection
EnablePoolMonitoring bool
// PrometheusRegisterer for metrics collection. If nil, no metrics are collected.
PrometheusRegisterer prometheus.Registerer
// Connection pool settings
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
}
// DefaultConfigOptions returns the standard configuration options used by API package
func DefaultConfigOptions() ConfigOptions {
return ConfigOptions{
ConfigFiles: []string{"database.yaml", "/vault/secrets/database.yaml"},
EnablePoolMonitoring: true,
PrometheusRegisterer: prometheus.DefaultRegisterer,
MaxOpenConns: 25,
MaxIdleConns: 10,
ConnMaxLifetime: 3 * time.Minute,
}
}
// MonitorConfigOptions returns configuration options optimized for Monitor package
func MonitorConfigOptions() ConfigOptions {
return ConfigOptions{
ConfigFiles: []string{"database.yaml", "/vault/secrets/database.yaml"},
EnablePoolMonitoring: false, // Monitor doesn't need metrics
PrometheusRegisterer: nil, // No Prometheus dependency
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: 3 * time.Minute,
}
}

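For example, a caller can start from the defaults and adjust pool sizing before opening a connection (OpenDB is defined in pool.go later in this changeset):

package main

import (
	"context"
	"log"

	"go.ntppool.org/common/database" // import path inferred from the repo module
)

func main() {
	opts := database.DefaultConfigOptions()
	opts.MaxOpenConns = 50 // override the API default of 25
	db, err := database.OpenDB(context.Background(), opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}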
database/config_test.go

@@ -0,0 +1,81 @@
package database
import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
)
func TestDefaultConfigOptions(t *testing.T) {
opts := DefaultConfigOptions()
// Verify expected defaults for API package
if opts.MaxOpenConns != 25 {
t.Errorf("Expected MaxOpenConns=25, got %d", opts.MaxOpenConns)
}
if opts.MaxIdleConns != 10 {
t.Errorf("Expected MaxIdleConns=10, got %d", opts.MaxIdleConns)
}
if opts.ConnMaxLifetime != 3*time.Minute {
t.Errorf("Expected ConnMaxLifetime=3m, got %v", opts.ConnMaxLifetime)
}
if !opts.EnablePoolMonitoring {
t.Error("Expected EnablePoolMonitoring=true")
}
if opts.PrometheusRegisterer != prometheus.DefaultRegisterer {
t.Error("Expected PrometheusRegisterer to be DefaultRegisterer")
}
if len(opts.ConfigFiles) == 0 {
t.Error("Expected ConfigFiles to be non-empty")
}
}
func TestMonitorConfigOptions(t *testing.T) {
opts := MonitorConfigOptions()
// Verify expected defaults for Monitor package
if opts.MaxOpenConns != 10 {
t.Errorf("Expected MaxOpenConns=10, got %d", opts.MaxOpenConns)
}
if opts.MaxIdleConns != 5 {
t.Errorf("Expected MaxIdleConns=5, got %d", opts.MaxIdleConns)
}
if opts.ConnMaxLifetime != 3*time.Minute {
t.Errorf("Expected ConnMaxLifetime=3m, got %v", opts.ConnMaxLifetime)
}
if opts.EnablePoolMonitoring {
t.Error("Expected EnablePoolMonitoring=false")
}
if opts.PrometheusRegisterer != nil {
t.Error("Expected PrometheusRegisterer to be nil")
}
if len(opts.ConfigFiles) == 0 {
t.Error("Expected ConfigFiles to be non-empty")
}
}
func TestConfigStructures(t *testing.T) {
// Test that configuration structures can be created and populated
config := Config{
MySQL: DBConfig{
DSN: "user:pass@tcp(localhost:3306)/dbname",
User: "testuser",
Pass: "testpass",
DBName: "testdb",
},
}
if config.MySQL.DSN == "" {
t.Error("Expected DSN to be set")
}
if config.MySQL.User != "testuser" {
t.Errorf("Expected User='testuser', got '%s'", config.MySQL.User)
}
if config.MySQL.Pass != "testpass" {
t.Errorf("Expected Pass='testpass', got '%s'", config.MySQL.Pass)
}
if config.MySQL.DBName != "testdb" {
t.Errorf("Expected DBName='testdb', got '%s'", config.MySQL.DBName)
}
}

database/connector.go

@@ -0,0 +1,88 @@
package database
import (
"context"
"database/sql/driver"
"errors"
"fmt"
"os"
"github.com/go-sql-driver/mysql"
"gopkg.in/yaml.v3"
)
// from https://github.com/Boostport/dynamic-database-config
// CreateConnectorFunc is a function that creates a database connector
type CreateConnectorFunc func() (driver.Connector, error)
// Driver implements the sql/driver interface with dynamic configuration
type Driver struct {
CreateConnectorFunc CreateConnectorFunc
}
// Driver returns the driver instance
func (d Driver) Driver() driver.Driver {
return d
}
// Connect creates a new database connection using the dynamic connector
func (d Driver) Connect(ctx context.Context) (driver.Conn, error) {
connector, err := d.CreateConnectorFunc()
if err != nil {
return nil, fmt.Errorf("error creating connector from function: %w", err)
}
return connector.Connect(ctx)
}
// Open is not supported for dynamic configuration
func (d Driver) Open(name string) (driver.Conn, error) {
return nil, errors.New("open is not supported")
}
// createConnector creates a connector function that reads configuration from a file
func createConnector(configFile string) CreateConnectorFunc {
return func() (driver.Connector, error) {
dbFile, err := os.Open(configFile)
if err != nil {
return nil, err
}
defer dbFile.Close()
dec := yaml.NewDecoder(dbFile)
cfg := Config{}
err = dec.Decode(&cfg)
if err != nil {
return nil, err
}
dsn := cfg.MySQL.DSN
if len(dsn) == 0 {
dsn = os.Getenv("DATABASE_DSN")
if len(dsn) == 0 {
return nil, fmt.Errorf("dsn config in database.yaml or DATABASE_DSN environment variable required")
}
}
dbcfg, err := mysql.ParseDSN(dsn)
if err != nil {
return nil, err
}
if user := cfg.MySQL.User; len(user) > 0 {
dbcfg.User = user
}
if pass := cfg.MySQL.Pass; len(pass) > 0 {
dbcfg.Passwd = pass
}
if name := cfg.MySQL.DBName; len(name) > 0 {
dbcfg.DBName = name
}
return mysql.NewConnector(dbcfg)
}
}

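pool.go (later in this changeset) wires this connector into database/sql; the same pattern shown in isolation:

package database

import "database/sql"

// openFromConfig shows the dynamic-connector pattern on its own: the
// config file is re-read each time a new connection is established, so
// rotated credentials in database.yaml are picked up without a restart.
func openFromConfig(configFile string) *sql.DB {
	return sql.OpenDB(Driver{
		CreateConnectorFunc: createConnector(configFile),
	})
}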

@@ -0,0 +1,117 @@
package database
import (
"context"
"database/sql"
"testing"
)
// Mock types for testing SQLC integration patterns
type mockQueries struct {
db DBTX
}
type mockQueriesTx struct {
*mockQueries
tx *sql.Tx
}
// Mock the Begin method pattern that SQLC generates
func (q *mockQueries) Begin(ctx context.Context) (*mockQueriesTx, error) {
// This would normally be: tx, err := q.db.(*sql.DB).BeginTx(ctx, nil)
// For our test, we return a mock
return &mockQueriesTx{mockQueries: q, tx: nil}, nil
}
func (qtx *mockQueriesTx) Commit(ctx context.Context) error {
return nil // Mock implementation
}
func (qtx *mockQueriesTx) Rollback(ctx context.Context) error {
return nil // Mock implementation
}
// This test verifies that our common database interfaces are compatible with SQLC-generated code
func TestSQLCIntegration(t *testing.T) {
// Test that SQLC's DBTX interface matches our DBTX interface
t.Run("DBTX Interface Compatibility", func(t *testing.T) {
// Test interface compatibility by assignment without execution
var ourDBTX DBTX
// Test with sql.DB (should implement DBTX)
var db *sql.DB
ourDBTX = db // This will compile only if interfaces are compatible
_ = ourDBTX // Use the variable to avoid "unused" warning
// Test with sql.Tx (should implement DBTX)
var tx *sql.Tx
ourDBTX = tx // This will compile only if interfaces are compatible
_ = ourDBTX // Use the variable to avoid "unused" warning
// If we reach here, interfaces are compatible
t.Log("DBTX interface is compatible with sql.DB and sql.Tx")
})
t.Run("Transaction Interface Compatibility", func(t *testing.T) {
// This test verifies our transaction interfaces work with SQLC patterns
// We can't define methods inside a function, so we test interface compatibility
// Verify our DB interface is compatible with what SQLC expects
var dbInterface DB[*mockQueriesTx]
var mockDB *mockQueries = &mockQueries{}
dbInterface = mockDB
// Test that our transaction helper can work with this pattern
err := WithTransaction(context.Background(), dbInterface, func(ctx context.Context, qtx *mockQueriesTx) error {
// This would be where you'd call SQLC-generated query methods
return nil
})
if err != nil {
t.Errorf("Transaction helper failed: %v", err)
}
})
}
// Test that demonstrates how the common package would be used with real SQLC patterns
func TestRealWorldUsagePattern(t *testing.T) {
// This test shows how a package would typically use our common database code
t.Run("Database Opening Pattern", func(t *testing.T) {
// Test that our configuration options work as expected
opts := DefaultConfigOptions()
// Modify for test environment (no actual database connection)
opts.ConfigFiles = []string{} // No config files for unit test
opts.PrometheusRegisterer = nil // No metrics for unit test
// This would normally open a database: db, err := OpenDB(ctx, opts)
// For our unit test, we just verify the options are reasonable
if opts.MaxOpenConns <= 0 {
t.Error("MaxOpenConns should be positive")
}
if opts.MaxIdleConns <= 0 {
t.Error("MaxIdleConns should be positive")
}
if opts.ConnMaxLifetime <= 0 {
t.Error("ConnMaxLifetime should be positive")
}
})
t.Run("Monitor Package Configuration", func(t *testing.T) {
opts := MonitorConfigOptions()
// Verify monitor-specific settings
if opts.EnablePoolMonitoring {
t.Error("Monitor package should not enable pool monitoring")
}
if opts.PrometheusRegisterer != nil {
t.Error("Monitor package should not have Prometheus registerer")
}
if opts.MaxOpenConns != 10 {
t.Errorf("Expected MaxOpenConns=10 for monitor, got %d", opts.MaxOpenConns)
}
if opts.MaxIdleConns != 5 {
t.Errorf("Expected MaxIdleConns=5 for monitor, got %d", opts.MaxIdleConns)
}
})
}

database/interfaces.go

@@ -0,0 +1,34 @@
package database
import (
"context"
"database/sql"
)
// DBTX matches the interface expected by SQLC-generated code
// This interface is implemented by both *sql.DB and *sql.Tx
type DBTX interface {
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
PrepareContext(context.Context, string) (*sql.Stmt, error)
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}
// BaseQuerier provides basic query functionality
// This interface should be implemented by package-specific Queries types
type BaseQuerier interface {
WithTx(tx *sql.Tx) BaseQuerier
}
// BaseQuerierTx provides transaction functionality
// This interface should be implemented by package-specific Queries types
type BaseQuerierTx interface {
BaseQuerier
Begin(ctx context.Context) (BaseQuerierTx, error)
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}
// TransactionFunc represents a function that operates within a database transaction
// This is used by the shared transaction helpers in transaction.go
type TransactionFunc[Q any] func(ctx context.Context, q Q) error

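A sketch of how an SQLC-generated type lines up with DBTX; exampleQueries is hypothetical, and the compile-time assertions mirror the checks in the integration test above:

package database

import (
	"context"
	"database/sql"
)

// Compile-time proof that both connection and transaction handles
// satisfy DBTX, as the integration test exercises.
var (
	_ DBTX = (*sql.DB)(nil)
	_ DBTX = (*sql.Tx)(nil)
)

// exampleQueries stands in for an SQLC-generated Queries struct,
// which stores a DBTX and works identically with *sql.DB and *sql.Tx.
type exampleQueries struct {
	db DBTX
}

func (q *exampleQueries) exampleName(ctx context.Context) (string, error) {
	var name string
	err := q.db.QueryRowContext(ctx, "SELECT name FROM example LIMIT 1").Scan(&name)
	return name, err
}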
database/metrics.go

@@ -0,0 +1,93 @@
package database
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// DatabaseMetrics holds the Prometheus metrics for database connection pool monitoring
type DatabaseMetrics struct {
ConnectionsOpen prometheus.Gauge
ConnectionsIdle prometheus.Gauge
ConnectionsInUse prometheus.Gauge
ConnectionsWaitCount prometheus.Counter
ConnectionsWaitDuration prometheus.Histogram
}
// NewDatabaseMetrics creates a new set of database metrics and registers them
func NewDatabaseMetrics(registerer prometheus.Registerer) *DatabaseMetrics {
metrics := &DatabaseMetrics{
ConnectionsOpen: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "database_connections_open",
Help: "Number of open database connections",
}),
ConnectionsIdle: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "database_connections_idle",
Help: "Number of idle database connections",
}),
ConnectionsInUse: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "database_connections_in_use",
Help: "Number of database connections in use",
}),
ConnectionsWaitCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "database_connections_wait_count_total",
Help: "Total number of times a connection had to wait",
}),
ConnectionsWaitDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "database_connections_wait_duration_seconds",
Help: "Time spent waiting for a database connection",
Buckets: prometheus.DefBuckets,
}),
}
if registerer != nil {
registerer.MustRegister(
metrics.ConnectionsOpen,
metrics.ConnectionsIdle,
metrics.ConnectionsInUse,
metrics.ConnectionsWaitCount,
metrics.ConnectionsWaitDuration,
)
}
return metrics
}
// monitorConnectionPool runs a background goroutine to collect connection pool metrics
func monitorConnectionPool(ctx context.Context, db *sql.DB, registerer prometheus.Registerer) {
if registerer == nil {
return // No metrics collection if no registerer provided
}
metrics := NewDatabaseMetrics(registerer)
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	// sql.DBStats reports WaitCount and WaitDuration as cumulative totals,
	// so remember the previous sample and record only the per-tick delta;
	// adding the running total each tick would over-count the counter.
	var lastWaitCount int64
	var lastWaitDuration time.Duration
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			stats := db.Stats()
			metrics.ConnectionsOpen.Set(float64(stats.OpenConnections))
			metrics.ConnectionsIdle.Set(float64(stats.Idle))
			metrics.ConnectionsInUse.Set(float64(stats.InUse))
			metrics.ConnectionsWaitCount.Add(float64(stats.WaitCount - lastWaitCount))
			if d := stats.WaitDuration - lastWaitDuration; d > 0 {
				metrics.ConnectionsWaitDuration.Observe(d.Seconds())
			}
			lastWaitCount = stats.WaitCount
			lastWaitDuration = stats.WaitDuration
// Log connection pool stats for high usage or waiting
if stats.OpenConnections > 20 || stats.WaitCount > 0 {
fmt.Printf("Connection pool stats: open=%d idle=%d in_use=%d wait_count=%d wait_duration=%s\n",
stats.OpenConnections, stats.Idle, stats.InUse, stats.WaitCount, stats.WaitDuration)
}
}
}
}

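NewDatabaseMetrics can also be used directly against a private registry, for example in tests (a sketch):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"go.ntppool.org/common/database" // import path inferred from the repo module
)

func main() {
	reg := prometheus.NewRegistry()
	m := database.NewDatabaseMetrics(reg)
	m.ConnectionsOpen.Set(3)
	families, _ := reg.Gather()
	fmt.Println(len(families), "metric families registered") // 5
}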
database/pool.go

@@ -0,0 +1,78 @@
package database
import (
"context"
"database/sql"
"fmt"
"os"
"go.ntppool.org/common/logger"
)
// OpenDB opens a database connection with the specified configuration options
func OpenDB(ctx context.Context, options ConfigOptions) (*sql.DB, error) {
log := logger.Setup()
configFile, err := findConfigFile(options.ConfigFiles)
if err != nil {
return nil, err
}
dbconn := sql.OpenDB(Driver{
CreateConnectorFunc: createConnector(configFile),
})
// Set connection pool parameters
dbconn.SetConnMaxLifetime(options.ConnMaxLifetime)
dbconn.SetMaxOpenConns(options.MaxOpenConns)
dbconn.SetMaxIdleConns(options.MaxIdleConns)
err = dbconn.PingContext(ctx)
if err != nil {
log.Error("could not connect to database", "err", err)
return nil, err
}
// Start optional connection pool monitoring
if options.EnablePoolMonitoring && options.PrometheusRegisterer != nil {
go monitorConnectionPool(ctx, dbconn, options.PrometheusRegisterer)
}
return dbconn, nil
}
// OpenDBWithConfigFile opens a database connection using an explicit config file path
// This is a convenience function for API package compatibility
func OpenDBWithConfigFile(ctx context.Context, configFile string) (*sql.DB, error) {
options := DefaultConfigOptions()
options.ConfigFiles = []string{configFile}
return OpenDB(ctx, options)
}
// OpenDBMonitor opens a database connection with monitor-specific defaults
// This is a convenience function for Monitor package compatibility
func OpenDBMonitor() (*sql.DB, error) {
options := MonitorConfigOptions()
return OpenDB(context.Background(), options)
}
// findConfigFile searches for the first existing config file from the list
func findConfigFile(configFiles []string) (string, error) {
var firstErr error
for _, configFile := range configFiles {
if configFile == "" {
continue
}
if _, err := os.Stat(configFile); err == nil {
return configFile, nil
} else if firstErr == nil {
firstErr = err
}
}
if firstErr != nil {
return "", fmt.Errorf("no config file found: %w", firstErr)
}
return "", fmt.Errorf("no valid config files provided")
}

database/transaction.go

@@ -0,0 +1,69 @@
package database
import (
"context"
"fmt"
"go.ntppool.org/common/logger"
)
// DB interface for database operations that can begin transactions
type DB[Q any] interface {
Begin(ctx context.Context) (Q, error)
}
// TX interface for transaction operations
type TX interface {
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}
// WithTransaction executes a function within a database transaction
// Handles proper rollback on error and commit on success
func WithTransaction[Q TX](ctx context.Context, db DB[Q], fn func(ctx context.Context, q Q) error) error {
tx, err := db.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
var committed bool
defer func() {
if !committed {
if rbErr := tx.Rollback(ctx); rbErr != nil {
// Log rollback error but don't override original error
log := logger.FromContext(ctx)
log.ErrorContext(ctx, "failed to rollback transaction", "error", rbErr)
}
}
}()
if err := fn(ctx, tx); err != nil {
return err
}
err = tx.Commit(ctx)
committed = true // Commit ends the transaction either way; skip the deferred rollback even if it failed
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// WithReadOnlyTransaction executes a read-only function within a transaction
// Always rolls back at the end (for consistent read isolation)
func WithReadOnlyTransaction[Q TX](ctx context.Context, db DB[Q], fn func(ctx context.Context, q Q) error) error {
tx, err := db.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin read-only transaction: %w", err)
}
defer func() {
if rbErr := tx.Rollback(ctx); rbErr != nil {
log := logger.FromContext(ctx)
log.ErrorContext(ctx, "failed to rollback read-only transaction", "error", rbErr)
}
}()
return fn(ctx, tx)
}

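Used with SQLC-style generated types, the helper looks roughly like this; fakeDB and fakeTx below are stand-ins for generated Queries types:

package main

import (
	"context"
	"log"

	"go.ntppool.org/common/database" // import path inferred from the repo module
)

// fakeTx stands in for a transaction-scoped SQLC Queries type; it
// satisfies the TX interface (Commit/Rollback with context).
type fakeTx struct{}

func (fakeTx) Commit(ctx context.Context) error   { return nil }
func (fakeTx) Rollback(ctx context.Context) error { return nil }

// fakeDB stands in for the SQLC Queries type that can begin transactions.
type fakeDB struct{}

func (fakeDB) Begin(ctx context.Context) (fakeTx, error) { return fakeTx{}, nil }

func main() {
	// A returned error rolls the transaction back; nil commits it.
	err := database.WithTransaction(context.Background(), fakeDB{}, func(ctx context.Context, q fakeTx) error {
		return nil // run generated queries on q here
	})
	if err != nil {
		log.Fatal(err)
	}
}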

@@ -0,0 +1,69 @@
package database
import (
"context"
"database/sql"
"fmt"
"go.ntppool.org/common/logger"
)
// Shared interface definitions that both packages use identically
type BaseBeginner interface {
Begin(context.Context) (sql.Tx, error)
}
type BaseTx interface {
BaseBeginner
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}
// BeginTransactionForQuerier contains the shared Begin() logic from both packages
func BeginTransactionForQuerier(ctx context.Context, db DBTX) (DBTX, error) {
if sqlDB, ok := db.(*sql.DB); ok {
tx, err := sqlDB.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return nil, err
}
return tx, nil
} else {
// Handle transaction case
if beginner, ok := db.(BaseBeginner); ok {
tx, err := beginner.Begin(ctx)
if err != nil {
return nil, err
}
return &tx, nil
}
return nil, fmt.Errorf("database connection does not support transactions")
}
}
// CommitTransactionForQuerier contains the shared Commit() logic from both packages
func CommitTransactionForQuerier(ctx context.Context, db DBTX) error {
if sqlTx, ok := db.(*sql.Tx); ok {
return sqlTx.Commit()
}
tx, ok := db.(BaseTx)
if !ok {
log := logger.FromContext(ctx)
log.ErrorContext(ctx, "could not get a Tx", "type", fmt.Sprintf("%T", db))
return sql.ErrTxDone
}
return tx.Commit(ctx)
}
// RollbackTransactionForQuerier contains the shared Rollback() logic from both packages
func RollbackTransactionForQuerier(ctx context.Context, db DBTX) error {
if sqlTx, ok := db.(*sql.Tx); ok {
return sqlTx.Rollback()
}
tx, ok := db.(BaseTx)
if !ok {
return sql.ErrTxDone
}
return tx.Rollback(ctx)
}


@@ -0,0 +1,157 @@
package database
import (
"context"
"errors"
"testing"
)
// Mock implementations for testing
type mockDB struct {
beginError error
txMock *mockTX
}
func (m *mockDB) Begin(ctx context.Context) (*mockTX, error) {
if m.beginError != nil {
return nil, m.beginError
}
return m.txMock, nil
}
type mockTX struct {
commitError error
rollbackError error
commitCalled bool
rollbackCalled bool
}
func (m *mockTX) Commit(ctx context.Context) error {
m.commitCalled = true
return m.commitError
}
func (m *mockTX) Rollback(ctx context.Context) error {
m.rollbackCalled = true
return m.rollbackError
}
func TestWithTransaction_Success(t *testing.T) {
tx := &mockTX{}
db := &mockDB{txMock: tx}
var functionCalled bool
err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
functionCalled = true
if q != tx {
t.Error("Expected transaction to be passed to function")
}
return nil
})
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
if !functionCalled {
t.Error("Expected function to be called")
}
if !tx.commitCalled {
t.Error("Expected commit to be called")
}
if tx.rollbackCalled {
t.Error("Expected rollback NOT to be called on success")
}
}
func TestWithTransaction_FunctionError(t *testing.T) {
tx := &mockTX{}
db := &mockDB{txMock: tx}
expectedError := errors.New("function error")
err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
return expectedError
})
if err != expectedError {
t.Errorf("Expected error %v, got %v", expectedError, err)
}
if tx.commitCalled {
t.Error("Expected commit NOT to be called on function error")
}
if !tx.rollbackCalled {
t.Error("Expected rollback to be called on function error")
}
}
func TestWithTransaction_BeginError(t *testing.T) {
expectedError := errors.New("begin error")
db := &mockDB{beginError: expectedError}
err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
t.Error("Function should not be called when Begin fails")
return nil
})
if err == nil || !errors.Is(err, expectedError) {
t.Errorf("Expected wrapped begin error, got %v", err)
}
}
func TestWithTransaction_CommitError(t *testing.T) {
commitError := errors.New("commit error")
tx := &mockTX{commitError: commitError}
db := &mockDB{txMock: tx}
err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
return nil
})
if err == nil || !errors.Is(err, commitError) {
t.Errorf("Expected wrapped commit error, got %v", err)
}
if !tx.commitCalled {
t.Error("Expected commit to be called")
}
if tx.rollbackCalled {
t.Error("Expected rollback NOT to be called when commit fails")
}
}
func TestWithReadOnlyTransaction_Success(t *testing.T) {
tx := &mockTX{}
db := &mockDB{txMock: tx}
var functionCalled bool
err := WithReadOnlyTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
functionCalled = true
return nil
})
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
if !functionCalled {
t.Error("Expected function to be called")
}
if tx.commitCalled {
t.Error("Expected commit NOT to be called in read-only transaction")
}
if !tx.rollbackCalled {
t.Error("Expected rollback to be called in read-only transaction")
}
}
func TestWithReadOnlyTransaction_FunctionError(t *testing.T) {
tx := &mockTX{}
db := &mockDB{txMock: tx}
expectedError := errors.New("function error")
err := WithReadOnlyTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
return expectedError
})
if err != expectedError {
t.Errorf("Expected error %v, got %v", expectedError, err)
}
if !tx.rollbackCalled {
t.Error("Expected rollback to be called")
}
}


@@ -1,3 +1,32 @@
// Package ekko provides an enhanced Echo web framework wrapper with pre-configured middleware.
//
// This package wraps the Echo web framework with a comprehensive middleware stack including:
// - OpenTelemetry distributed tracing with request context propagation
// - Prometheus metrics collection with per-service subsystems
// - Structured logging with trace ID correlation
// - Security headers (HSTS, content security policy)
// - Gzip compression for response optimization
// - Recovery middleware with detailed error logging
// - HTTP/2 support with H2C (HTTP/2 Cleartext) capability
//
// The package uses functional options pattern for flexible configuration
// and supports graceful shutdown with configurable timeouts. It's designed
// as the standard web service foundation for NTP Pool project services.
//
// Example usage:
//
// ekko, err := ekko.New("myservice",
// ekko.WithPort(8080),
// ekko.WithPrometheus(prometheus.DefaultRegisterer),
// ekko.WithEchoSetup(func(e *echo.Echo) error {
// e.GET("/health", healthHandler)
// return nil
// }),
// )
// if err != nil {
// log.Fatal(err)
// }
// err = ekko.Start(ctx)
package ekko
import (
@@ -20,6 +49,25 @@ import (
"golang.org/x/sync/errgroup"
)
// New creates a new Ekko instance with the specified service name and functional options.
// The name parameter is used for OpenTelemetry service identification, Prometheus metrics
// subsystem naming, and server identification headers.
//
// Default configuration includes:
// - 60 second write timeout
// - 30 second read header timeout
// - HTTP/2 support with H2C
// - Standard middleware stack (tracing, metrics, logging, security)
//
// Use functional options to customize behavior:
// - WithPort(): Set server port (required for Start())
// - WithPrometheus(): Enable Prometheus metrics
// - WithEchoSetup(): Configure routes and handlers
// - WithLogFilters(): Filter access logs
// - WithOtelMiddleware(): Custom OpenTelemetry middleware
// - WithWriteTimeout(): Custom write timeout
// - WithReadHeaderTimeout(): Custom read header timeout
// - WithGzipConfig(): Custom gzip compression settings
func New(name string, options ...func(*Ekko)) (*Ekko, error) {
ek := &Ekko{
writeTimeout: 60 * time.Second,
@@ -32,13 +80,25 @@ func New(name string, options ...func(*Ekko)) (*Ekko, error) {
return ek, nil
}
// Setup Echo; only intended for testing
// SetupEcho creates and configures an Echo instance without starting the server.
// This method is primarily intended for testing scenarios where you need access
// to the configured Echo instance without starting the HTTP server.
//
// The returned Echo instance includes all configured middleware and routes
// but requires manual server lifecycle management.
func (ek *Ekko) SetupEcho(ctx context.Context) (*echo.Echo, error) {
return ek.setup(ctx)
}
// Setup Echo and start the server. Will return if the http server
// returns or the context is done.
// Start creates the Echo instance and starts the HTTP server with graceful shutdown support.
// The server runs until either an error occurs or the provided context is cancelled.
//
// The server supports HTTP/2 with H2C (HTTP/2 Cleartext) and includes a 5-second
// graceful shutdown timeout when the context is cancelled. Server configuration
// (port, timeouts, middleware) must be set via functional options during New().
//
// Returns an error if server startup fails or if shutdown doesn't complete within
// the timeout period. Returns nil for clean shutdown via context cancellation.
func (ek *Ekko) Start(ctx context.Context) error {
log := logger.Setup()


@@ -9,6 +9,9 @@ import (
slogecho "github.com/samber/slog-echo"
)
// Ekko represents an enhanced Echo web server with pre-configured middleware stack.
// It encapsulates server configuration, middleware options, and lifecycle management
// for NTP Pool web services. Use New() with functional options to configure.
type Ekko struct {
name string
prom prometheus.Registerer
@@ -22,50 +25,76 @@ type Ekko struct {
readHeaderTimeout time.Duration
}
// RouteFn defines a function type for configuring Echo routes and handlers.
// It receives a configured Echo instance and should register all application
// routes, middleware, and handlers. Return an error to abort server startup.
type RouteFn func(e *echo.Echo) error
// WithPort sets the HTTP server port. This option is required when using Start().
// The port should be available and the process should have permission to bind to it.
func WithPort(port int) func(*Ekko) {
return func(ek *Ekko) {
ek.port = port
}
}
// WithPrometheus enables Prometheus metrics collection using the provided registerer.
// Metrics include HTTP request duration, request count, and response size histograms.
// The service name is used as the metrics subsystem for namespacing.
func WithPrometheus(reg prometheus.Registerer) func(*Ekko) {
return func(ek *Ekko) {
ek.prom = reg
}
}
// WithEchoSetup configures application routes and handlers via a setup function.
// The provided function receives the configured Echo instance after all middleware
// is applied and should register routes, custom middleware, and handlers.
func WithEchoSetup(rfn RouteFn) func(*Ekko) {
return func(ek *Ekko) {
ek.routeFn = rfn
}
}
// WithLogFilters configures access log filtering to reduce log noise.
// Filters can exclude specific paths, methods, or status codes from access logs.
// Useful for excluding health checks, metrics endpoints, and other high-frequency requests.
func WithLogFilters(f []slogecho.Filter) func(*Ekko) {
return func(ek *Ekko) {
ek.logFilters = f
}
}
// WithOtelMiddleware replaces the default OpenTelemetry middleware with a custom implementation.
// The default middleware provides distributed tracing for all requests. Use this option
// when you need custom trace configuration or want to disable tracing entirely.
func WithOtelMiddleware(mw echo.MiddlewareFunc) func(*Ekko) {
return func(ek *Ekko) {
ek.otelmiddleware = mw
}
}
// WithWriteTimeout configures the HTTP server write timeout.
// This is the maximum duration before timing out writes of the response.
// Default is 60 seconds. Should be longer than expected response generation time.
func WithWriteTimeout(t time.Duration) func(*Ekko) {
return func(ek *Ekko) {
ek.writeTimeout = t
}
}
// WithReadHeaderTimeout configures the HTTP server read header timeout.
// This is the amount of time allowed to read request headers.
// Default is 30 seconds. Should be sufficient for slow clients and large headers.
func WithReadHeaderTimeout(t time.Duration) func(*Ekko) {
return func(ek *Ekko) {
ek.readHeaderTimeout = t
}
}
// WithGzipConfig provides custom gzip compression configuration.
// By default, gzip compression is enabled with standard settings.
// Use this option to customize compression level, skip patterns, or disable compression.
func WithGzipConfig(gzipConfig *middleware.GzipConfig) func(*Ekko) {
return func(ek *Ekko) {
ek.gzipConfig = gzipConfig

go.mod

@@ -4,10 +4,12 @@ go 1.23.5
require (
github.com/abh/certman v0.4.0
github.com/go-sql-driver/mysql v1.9.3
github.com/labstack/echo-contrib v0.17.2
github.com/labstack/echo/v4 v4.13.3
github.com/oklog/ulid/v2 v2.1.0
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/remychantenay/slog-otel v1.3.2
github.com/samber/slog-echo v1.14.8
github.com/samber/slog-multi v1.2.4
@@ -17,20 +19,28 @@ require (
go.opentelemetry.io/contrib/exporters/autoexport v0.58.0
go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.58.0
go.opentelemetry.io/otel v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.9.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.9.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0
go.opentelemetry.io/otel/log v0.9.0
go.opentelemetry.io/otel/metric v1.33.0
go.opentelemetry.io/otel/sdk v1.33.0
go.opentelemetry.io/otel/sdk/log v0.9.0
go.opentelemetry.io/otel/sdk/metric v1.33.0
go.opentelemetry.io/otel/trace v1.33.0
golang.org/x/mod v0.22.0
golang.org/x/net v0.33.0
golang.org/x/sync v0.10.0
google.golang.org/grpc v1.69.2
gopkg.in/yaml.v3 v3.0.1
)
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
@@ -47,7 +57,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.61.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/samber/lo v1.47.0 // indirect
@@ -56,16 +65,10 @@ require (
github.com/valyala/fasttemplate v1.2.2 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/bridges/prometheus v0.58.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.9.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.9.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.33.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.33.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.55.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.9.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.33.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.33.0 // indirect
go.opentelemetry.io/otel/metric v1.33.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.33.0 // indirect
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sys v0.28.0 // indirect

go.sum

@@ -1,3 +1,5 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/abh/certman v0.4.0 h1:XHoDtb0YyRQPclaHMrBDlKTVZpNjTK6vhB0S3Bd/Sbs=
github.com/abh/certman v0.4.0/go.mod h1:x8QhpKVZifmV1Hdiwdg9gLo2GMPAxezz1s3zrVnPs+I=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -17,6 +19,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -30,6 +34,10 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/labstack/echo-contrib v0.17.2 h1:K1zivqmtcC70X9VdBFdLomjPDEVHlrcAObqmuFj1c6w=
@@ -65,6 +73,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/remychantenay/slog-otel v1.3.2 h1:ZBx8qnwfLJ6e18Vba4e9Xp9B7khTmpIwFsU1sAmActw=
github.com/remychantenay/slog-otel v1.3.2/go.mod h1:gKW4tQ8cGOKoA+bi7wtYba/tcJ6Tc9XyQ/EW8gHA/2E=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
@@ -211,6 +221,8 @@ google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=


@@ -0,0 +1,378 @@
// Package tracerconfig provides a bridge to eliminate circular dependencies between
// the logger and tracing packages. It stores tracer configuration and provides
// factory functions that can be used by the logger package without importing tracing.
package tracerconfig
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/url"
"os"
"strings"
"sync"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/grpc/credentials"
)
const (
otelExporterOTLPProtoEnvKey = "OTEL_EXPORTER_OTLP_PROTOCOL"
otelExporterOTLPTracesProtoEnvKey = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"
otelExporterOTLPLogsProtoEnvKey = "OTEL_EXPORTER_OTLP_LOGS_PROTOCOL"
otelExporterOTLPMetricsProtoEnvKey = "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL"
)
var errInvalidOTLPProtocol = errors.New("invalid OTLP protocol - should be one of ['grpc', 'http/protobuf']")
// newInvalidProtocolError creates a specific error message for invalid protocols
func newInvalidProtocolError(protocol, signalType string) error {
return fmt.Errorf("invalid OTLP protocol '%s' for %s - should be one of ['grpc', 'http/protobuf', 'http/json']", protocol, signalType)
}
// Validate checks the configuration for common errors and inconsistencies
func (c *Config) Validate() error {
var errs []error
// Check that both Endpoint and EndpointURL are not specified
if c.Endpoint != "" && c.EndpointURL != "" {
errs = append(errs, errors.New("cannot specify both Endpoint and EndpointURL - use one or the other"))
}
// Validate EndpointURL format if specified
if c.EndpointURL != "" {
if _, err := url.Parse(c.EndpointURL); err != nil {
errs = append(errs, fmt.Errorf("invalid EndpointURL format: %w", err))
}
}
// Validate Endpoint format if specified
if c.Endpoint != "" {
// Basic validation - should not contain protocol scheme
if strings.Contains(c.Endpoint, "://") {
errs = append(errs, errors.New("Endpoint should not include protocol scheme (use EndpointURL for full URLs)"))
}
// Should not be empty after trimming whitespace
if strings.TrimSpace(c.Endpoint) == "" {
errs = append(errs, errors.New("Endpoint cannot be empty or whitespace"))
}
}
// Validate TLS configuration consistency
if c.CertificateProvider != nil && c.RootCAs == nil {
// This is just a warning - client cert without custom CAs is valid
// but might indicate a configuration issue
}
// Validate service name if specified
if c.ServiceName != "" && strings.TrimSpace(c.ServiceName) == "" {
errs = append(errs, errors.New("ServiceName cannot be empty or whitespace"))
}
// Combine all errors
if len(errs) > 0 {
var errMsgs []string
for _, err := range errs {
errMsgs = append(errMsgs, err.Error())
}
return fmt.Errorf("configuration validation failed: %s", strings.Join(errMsgs, "; "))
}
return nil
}
// ValidateAndStore validates the configuration before storing it
func ValidateAndStore(ctx context.Context, cfg *Config, logFactory LogExporterFactory, metricFactory MetricExporterFactory, traceFactory TraceExporterFactory) error {
if cfg != nil {
if err := cfg.Validate(); err != nil {
return err
}
}
Store(ctx, cfg, logFactory, metricFactory, traceFactory)
return nil
}
// GetClientCertificate defines a function type for providing client certificates for mutual TLS.
// This is used when exporting telemetry data to secured OTLP endpoints that require
// client certificate authentication.
type GetClientCertificate func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
// Config provides configuration options for OpenTelemetry tracing setup.
// It supplements standard OpenTelemetry environment variables with additional
// NTP Pool-specific configuration including TLS settings for secure OTLP export.
type Config struct {
ServiceName string // Service name for resource identification (overrides OTEL_SERVICE_NAME)
Environment string // Deployment environment (development, staging, production)
Endpoint string // OTLP endpoint hostname/port (e.g., "otlp.example.com:4317")
EndpointURL string // Complete OTLP endpoint URL (e.g., "https://otlp.example.com:4317/v1/traces")
CertificateProvider GetClientCertificate // Client certificate provider for mutual TLS
RootCAs *x509.CertPool // CA certificate pool for server verification
}
// LogExporterFactory creates an OTLP log exporter using the provided configuration.
// This allows the logger package to create exporters without importing the tracing package.
type LogExporterFactory func(context.Context, *Config) (sdklog.Exporter, error)
// MetricExporterFactory creates an OTLP metric exporter using the provided configuration.
// This allows the metrics package to create exporters without importing the tracing package.
type MetricExporterFactory func(context.Context, *Config) (sdkmetric.Exporter, error)
// TraceExporterFactory creates an OTLP trace exporter using the provided configuration.
// This allows for consistent trace exporter creation across packages.
type TraceExporterFactory func(context.Context, *Config) (sdktrace.SpanExporter, error)
// Global state for sharing configuration between packages
var (
globalConfig *Config
globalContext context.Context
logExporterFactory LogExporterFactory
metricExporterFactory MetricExporterFactory
traceExporterFactory TraceExporterFactory
configMu sync.RWMutex
)
// Store saves the tracer configuration and exporter factories for use by other packages.
// This should be called by the tracing package during initialization.
func Store(ctx context.Context, cfg *Config, logFactory LogExporterFactory, metricFactory MetricExporterFactory, traceFactory TraceExporterFactory) {
configMu.Lock()
defer configMu.Unlock()
globalConfig = cfg
globalContext = ctx
logExporterFactory = logFactory
metricExporterFactory = metricFactory
traceExporterFactory = traceFactory
}
// GetLogExporter returns the stored configuration and log exporter factory.
// Returns nil values if no configuration has been stored yet.
func GetLogExporter() (*Config, context.Context, LogExporterFactory) {
configMu.RLock()
defer configMu.RUnlock()
return globalConfig, globalContext, logExporterFactory
}
// GetMetricExporter returns the stored configuration and metric exporter factory.
// Returns nil values if no configuration has been stored yet.
func GetMetricExporter() (*Config, context.Context, MetricExporterFactory) {
configMu.RLock()
defer configMu.RUnlock()
return globalConfig, globalContext, metricExporterFactory
}
// GetTraceExporter returns the stored configuration and trace exporter factory.
// Returns nil values if no configuration has been stored yet.
func GetTraceExporter() (*Config, context.Context, TraceExporterFactory) {
configMu.RLock()
defer configMu.RUnlock()
return globalConfig, globalContext, traceExporterFactory
}
// Get returns the stored tracer configuration, context, and log exporter factory.
// This maintains backward compatibility for the logger package.
// Returns nil values if no configuration has been stored yet.
func Get() (*Config, context.Context, LogExporterFactory) {
return GetLogExporter()
}
// IsConfigured returns true if tracer configuration has been stored.
func IsConfigured() bool {
configMu.RLock()
defer configMu.RUnlock()
return globalConfig != nil && globalContext != nil
}
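// Consumer-side sketch (mirrors how the logger and metrics packages use the
// stored factories; error handling elided):
//
//	cfg, ctx, factory := tracerconfig.GetMetricExporter()
//	if cfg != nil && factory != nil {
//		exporter, err := factory(ctx, cfg)
//		// use exporter with a metric reader
//	}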
// Clear removes the stored configuration. This is primarily useful for testing.
func Clear() {
configMu.Lock()
defer configMu.Unlock()
globalConfig = nil
globalContext = nil
logExporterFactory = nil
metricExporterFactory = nil
traceExporterFactory = nil
}
// getTLSConfig creates a TLS configuration from the provided Config.
func getTLSConfig(cfg *Config) *tls.Config {
if cfg.CertificateProvider == nil {
return nil
}
return &tls.Config{
GetClientCertificate: cfg.CertificateProvider,
RootCAs: cfg.RootCAs,
}
}
// getProtocol determines the OTLP protocol to use for the given signal type.
// It follows OpenTelemetry environment variable precedence.
func getProtocol(signalSpecificEnv string) string {
proto := os.Getenv(signalSpecificEnv)
if proto == "" {
proto = os.Getenv(otelExporterOTLPProtoEnvKey)
}
// Fall back to the default, http/protobuf.
if proto == "" {
proto = "http/protobuf"
}
return proto
}
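// Resolution order sketch: the signal-specific variable wins over the
// general one, and "http/protobuf" is the default when neither is set.
// For example, with:
//
//	OTEL_EXPORTER_OTLP_PROTOCOL=grpc
//	OTEL_EXPORTER_OTLP_LOGS_PROTOCOL=http/protobuf
//
// getProtocol(otelExporterOTLPLogsProtoEnvKey) returns "http/protobuf",
// while getProtocol(otelExporterOTLPMetricsProtoEnvKey) returns "grpc".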
// CreateOTLPLogExporter creates an OTLP log exporter using the provided configuration.
func CreateOTLPLogExporter(ctx context.Context, cfg *Config) (sdklog.Exporter, error) {
tlsConfig := getTLSConfig(cfg)
proto := getProtocol(otelExporterOTLPLogsProtoEnvKey)
switch proto {
case "grpc":
opts := []otlploggrpc.Option{
otlploggrpc.WithCompressor("gzip"),
}
if tlsConfig != nil {
opts = append(opts, otlploggrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
if len(cfg.Endpoint) > 0 {
opts = append(opts, otlploggrpc.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
opts = append(opts, otlploggrpc.WithEndpointURL(cfg.EndpointURL))
}
return otlploggrpc.New(ctx, opts...)
case "http/protobuf", "http/json":
opts := []otlploghttp.Option{
otlploghttp.WithCompression(otlploghttp.GzipCompression),
}
if tlsConfig != nil {
opts = append(opts, otlploghttp.WithTLSClientConfig(tlsConfig))
}
if len(cfg.Endpoint) > 0 {
opts = append(opts, otlploghttp.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
opts = append(opts, otlploghttp.WithEndpointURL(cfg.EndpointURL))
}
opts = append(opts, otlploghttp.WithRetry(otlploghttp.RetryConfig{
Enabled: true,
InitialInterval: 3 * time.Second,
MaxInterval: 60 * time.Second,
MaxElapsedTime: 5 * time.Minute,
}))
return otlploghttp.New(ctx, opts...)
default:
return nil, newInvalidProtocolError(proto, "logs")
}
}
// CreateOTLPMetricExporter creates an OTLP metric exporter using the provided configuration.
func CreateOTLPMetricExporter(ctx context.Context, cfg *Config) (sdkmetric.Exporter, error) {
tlsConfig := getTLSConfig(cfg)
proto := getProtocol(otelExporterOTLPMetricsProtoEnvKey)
switch proto {
case "grpc":
opts := []otlpmetricgrpc.Option{
otlpmetricgrpc.WithCompressor("gzip"),
}
if tlsConfig != nil {
opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
if len(cfg.Endpoint) > 0 {
opts = append(opts, otlpmetricgrpc.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
opts = append(opts, otlpmetricgrpc.WithEndpointURL(cfg.EndpointURL))
}
return otlpmetricgrpc.New(ctx, opts...)
case "http/protobuf", "http/json":
opts := []otlpmetrichttp.Option{
otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
}
if tlsConfig != nil {
opts = append(opts, otlpmetrichttp.WithTLSClientConfig(tlsConfig))
}
if len(cfg.Endpoint) > 0 {
opts = append(opts, otlpmetrichttp.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
opts = append(opts, otlpmetrichttp.WithEndpointURL(cfg.EndpointURL))
}
opts = append(opts, otlpmetrichttp.WithRetry(otlpmetrichttp.RetryConfig{
Enabled: true,
InitialInterval: 3 * time.Second,
MaxInterval: 60 * time.Second,
MaxElapsedTime: 5 * time.Minute,
}))
return otlpmetrichttp.New(ctx, opts...)
default:
return nil, newInvalidProtocolError(proto, "metrics")
}
}
// CreateOTLPTraceExporter creates an OTLP trace exporter using the provided configuration.
func CreateOTLPTraceExporter(ctx context.Context, cfg *Config) (sdktrace.SpanExporter, error) {
tlsConfig := getTLSConfig(cfg)
proto := getProtocol(otelExporterOTLPTracesProtoEnvKey)
var client otlptrace.Client
switch proto {
case "grpc":
opts := []otlptracegrpc.Option{
otlptracegrpc.WithCompressor("gzip"),
}
if tlsConfig != nil {
opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
if len(cfg.Endpoint) > 0 {
opts = append(opts, otlptracegrpc.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
opts = append(opts, otlptracegrpc.WithEndpointURL(cfg.EndpointURL))
}
client = otlptracegrpc.NewClient(opts...)
case "http/protobuf", "http/json":
opts := []otlptracehttp.Option{
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
}
if tlsConfig != nil {
opts = append(opts, otlptracehttp.WithTLSClientConfig(tlsConfig))
}
if len(cfg.Endpoint) > 0 {
opts = append(opts, otlptracehttp.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
opts = append(opts, otlptracehttp.WithEndpointURL(cfg.EndpointURL))
}
opts = append(opts, otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
Enabled: true,
InitialInterval: 3 * time.Second,
MaxInterval: 60 * time.Second,
MaxElapsedTime: 5 * time.Minute,
}))
client = otlptracehttp.NewClient(opts...)
default:
return nil, newInvalidProtocolError(proto, "traces")
}
return otlptrace.New(ctx, client)
}
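// Usage sketch (given a validated *Config; protocol selection is driven by
// the OTEL_* environment variables described above):
//
//	exp, err := CreateOTLPTraceExporter(ctx, cfg)
//	if err != nil {
//		log.Fatal(err)
//	}
//	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))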

View File

@@ -0,0 +1,474 @@
package tracerconfig
import (
"context"
"crypto/tls"
"crypto/x509"
"os"
"strings"
"sync"
"testing"
"time"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
func TestStore_And_Retrieve(t *testing.T) {
// Clear any existing configuration
Clear()
ctx := context.Background()
config := &Config{
ServiceName: "test-service",
Environment: "test",
Endpoint: "localhost:4317",
}
// Create mock factories
logFactory := func(context.Context, *Config) (sdklog.Exporter, error) { return nil, nil }
metricFactory := func(context.Context, *Config) (sdkmetric.Exporter, error) { return nil, nil }
traceFactory := func(context.Context, *Config) (sdktrace.SpanExporter, error) { return nil, nil }
// Store configuration
Store(ctx, config, logFactory, metricFactory, traceFactory)
// Test IsConfigured
if !IsConfigured() {
t.Error("IsConfigured() should return true after Store()")
}
// Test GetLogExporter
cfg, ctx2, factory := GetLogExporter()
if cfg == nil || ctx2 == nil || factory == nil {
t.Error("GetLogExporter() should return non-nil values")
}
if cfg.ServiceName != "test-service" {
t.Errorf("Expected ServiceName 'test-service', got '%s'", cfg.ServiceName)
}
// Test GetMetricExporter
cfg, ctx3, metricFact := GetMetricExporter()
if cfg == nil || ctx3 == nil || metricFact == nil {
t.Error("GetMetricExporter() should return non-nil values")
}
// Test GetTraceExporter
cfg, ctx4, traceFact := GetTraceExporter()
if cfg == nil || ctx4 == nil || traceFact == nil {
t.Error("GetTraceExporter() should return non-nil values")
}
// Test backward compatibility Get()
cfg, ctx5, logFact := Get()
if cfg == nil || ctx5 == nil || logFact == nil {
t.Error("Get() should return non-nil values for backward compatibility")
}
}
func TestClear(t *testing.T) {
// Store some configuration first
ctx := context.Background()
config := &Config{ServiceName: "test"}
Store(ctx, config, nil, nil, nil)
if !IsConfigured() {
t.Error("Should be configured before Clear()")
}
// Clear configuration
Clear()
if IsConfigured() {
t.Error("Should not be configured after Clear()")
}
// All getters should return nil
cfg, ctx2, factory := GetLogExporter()
if cfg != nil || ctx2 != nil || factory != nil {
t.Error("GetLogExporter() should return nil values after Clear()")
}
}
func TestConcurrentAccess(t *testing.T) {
Clear()
ctx := context.Background()
config := &Config{ServiceName: "concurrent-test"}
var wg sync.WaitGroup
const numGoroutines = 10
// Test concurrent Store and Get operations
wg.Add(numGoroutines * 2)
// Concurrent Store operations
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
Store(ctx, config, nil, nil, nil)
}()
}
// Concurrent Get operations
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
IsConfigured()
GetLogExporter()
GetMetricExporter()
GetTraceExporter()
}()
}
wg.Wait()
// Should be configured after all operations
if !IsConfigured() {
t.Error("Should be configured after concurrent operations")
}
}
func TestGetTLSConfig(t *testing.T) {
tests := []struct {
name string
config *Config
expected bool // whether TLS config should be nil
}{
{
name: "nil certificate provider",
config: &Config{},
expected: true, // should be nil
},
{
name: "with certificate provider",
config: &Config{
CertificateProvider: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
return &tls.Certificate{}, nil
},
},
expected: false, // should not be nil
},
{
name: "with certificate provider and RootCAs",
config: &Config{
CertificateProvider: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
return &tls.Certificate{}, nil
},
RootCAs: x509.NewCertPool(),
},
expected: false, // should not be nil
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tlsConfig := getTLSConfig(tt.config)
if tt.expected && tlsConfig != nil {
t.Errorf("Expected nil TLS config, got %v", tlsConfig)
}
if !tt.expected && tlsConfig == nil {
t.Error("Expected non-nil TLS config, got nil")
}
if !tt.expected && tlsConfig != nil {
if tlsConfig.GetClientCertificate == nil {
t.Error("Expected GetClientCertificate to be set")
}
if tt.config.RootCAs != nil && tlsConfig.RootCAs != tt.config.RootCAs {
t.Error("Expected RootCAs to be set correctly")
}
}
})
}
}
func TestGetProtocol(t *testing.T) {
// Save original env vars
originalGeneral := os.Getenv(otelExporterOTLPProtoEnvKey)
originalLogs := os.Getenv(otelExporterOTLPLogsProtoEnvKey)
defer func() {
// Restore original env vars
if originalGeneral != "" {
os.Setenv(otelExporterOTLPProtoEnvKey, originalGeneral)
} else {
os.Unsetenv(otelExporterOTLPProtoEnvKey)
}
if originalLogs != "" {
os.Setenv(otelExporterOTLPLogsProtoEnvKey, originalLogs)
} else {
os.Unsetenv(otelExporterOTLPLogsProtoEnvKey)
}
}()
tests := []struct {
name string
signalSpecific string
generalProto string
specificProto string
expectedResult string
}{
{
name: "no env vars set - default",
signalSpecific: otelExporterOTLPLogsProtoEnvKey,
expectedResult: "http/protobuf",
},
{
name: "general env var set",
signalSpecific: otelExporterOTLPLogsProtoEnvKey,
generalProto: "grpc",
expectedResult: "grpc",
},
{
name: "specific env var overrides general",
signalSpecific: otelExporterOTLPLogsProtoEnvKey,
generalProto: "grpc",
specificProto: "http/protobuf",
expectedResult: "http/protobuf",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Clear env vars
os.Unsetenv(otelExporterOTLPProtoEnvKey)
os.Unsetenv(otelExporterOTLPLogsProtoEnvKey)
// Set test env vars
if tt.generalProto != "" {
os.Setenv(otelExporterOTLPProtoEnvKey, tt.generalProto)
}
if tt.specificProto != "" {
os.Setenv(tt.signalSpecific, tt.specificProto)
}
result := getProtocol(tt.signalSpecific)
if result != tt.expectedResult {
t.Errorf("Expected protocol '%s', got '%s'", tt.expectedResult, result)
}
})
}
}
func TestCreateExporterErrors(t *testing.T) {
ctx := context.Background()
config := &Config{
ServiceName: "test-service",
Endpoint: "invalid-endpoint",
}
// Test with invalid protocol for logs
os.Setenv(otelExporterOTLPLogsProtoEnvKey, "invalid-protocol")
defer os.Unsetenv(otelExporterOTLPLogsProtoEnvKey)
_, err := CreateOTLPLogExporter(ctx, config)
if err == nil {
t.Error("Expected error for invalid protocol")
}
// Check that it's a protocol error (the specific message will be different now)
if !strings.Contains(err.Error(), "invalid OTLP protocol") {
t.Errorf("Expected protocol error, got %v", err)
}
// Test with invalid protocol for metrics
os.Setenv(otelExporterOTLPMetricsProtoEnvKey, "invalid-protocol")
defer os.Unsetenv(otelExporterOTLPMetricsProtoEnvKey)
_, err = CreateOTLPMetricExporter(ctx, config)
if err == nil {
t.Error("Expected error for invalid protocol")
}
if !strings.Contains(err.Error(), "invalid OTLP protocol") {
t.Errorf("Expected protocol error, got %v", err)
}
// Test with invalid protocol for traces
os.Setenv(otelExporterOTLPTracesProtoEnvKey, "invalid-protocol")
defer os.Unsetenv(otelExporterOTLPTracesProtoEnvKey)
_, err = CreateOTLPTraceExporter(ctx, config)
if err == nil {
t.Error("Expected error for invalid protocol")
}
if !strings.Contains(err.Error(), "invalid OTLP protocol") {
t.Errorf("Expected protocol error, got %v", err)
}
}
func TestCreateExporterValidProtocols(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
config := &Config{
ServiceName: "test-service",
Endpoint: "localhost:4317", // This will likely fail to connect, but should create exporter
}
protocols := []string{"grpc", "http/protobuf", "http/json"}
for _, proto := range protocols {
t.Run("logs_"+proto, func(t *testing.T) {
os.Setenv(otelExporterOTLPLogsProtoEnvKey, proto)
defer os.Unsetenv(otelExporterOTLPLogsProtoEnvKey)
exporter, err := CreateOTLPLogExporter(ctx, config)
if err != nil {
// Connection errors are expected since we're not running a real OTLP server
// but the exporter should be created successfully
t.Logf("Connection error expected: %v", err)
}
if exporter != nil {
exporter.Shutdown(ctx)
}
})
t.Run("metrics_"+proto, func(t *testing.T) {
os.Setenv(otelExporterOTLPMetricsProtoEnvKey, proto)
defer os.Unsetenv(otelExporterOTLPMetricsProtoEnvKey)
exporter, err := CreateOTLPMetricExporter(ctx, config)
if err != nil {
t.Logf("Connection error expected: %v", err)
}
if exporter != nil {
exporter.Shutdown(ctx)
}
})
t.Run("traces_"+proto, func(t *testing.T) {
os.Setenv(otelExporterOTLPTracesProtoEnvKey, proto)
defer os.Unsetenv(otelExporterOTLPTracesProtoEnvKey)
exporter, err := CreateOTLPTraceExporter(ctx, config)
if err != nil {
t.Logf("Connection error expected: %v", err)
}
if exporter != nil {
exporter.Shutdown(ctx)
}
})
}
}
func TestConfigValidation(t *testing.T) {
tests := []struct {
name string
config *Config
shouldErr bool
}{
{
name: "valid empty config",
config: &Config{},
shouldErr: false,
},
{
name: "valid config with endpoint",
config: &Config{
ServiceName: "test-service",
Endpoint: "localhost:4317",
},
shouldErr: false,
},
{
name: "valid config with endpoint URL",
config: &Config{
ServiceName: "test-service",
EndpointURL: "https://otlp.example.com:4317/v1/traces",
},
shouldErr: false,
},
{
name: "invalid - both endpoint and endpoint URL",
config: &Config{
ServiceName: "test-service",
Endpoint: "localhost:4317",
EndpointURL: "https://otlp.example.com:4317/v1/traces",
},
shouldErr: true,
},
{
name: "invalid - endpoint with protocol",
config: &Config{
ServiceName: "test-service",
Endpoint: "https://localhost:4317",
},
shouldErr: true,
},
{
name: "invalid - empty endpoint",
config: &Config{
ServiceName: "test-service",
Endpoint: " ",
},
shouldErr: true,
},
{
name: "invalid - malformed endpoint URL",
config: &Config{
ServiceName: "test-service",
EndpointURL: "://invalid-url-missing-scheme",
},
shouldErr: true,
},
{
name: "invalid - empty service name",
config: &Config{
ServiceName: " ",
Endpoint: "localhost:4317",
},
shouldErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.config.Validate()
if tt.shouldErr && err == nil {
t.Error("Expected validation error, got nil")
}
if !tt.shouldErr && err != nil {
t.Errorf("Expected no validation error, got: %v", err)
}
})
}
}
func TestValidateAndStore(t *testing.T) {
Clear()
ctx := context.Background()
// Test with valid config
validConfig := &Config{
ServiceName: "test-service",
Endpoint: "localhost:4317",
}
err := ValidateAndStore(ctx, validConfig, nil, nil, nil)
if err != nil {
t.Errorf("ValidateAndStore with valid config should not error: %v", err)
}
if !IsConfigured() {
t.Error("Should be configured after ValidateAndStore")
}
Clear()
// Test with invalid config
invalidConfig := &Config{
ServiceName: "test-service",
Endpoint: "localhost:4317",
EndpointURL: "https://example.com:4317", // both specified - invalid
}
err = ValidateAndStore(ctx, invalidConfig, nil, nil, nil)
if err == nil {
t.Error("ValidateAndStore with invalid config should return error")
}
if IsConfigured() {
t.Error("Should not be configured after failed ValidateAndStore")
}
}

View File

@@ -1,3 +1,32 @@
// Package kafconn provides a Kafka client wrapper with TLS support for secure log streaming.
//
// This package handles Kafka connections with mutual TLS authentication for the NTP Pool
// project's log streaming infrastructure. It provides factories for creating Kafka readers
// and writers with automatic broker discovery, TLS configuration, and connection management.
//
// The package is designed specifically for the NTP Pool pipeline infrastructure and includes
// hardcoded bootstrap servers and group configurations. It uses certman for automatic
// certificate renewal and provides compression and batching optimizations.
//
// Key features:
// - Mutual TLS authentication with automatic certificate renewal
// - Broker discovery and connection pooling
// - Reader and writer factory methods with optimized configurations
// - LZ4 compression for efficient data transfer
// - Configurable batch sizes and load balancing
//
// Example usage:
//
// tlsSetup := kafconn.TLSSetup{
// CA: "/path/to/ca.pem",
// Cert: "/path/to/client.pem",
// Key: "/path/to/client.key",
// }
// kafka, err := kafconn.NewKafka(ctx, tlsSetup)
// if err != nil {
// log.Fatal(err)
// }
// writer, err := kafka.NewWriter("logs")
package kafconn
import (
@@ -24,12 +53,17 @@ const (
// kafkaMinBatchSize = 1000
)
// TLSSetup contains file paths for TLS certificate configuration.
// All fields are required for establishing secure Kafka connections.
type TLSSetup struct {
CA string
Key string
Cert string
CA string // Path to CA certificate file for server verification
Key string // Path to client private key file
Cert string // Path to client certificate file
}
// Kafka represents a configured Kafka client with TLS support.
// It manages connections, brokers, and provides factory methods for readers and writers.
// The client handles broker discovery, connection pooling, and TLS configuration automatically.
type Kafka struct {
tls TLSSetup
@@ -116,6 +150,19 @@ func (k *Kafka) kafkaTransport(ctx context.Context) (*kafka.Transport, error) {
return transport, nil
}
// NewKafka creates a new Kafka client with TLS configuration and establishes initial connections.
// It performs broker discovery, validates TLS certificates, and prepares the client for creating
// readers and writers.
//
// The function validates TLS configuration, establishes a connection to the bootstrap server,
// discovers all available brokers, and configures transport layers for optimal performance.
//
// Parameters:
// - ctx: Context for connection establishment and timeouts
// - tls: TLS configuration with paths to CA, certificate, and key files
//
// Returns a configured Kafka client ready for creating readers and writers, or an error
// if TLS setup fails, connection cannot be established, or broker discovery fails.
func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) {
l := log.New(os.Stdout, "kafka: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds)
@@ -171,6 +218,12 @@ func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) {
return k, nil
}
// NewReader creates a new Kafka reader with the client's broker list and TLS configuration.
// The provided config is enhanced with the discovered brokers and configured dialer.
// The reader supports automatic offset management, consumer group coordination, and reconnection.
//
// The caller should configure the reader's Topic, GroupID, and other consumer-specific settings
// in the provided config. The client automatically sets Brokers and Dialer fields.
func (k *Kafka) NewReader(config kafka.ReaderConfig) *kafka.Reader {
config.Brokers = k.brokerAddrs()
config.Dialer = k.dialer
@@ -186,6 +239,16 @@ func (k *Kafka) brokerAddrs() []string {
return addrs
}
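// Reader usage sketch (topic and group names are placeholders; Brokers and
// Dialer are filled in by the client):
//
//	r := k.NewReader(kafka.ReaderConfig{
//		Topic:   "logs",
//		GroupID: "log-consumer",
//	})
//	msg, err := r.ReadMessage(ctx)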
// NewWriter creates a new Kafka writer for the specified topic with optimized configuration.
// The writer uses LZ4 compression, least-bytes load balancing, and batching for performance.
//
// Configuration includes:
// - Batch size: 2000 messages for efficient throughput
// - Compression: LZ4 for fast compression with good ratios
// - Balancer: LeastBytes for optimal partition distribution
// - Transport: TLS-configured transport with connection pooling
//
// The writer is ready for immediate use and handles connection management automatically.
func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
// https://pkg.go.dev/github.com/segmentio/kafka-go#Writer
w := &kafka.Writer{
@@ -202,6 +265,12 @@ func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
return w, nil
}
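// Writer usage sketch:
//
//	w, err := k.NewWriter("logs")
//	if err != nil {
//		log.Fatal(err)
//	}
//	err = w.WriteMessages(ctx, kafka.Message{Value: []byte("example")})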
// CheckPartitions verifies that the Kafka connection can read partition metadata.
// This method is useful for health checks and connection validation.
//
// Returns an error if partition metadata cannot be retrieved, which typically
// indicates connection problems, authentication failures, or broker unavailability.
// Logs a warning if no partitions are available but does not return an error.
func (k *Kafka) CheckPartitions() error {
partitions, err := k.conn.ReadPartitions()
if err != nil {

View File

@@ -0,0 +1,198 @@
package logger
import (
"context"
"errors"
"fmt"
"sync"
"time"
"go.ntppool.org/common/internal/tracerconfig"
otellog "go.opentelemetry.io/otel/sdk/log"
)
// bufferingExporter wraps an OTLP exporter and buffers logs until tracing is configured
type bufferingExporter struct {
mu sync.RWMutex
// Buffered records while waiting for tracing config
buffer [][]otellog.Record
bufferSize int
maxBuffSize int
// Real exporter (created when tracing is configured)
exporter otellog.Exporter
// Thread-safe initialization
initOnce sync.Once
initErr error
// Background checker
stopChecker chan struct{}
checkerDone chan struct{}
}
// newBufferingExporter creates a new exporter that buffers logs until tracing is configured
func newBufferingExporter() *bufferingExporter {
e := &bufferingExporter{
maxBuffSize: 1000, // Max number of batches to buffer
stopChecker: make(chan struct{}),
checkerDone: make(chan struct{}),
}
// Start background readiness checker
go e.checkReadiness()
return e
}
// Export implements otellog.Exporter
func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error {
// Try initialization once
e.initOnce.Do(func() {
e.initErr = e.initialize()
})
// If initialization succeeded, use the exporter
if e.initErr == nil {
e.mu.RLock()
exporter := e.exporter
e.mu.RUnlock()
if exporter != nil {
return exporter.Export(ctx, records)
}
}
// Not ready yet, buffer the records
return e.bufferRecords(records)
}
// initialize attempts to create the real OTLP exporter using tracing config
func (e *bufferingExporter) initialize() error {
cfg, ctx, factory := tracerconfig.Get()
if cfg == nil || ctx == nil || factory == nil {
return errors.New("tracer not configured yet")
}
// Add timeout for initialization
initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
exporter, err := factory(initCtx, cfg)
if err != nil {
return fmt.Errorf("failed to create OTLP exporter: %w", err)
}
e.mu.Lock()
e.exporter = exporter
flushErr := e.flushBuffer(initCtx)
e.mu.Unlock()
if flushErr != nil {
// Log but don't fail initialization
Setup().Warn("buffer flush failed during initialization", "error", flushErr)
}
return nil
}
// bufferRecords adds records to the buffer for later processing
func (e *bufferingExporter) bufferRecords(records []otellog.Record) error {
e.mu.Lock()
defer e.mu.Unlock()
// Buffer the batch if we have space
if e.bufferSize < e.maxBuffSize {
// Clone records to avoid retention issues
cloned := make([]otellog.Record, len(records))
for i, r := range records {
cloned[i] = r.Clone()
}
e.buffer = append(e.buffer, cloned)
e.bufferSize++
}
// Always return success to BatchProcessor
return nil
}
// checkReadiness periodically checks if tracing is configured
func (e *bufferingExporter) checkReadiness() {
defer close(e.checkerDone)
ticker := time.NewTicker(1 * time.Second) // Reduced frequency since OTLP handles retries
defer ticker.Stop()
for {
select {
case <-ticker.C:
// If initialization failed, reset sync.Once to allow retry
// The OTLP exporter will handle its own retry logic
if e.initErr != nil {
e.initOnce = sync.Once{}
} else if e.exporter != nil {
return // Exporter ready, checker no longer needed
}
case <-e.stopChecker:
return
}
}
}
// flushBuffer sends all buffered batches through the real exporter
func (e *bufferingExporter) flushBuffer(ctx context.Context) error {
if len(e.buffer) == 0 {
return nil
}
flushCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
var lastErr error
for _, batch := range e.buffer {
if err := e.exporter.Export(flushCtx, batch); err != nil {
lastErr = err
}
}
// Clear buffer after flush attempt
e.buffer = nil
e.bufferSize = 0
return lastErr
}
// ForceFlush implements otellog.Exporter
func (e *bufferingExporter) ForceFlush(ctx context.Context) error {
e.mu.RLock()
defer e.mu.RUnlock()
if e.exporter != nil {
return e.exporter.ForceFlush(ctx)
}
return nil
}
// Shutdown implements otellog.Exporter
func (e *bufferingExporter) Shutdown(ctx context.Context) error {
// Stop the readiness checker from continuing
close(e.stopChecker)
// Give one final chance for TLS/tracing to become ready before fully shutting down
e.initOnce.Do(func() {
e.initErr = e.initialize()
})
// Wait for readiness checker goroutine to complete
<-e.checkerDone
e.mu.Lock()
defer e.mu.Unlock()
if e.exporter != nil {
return e.exporter.Shutdown(ctx)
}
return nil
}
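// Wiring sketch (this matches how setupOtlpLogger uses the exporter later
// in this change):
//
//	exp := newBufferingExporter()
//	processor := otellog.NewBatchProcessor(exp,
//		otellog.WithExportInterval(10*time.Second))
//	provider := otellog.NewLoggerProvider(otellog.WithProcessor(processor))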

View File

@@ -16,23 +16,28 @@ type logfmt struct {
mu sync.Mutex
}
// createTextHandlerOptions creates the common slog.HandlerOptions used by all logfmt handlers
func createTextHandlerOptions() *slog.HandlerOptions {
return &slog.HandlerOptions{
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
if a.Key == slog.TimeKey && len(groups) == 0 {
return slog.Attr{}
}
if a.Key == slog.LevelKey && len(groups) == 0 {
return slog.Attr{}
}
return a
},
}
}
func newLogFmtHandler(next slog.Handler) slog.Handler {
buf := bytes.NewBuffer([]byte{})
h := &logfmt{
buf: buf,
next: next,
txt: slog.NewTextHandler(buf, &slog.HandlerOptions{
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
if a.Key == slog.TimeKey && len(groups) == 0 {
return slog.Attr{}
}
if a.Key == slog.LevelKey && len(groups) == 0 {
return slog.Attr{}
}
return a
},
}),
txt: slog.NewTextHandler(buf, createTextHandlerOptions()),
}
return h
@@ -43,10 +48,11 @@ func (h *logfmt) Enabled(ctx context.Context, lvl slog.Level) bool {
}
func (h *logfmt) WithAttrs(attrs []slog.Attr) slog.Handler {
buf := bytes.NewBuffer([]byte{})
return &logfmt{
buf: bytes.NewBuffer([]byte{}),
buf: buf,
next: h.next.WithAttrs(slices.Clone(attrs)),
txt: h.txt.WithAttrs(slices.Clone(attrs)),
txt: slog.NewTextHandler(buf, createTextHandlerOptions()).WithAttrs(slices.Clone(attrs)),
}
}
@@ -54,10 +60,11 @@ func (h *logfmt) WithGroup(g string) slog.Handler {
if g == "" {
return h
}
buf := bytes.NewBuffer([]byte{})
return &logfmt{
buf: bytes.NewBuffer([]byte{}),
buf: buf,
next: h.next.WithGroup(g),
txt: h.txt.WithGroup(g),
txt: slog.NewTextHandler(buf, createTextHandlerOptions()).WithGroup(g),
}
}
@@ -69,10 +76,22 @@ func (h *logfmt) Handle(ctx context.Context, r slog.Record) error {
panic("buffer wasn't empty")
}
h.txt.Handle(ctx, r)
r.Message = h.buf.String()
r.Message = strings.TrimSuffix(r.Message, "\n")
// Format using text handler to get the formatted message
err := h.txt.Handle(ctx, r)
if err != nil {
return err
}
formattedMessage := h.buf.String()
formattedMessage = strings.TrimSuffix(formattedMessage, "\n")
h.buf.Reset()
return h.next.Handle(ctx, r)
// Create a new record with the formatted message
newRecord := slog.NewRecord(r.Time, r.Level, formattedMessage, r.PC)
r.Attrs(func(a slog.Attr) bool {
newRecord.AddAttrs(a)
return true
})
return h.next.Handle(ctx, newRecord)
}
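// A rough illustration of the transformation (assumes the default
// slog.TextHandler formatting, with time and level stripped as above):
//
//	log := slog.New(newLogFmtHandler(otelslog.NewHandler("common")))
//	log.Info("user login", "user_id", 42)
//
// The wrapped handler receives a record whose message reads approximately
// `msg="user login" user_id=42`, while user_id remains available as a
// structured attribute on the record.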

View File

@@ -1,3 +1,25 @@
// Package logger provides structured logging with OpenTelemetry trace integration.
//
// This package offers multiple logging configurations for different deployment scenarios:
// - Text logging to stderr with optional timestamp removal for systemd
// - OTLP (OpenTelemetry Protocol) logging for observability pipelines
// - Multi-logger setup that outputs to both text and OTLP simultaneously
// - Context-aware logging with trace ID correlation
//
// The package automatically detects systemd environments and adjusts timestamp handling
// accordingly. It supports debug level configuration via environment variables and
// provides compatibility bridges for legacy logging interfaces.
//
// Key features:
// - Automatic OpenTelemetry trace and span ID inclusion in log entries
// - Configurable log levels via DEBUG environment variable (with optional prefix)
// - Systemd-compatible output (no timestamps when INVOCATION_ID is present)
// - Thread-safe logger setup with sync.Once protection
// - Context propagation for request-scoped logging
//
// Environment variables:
// - DEBUG: Enable debug level logging (configurable prefix via ConfigPrefix)
// - INVOCATION_ID: Systemd detection for timestamp handling
package logger
import (
@@ -7,12 +29,18 @@ import (
"os"
"strconv"
"sync"
"time"
slogtraceid "github.com/remychantenay/slog-otel"
slogmulti "github.com/samber/slog-multi"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/otel/log/global"
otellog "go.opentelemetry.io/otel/sdk/log"
)
// ConfigPrefix allows customizing the environment variable prefix for configuration.
// When set, environment variables like DEBUG become {ConfigPrefix}_DEBUG.
// This enables multiple services to have independent logging configuration.
var ConfigPrefix = ""
var (
@@ -60,17 +88,41 @@ func setupStdErrHandler() slog.Handler {
func setupOtlpLogger() *slog.Logger {
setupOtlp.Do(func() {
otlpLogger = slog.New(
newLogFmtHandler(otelslog.NewHandler("common")),
// Create our buffering exporter
// It will buffer until tracing is configured
bufferingExp := newBufferingExporter()
// Use BatchProcessor with our custom exporter
processor := otellog.NewBatchProcessor(bufferingExp,
otellog.WithExportInterval(10*time.Second),
otellog.WithMaxQueueSize(2048),
otellog.WithExportMaxBatchSize(512),
)
// Create logger provider
provider := otellog.NewLoggerProvider(
otellog.WithProcessor(processor),
)
// Set global provider
global.SetLoggerProvider(provider)
// Create slog handler
handler := newLogFmtHandler(otelslog.NewHandler("common"))
otlpLogger = slog.New(handler)
})
return otlpLogger
}
// SetupMultiLogger will setup and make default a logger that
// logs as described in Setup() as well as an OTLP logger.
// The "multi logger" is made the default the first time
// this function is called
// SetupMultiLogger creates a logger that outputs to both text (stderr) and OTLP simultaneously.
// This is useful for services that need both human-readable logs and structured observability data.
//
// The multi-logger combines:
// - Text handler: Stderr output with OpenTelemetry trace integration
// - OTLP handler: Structured logs sent via OpenTelemetry Protocol
//
// On first call, this logger becomes the default logger returned by Setup().
// The function is thread-safe and uses sync.Once to ensure single initialization.
func SetupMultiLogger() *slog.Logger {
setupMulti.Do(func() {
textHandler := Setup().Handler()
@@ -89,28 +141,38 @@ func SetupMultiLogger() *slog.Logger {
return multiLogger
}
// SetupOLTP configures and returns a logger sending logs
// via OpenTelemetry (configured via the tracing package).
// SetupOLTP creates a logger that sends structured logs via OpenTelemetry Protocol.
// This logger is designed for observability pipelines and log aggregation systems.
//
// This was made to work with Loki + Grafana that makes it
// hard to view the log attributes in the UI, so the log
// message is formatted similarly to the text logger. The
// attributes are duplicated as OTLP attributes in the
// log messages. https://github.com/grafana/loki/issues/14788
// The OTLP logger formats log messages similarly to the text logger for better
// compatibility with Loki + Grafana, while still providing structured attributes.
// Log attributes are available both in the message format and as OTLP attributes.
//
// This logger does not become the default logger and must be used explicitly.
// It requires OpenTelemetry tracing configuration to be set up via the tracing package.
//
// See: https://github.com/grafana/loki/issues/14788 for formatting rationale.
func SetupOLTP() *slog.Logger {
return setupOtlpLogger()
}
// Setup returns an slog.Logger configured for text formatting
// to stderr.
// OpenTelemetry trace_id and span_id's are logged as attributes
// when available.
// When the application is running under systemd timestamps are
// omitted. On first call the slog default logger is set to this
// logger as well.
// Setup creates and returns the standard text logger for the application.
// This is the primary logging function that most applications should use.
//
// If SetupMultiLogger has been called Setup() will return
// the "multi logger"
// Features:
// - Text formatting to stderr with human-readable output
// - Automatic OpenTelemetry trace_id and span_id inclusion when available
// - Systemd compatibility: omits timestamps when INVOCATION_ID environment variable is present
// - Debug level support via DEBUG environment variable (respects ConfigPrefix)
// - Thread-safe initialization with sync.Once
//
// On first call, this logger becomes the slog default logger. If SetupMultiLogger()
// has been called previously, Setup() returns the multi-logger instead of the text logger.
//
// The logger automatically detects execution context:
// - Systemd: Removes timestamps (systemd adds its own)
// - Debug mode: Enables debug level logging based on environment variables
// - OpenTelemetry: Includes trace correlation when tracing is active
func Setup() *slog.Logger {
setupText.Do(func() {
h := setupStdErrHandler()
@@ -129,15 +191,33 @@ func Setup() *slog.Logger {
type loggerKey struct{}
// NewContext adds the logger to the context. Use this
// to, for example, make a request-specific logger available
// to other functions through the context
// NewContext stores a logger in the context for request-scoped logging.
// This enables passing request-specific loggers (e.g., with request IDs,
// user context, or other correlation data) through the call stack.
//
// Use this to create context-aware logging where different parts of the
// application can access the same enriched logger instance.
//
// Example:
//
// reqLog := slog.With("request_id", requestID)
// ctx = logger.NewContext(ctx, reqLog)
// // Pass ctx to downstream functions
func NewContext(ctx context.Context, l *slog.Logger) context.Context {
return context.WithValue(ctx, loggerKey{}, l)
}
// FromContext retrieves a logger from the context. If there is none,
// it returns the default logger
// FromContext retrieves a logger from the context.
// If no logger is stored in the context, it returns the default logger from Setup().
//
// This function provides a safe way to access context-scoped loggers without
// needing to check for nil values. It ensures that logging is always available,
// falling back to the application's standard logger configuration.
//
// Example:
//
// log := logger.FromContext(ctx)
// log.Info("processing request") // Uses context logger or default
func FromContext(ctx context.Context) *slog.Logger {
if l, ok := ctx.Value(loggerKey{}).(*slog.Logger); ok {
return l

View File

@@ -5,12 +5,24 @@ import (
"log/slog"
)
// stdLoggerish provides a bridge between legacy log interfaces and slog.
// It implements common logging methods (Println, Printf, Fatalf) that
// delegate to structured logging with a consistent key prefix.
type stdLoggerish struct {
key string
log *slog.Logger
f func(string, ...any)
key string // Prefix key for all log messages
log *slog.Logger // Underlying structured logger
f func(string, ...any) // Log function (Info or Debug level)
}
// NewStdLog creates a legacy-compatible logger that bridges to structured logging.
// This is useful for third-party libraries that expect a standard log.Logger interface.
//
// Parameters:
// - key: Prefix added to all log messages for identification
// - debug: If true, logs at debug level; otherwise logs at info level
// - log: Underlying slog.Logger (uses Setup() if nil)
//
// The returned logger implements Println, Printf, and Fatalf methods.
func NewStdLog(key string, debug bool, log *slog.Logger) *stdLoggerish {
if log == nil {
log = Setup()
@@ -27,14 +39,18 @@ func NewStdLog(key string, debug bool, log *slog.Logger) *stdLoggerish {
return sl
}
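// Usage sketch for libraries that expect a standard-log-style interface
// (the key and message are illustrative):
//
//	sl := logger.NewStdLog("mysql", false, nil)
//	sl.Printf("connected to %s", addr)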
// Println logs the arguments using the configured log level with the instance key.
func (l stdLoggerish) Println(msg ...any) {
l.f(l.key, "msg", msg)
}
// Printf logs a formatted message using the configured log level with the instance key.
func (l stdLoggerish) Printf(msg string, args ...any) {
l.f(l.key, "msg", fmt.Sprintf(msg, args...))
}
// Fatalf logs a formatted error message and panics.
// Note: This implementation panics instead of calling os.Exit for testability.
func (l stdLoggerish) Fatalf(msg string, args ...any) {
l.log.Error(l.key, "msg", fmt.Sprintf(msg, args...))
panic("fatal error") // todo: does this make sense at all?

View File

@@ -1,17 +0,0 @@
package logger
type Error struct {
Msg string
Data []any
}
func NewError(msg string, data ...any) *Error {
return &Error{
Msg: msg,
Data: data,
}
}
func (e *Error) Error() string {
return "not implemented"
}

metrics/metrics.go Normal file
View File

@@ -0,0 +1,122 @@
// Package metrics provides OpenTelemetry-native metrics with OTLP export support.
//
// This package implements a metrics system using the OpenTelemetry metrics data model
// with OTLP export capabilities. It's designed for new applications that want to use
// structured metrics export to observability backends.
//
// Key features:
// - OpenTelemetry native metric types (Counter, Histogram, Gauge, etc.)
// - OTLP export for sending metrics to observability backends
// - Resource detection and correlation with traces/logs
// - Graceful handling when OTLP configuration is not available
//
// Example usage:
//
// // Initialize metrics along with tracing
// shutdown, err := tracing.InitTracer(ctx, cfg)
// if err != nil {
// log.Fatal(err)
// }
// defer shutdown(ctx)
//
// // Get a meter and create instruments
// meter := metrics.GetMeter("my-service")
// counter, _ := meter.Int64Counter("requests_total")
// counter.Add(ctx, 1, metric.WithAttributes(attribute.String("method", "GET")))
package metrics
import (
"context"
"log/slog"
"sync"
"time"
"go.ntppool.org/common/internal/tracerconfig"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)
var (
meterProvider metric.MeterProvider
setupOnce sync.Once
setupErr error
)
// Setup initializes the OpenTelemetry metrics provider with OTLP export.
// This function uses the configuration stored by the tracing package and
// creates a metrics provider that exports to the same OTLP endpoint.
//
// The function is safe to call multiple times - it will only initialize once.
// If tracing configuration is not available, it returns a no-op provider that
// doesn't export metrics.
//
// Returns an error only if there's a configuration problem. Missing tracing
// configuration is handled gracefully with a warning log.
func Setup(ctx context.Context) error {
setupOnce.Do(func() {
setupErr = initializeMetrics(ctx)
})
return setupErr
}
// GetMeter returns a named meter for creating metric instruments.
// The meter uses the configured metrics provider, or the global provider
// if metrics haven't been set up yet.
//
// This is the primary entry point for creating metric instruments in your application.
func GetMeter(name string, opts ...metric.MeterOption) metric.Meter {
if meterProvider == nil {
// Return the global provider as fallback (no-op if not configured)
return otel.GetMeterProvider().Meter(name, opts...)
}
return meterProvider.Meter(name, opts...)
}
// initializeMetrics sets up the OpenTelemetry metrics provider with OTLP export.
func initializeMetrics(ctx context.Context) error {
log := slog.Default()
// Check if tracing configuration is available
cfg, configCtx, factory := tracerconfig.GetMetricExporter()
if cfg == nil || configCtx == nil || factory == nil {
log.Warn("metrics setup: tracing configuration not available, using no-op provider")
// Set the global provider as fallback - metrics just won't be exported
meterProvider = otel.GetMeterProvider()
return nil
}
// Create OTLP metrics exporter
exporter, err := factory(ctx, cfg)
if err != nil {
log.Error("metrics setup: failed to create OTLP exporter", "error", err)
// Fall back to global provider
meterProvider = otel.GetMeterProvider()
return nil
}
// Create metrics provider with the exporter
provider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(
exporter,
sdkmetric.WithInterval(15*time.Second),
)),
)
// Set the global provider
otel.SetMeterProvider(provider)
meterProvider = provider
log.Info("metrics setup: OTLP metrics provider initialized")
return nil
}
// Shutdown gracefully shuts down the metrics provider.
// This should be called during application shutdown to ensure all metrics
// are properly flushed and exported.
func Shutdown(ctx context.Context) error {
if provider, ok := meterProvider.(*sdkmetric.MeterProvider); ok {
return provider.Shutdown(ctx)
}
return nil
}
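// Consumer-side lifecycle sketch (assumes tracerconfig has been populated,
// e.g. by the tracing package, before Setup is called):
//
//	if err := metrics.Setup(ctx); err != nil {
//		log.Fatal(err)
//	}
//	defer metrics.Shutdown(ctx)
//	meter := metrics.GetMeter("my-service")
//	counter, _ := meter.Int64Counter("requests_total")
//	counter.Add(ctx, 1)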

metrics/metrics_test.go Normal file
View File

@@ -0,0 +1,296 @@
package metrics
import (
"context"
"os"
"testing"
"time"
"go.ntppool.org/common/internal/tracerconfig"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
func TestSetup_NoConfiguration(t *testing.T) {
// Clear any existing configuration
tracerconfig.Clear()
ctx := context.Background()
err := Setup(ctx)
// Should not return an error even when no configuration is available
if err != nil {
t.Errorf("Setup() returned unexpected error: %v", err)
}
// Should be able to get a meter (even if it's a no-op)
meter := GetMeter("test-meter")
if meter == nil {
t.Error("GetMeter() returned nil")
}
}
func TestGetMeter(t *testing.T) {
// Clear any existing configuration
tracerconfig.Clear()
ctx := context.Background()
_ = Setup(ctx)
meter := GetMeter("test-service")
if meter == nil {
t.Fatal("GetMeter() returned nil")
}
// Test creating a counter instrument
counter, err := meter.Int64Counter("test_counter")
if err != nil {
t.Errorf("Failed to create counter: %v", err)
}
// Test using the counter (should not error even with no-op provider)
counter.Add(ctx, 1, metric.WithAttributes(attribute.String("test", "value")))
}
func TestSetup_MultipleCallsSafe(t *testing.T) {
// Clear any existing configuration
tracerconfig.Clear()
ctx := context.Background()
// Call Setup multiple times
err1 := Setup(ctx)
err2 := Setup(ctx)
err3 := Setup(ctx)
if err1 != nil {
t.Errorf("First Setup() call returned error: %v", err1)
}
if err2 != nil {
t.Errorf("Second Setup() call returned error: %v", err2)
}
if err3 != nil {
t.Errorf("Third Setup() call returned error: %v", err3)
}
// Should still be able to get meters
meter := GetMeter("test-meter")
if meter == nil {
t.Error("GetMeter() returned nil after multiple Setup() calls")
}
}
func TestSetup_WithConfiguration(t *testing.T) {
// Clear any existing configuration
tracerconfig.Clear()
ctx := context.Background()
config := &tracerconfig.Config{
ServiceName: "test-metrics-service",
Environment: "test",
Endpoint: "localhost:4317", // Will likely fail to connect, but should set up provider
}
// Create a mock exporter factory that returns a working exporter
mockFactory := func(ctx context.Context, cfg *tracerconfig.Config) (sdkmetric.Exporter, error) {
// Create a simple in-memory exporter for testing
return &mockMetricExporter{}, nil
}
// Store configuration with mock factory
tracerconfig.Store(ctx, config, nil, mockFactory, nil)
// Setup metrics
err := Setup(ctx)
if err != nil {
t.Errorf("Setup() returned error: %v", err)
}
// Should be able to get a meter
meter := GetMeter("test-service")
if meter == nil {
t.Fatal("GetMeter() returned nil")
}
// Test creating and using instruments
counter, err := meter.Int64Counter("test_counter")
if err != nil {
t.Errorf("Failed to create counter: %v", err)
}
histogram, err := meter.Float64Histogram("test_histogram")
if err != nil {
t.Errorf("Failed to create histogram: %v", err)
}
gauge, err := meter.Int64UpDownCounter("test_gauge")
if err != nil {
t.Errorf("Failed to create gauge: %v", err)
}
// Use the instruments
counter.Add(ctx, 1, metric.WithAttributes(attribute.String("test", "value")))
histogram.Record(ctx, 1.5, metric.WithAttributes(attribute.String("test", "value")))
gauge.Add(ctx, 10, metric.WithAttributes(attribute.String("test", "value")))
// Test shutdown
err = Shutdown(ctx)
if err != nil {
t.Errorf("Shutdown() returned error: %v", err)
}
}
func TestSetup_WithRealOTLPConfig(t *testing.T) {
// Skip this test in short mode since it may try to make network connections
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Clear any existing configuration
tracerconfig.Clear()
// Set environment variables for OTLP configuration
originalEndpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
originalProtocol := os.Getenv("OTEL_EXPORTER_OTLP_PROTOCOL")
defer func() {
if originalEndpoint != "" {
os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", originalEndpoint)
} else {
os.Unsetenv("OTEL_EXPORTER_OTLP_ENDPOINT")
}
if originalProtocol != "" {
os.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", originalProtocol)
} else {
os.Unsetenv("OTEL_EXPORTER_OTLP_PROTOCOL")
}
}()
os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") // HTTP endpoint
os.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
config := &tracerconfig.Config{
ServiceName: "test-metrics-e2e",
Environment: "test",
Endpoint: "localhost:4318",
}
// Store configuration with real factory
tracerconfig.Store(ctx, config, nil, tracerconfig.CreateOTLPMetricExporter, nil)
// Setup metrics - this may fail if no OTLP collector is running, which is okay
err := Setup(ctx)
if err != nil {
t.Logf("Setup() returned error (expected if no OTLP collector): %v", err)
}
// Should still be able to get a meter
meter := GetMeter("test-service-e2e")
if meter == nil {
t.Fatal("GetMeter() returned nil")
}
// Create and use instruments
counter, err := meter.Int64Counter("e2e_test_counter")
if err != nil {
t.Errorf("Failed to create counter: %v", err)
}
// Add some metrics
for i := 0; i < 5; i++ {
counter.Add(ctx, 1, metric.WithAttributes(
attribute.String("iteration", string(rune('0'+i))),
attribute.String("test_type", "e2e"),
))
}
// Give some time for export (if collector is running)
time.Sleep(100 * time.Millisecond)
// Test shutdown
err = Shutdown(ctx)
if err != nil {
t.Logf("Shutdown() returned error (may be expected): %v", err)
}
}
func TestConcurrentMetricUsage(t *testing.T) {
// Clear any existing configuration
tracerconfig.Clear()
ctx := context.Background()
config := &tracerconfig.Config{
ServiceName: "concurrent-test",
}
// Use mock factory
mockFactory := func(ctx context.Context, cfg *tracerconfig.Config) (sdkmetric.Exporter, error) {
return &mockMetricExporter{}, nil
}
tracerconfig.Store(ctx, config, nil, mockFactory, nil)
Setup(ctx)
meter := GetMeter("concurrent-test")
counter, err := meter.Int64Counter("concurrent_counter")
if err != nil {
t.Fatalf("Failed to create counter: %v", err)
}
// Test concurrent metric usage
const numGoroutines = 10
const metricsPerGoroutine = 100
done := make(chan bool, numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func(goroutineID int) {
for j := 0; j < metricsPerGoroutine; j++ {
counter.Add(ctx, 1, metric.WithAttributes(
attribute.Int("goroutine", goroutineID),
attribute.Int("iteration", j),
))
}
done <- true
}(i)
}
// Wait for all goroutines to complete
for i := 0; i < numGoroutines; i++ {
<-done
}
// Shutdown
err = Shutdown(ctx)
if err != nil {
t.Errorf("Shutdown() returned error: %v", err)
}
}
// mockMetricExporter is a simple mock exporter for testing
type mockMetricExporter struct{}
func (m *mockMetricExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
// Just pretend to export
return nil
}
func (m *mockMetricExporter) ForceFlush(ctx context.Context) error {
return nil
}
func (m *mockMetricExporter) Shutdown(ctx context.Context) error {
return nil
}
func (m *mockMetricExporter) Temporality(kind sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
func (m *mockMetricExporter) Aggregation(kind sdkmetric.InstrumentKind) sdkmetric.Aggregation {
return sdkmetric.DefaultAggregationSelector(kind)
}

View File

@@ -15,11 +15,11 @@ import (
func TestNew(t *testing.T) {
metrics := New()
if metrics == nil {
t.Fatal("New returned nil")
}
if metrics.r == nil {
t.Error("metrics registry is nil")
}
@@ -28,32 +28,32 @@ func TestNew(t *testing.T) {
func TestRegistry(t *testing.T) {
metrics := New()
registry := metrics.Registry()
if registry == nil {
t.Fatal("Registry() returned nil")
}
if registry != metrics.r {
t.Error("Registry() did not return the metrics registry")
}
// Test that we can register a metric
counter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "test_counter",
Help: "A test counter",
})
err := registry.Register(counter)
if err != nil {
t.Errorf("failed to register metric: %v", err)
}
// Test that the metric is registered
metricFamilies, err := registry.Gather()
if err != nil {
t.Errorf("failed to gather metrics: %v", err)
}
found := false
for _, mf := range metricFamilies {
if mf.GetName() == "test_counter" {
@@ -61,7 +61,7 @@ func TestRegistry(t *testing.T) {
break
}
}
if !found {
t.Error("registered metric not found in registry")
}
@@ -69,7 +69,7 @@ func TestRegistry(t *testing.T) {
func TestHandler(t *testing.T) {
metrics := New()
// Register a test metric
counter := prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -80,40 +80,40 @@ func TestHandler(t *testing.T) {
)
metrics.Registry().MustRegister(counter)
counter.WithLabelValues("GET").Inc()
// Test the handler
handler := metrics.Handler()
if handler == nil {
t.Fatal("Handler() returned nil")
}
// Create a test request
req := httptest.NewRequest("GET", "/metrics", nil)
recorder := httptest.NewRecorder()
// Call the handler
handler.ServeHTTP(recorder, req)
// Check response
resp := recorder.Result()
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("expected status 200, got %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("failed to read response body: %v", err)
}
bodyStr := string(body)
// Check for our test metric
if !strings.Contains(bodyStr, "test_requests_total") {
t.Error("test metric not found in metrics output")
}
// Check for OpenMetrics format indicators
if !strings.Contains(bodyStr, "# TYPE") {
t.Error("metrics output missing TYPE comments")
@@ -122,7 +122,7 @@ func TestHandler(t *testing.T) {
func TestListenAndServe(t *testing.T) {
metrics := New()
// Register a test metric
counter := prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -133,46 +133,46 @@ func TestListenAndServe(t *testing.T) {
)
metrics.Registry().MustRegister(counter)
counter.WithLabelValues("GET").Inc()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start server in a goroutine
errCh := make(chan error, 1)
go func() {
// Use a high port number to avoid conflicts
errCh <- metrics.ListenAndServe(ctx, 9999)
}()
// Give the server a moment to start
time.Sleep(100 * time.Millisecond)
// Test metrics endpoint
resp, err := http.Get("http://localhost:9999/metrics")
if err != nil {
t.Fatalf("failed to GET /metrics: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("expected status 200, got %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("failed to read response body: %v", err)
}
bodyStr := string(body)
// Check for our test metric
if !strings.Contains(bodyStr, "test_requests_total") {
t.Error("test metric not found in metrics output")
}
// Cancel context to stop server
cancel()
// Wait for server to stop
select {
case err := <-errCh:
@@ -186,21 +186,21 @@ func TestListenAndServe(t *testing.T) {
func TestListenAndServeContextCancellation(t *testing.T) {
metrics := New()
ctx, cancel := context.WithCancel(context.Background())
// Start server
errCh := make(chan error, 1)
go func() {
errCh <- metrics.ListenAndServe(ctx, 9998)
}()
// Give server time to start
time.Sleep(100 * time.Millisecond)
// Cancel context
cancel()
// Server should stop gracefully
select {
case err := <-errCh:
@@ -215,7 +215,7 @@ func TestListenAndServeContextCancellation(t *testing.T) {
// Benchmark the metrics handler response time
func BenchmarkMetricsHandler(b *testing.B) {
metrics := New()
// Register some test metrics
for i := 0; i < 10; i++ {
counter := prometheus.NewCounter(prometheus.CounterOpts{
@@ -225,18 +225,18 @@ func BenchmarkMetricsHandler(b *testing.B) {
metrics.Registry().MustRegister(counter)
counter.Add(float64(i * 100))
}
handler := metrics.Handler()
b.ResetTimer()
for i := 0; i < b.N; i++ {
req := httptest.NewRequest("GET", "/metrics", nil)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
if recorder.Code != http.StatusOK {
b.Fatalf("unexpected status code: %d", recorder.Code)
}
}
}
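// A minimal application wiring sketch, assuming only the New/Registry/
// Handler/ListenAndServe API exercised by the tests above; the port and
// metric name are illustrative:
//
// func main() {
//     m := metrics.New()
//
//     reqs := prometheus.NewCounterVec(
//         prometheus.CounterOpts{Name: "app_requests_total", Help: "Requests served."},
//         []string{"method"},
//     )
//     m.Registry().MustRegister(reqs)
//
//     ctx, cancel := context.WithCancel(context.Background())
//     defer cancel()
//
//     // Serves /metrics in OpenMetrics format until ctx is cancelled.
//     if err := m.ListenAndServe(ctx, 9090); err != nil {
//         log.Fatal(err)
//     }
// }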

View File

@@ -15,7 +15,7 @@ mkdir -p $DIR
BASE=https://geodns.bitnames.com/${BASE}/builds/${BUILD}
files=`curl -sSf ${BASE}/checksums.txt | awk '{print $2}'`
files=`curl -sSf ${BASE}/checksums.txt | sed 's/^[a-f0-9]*[[:space:]]*//'`
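# (sed keeps the complete filename even when it contains spaces;
# awk '{print $2}' only printed the second whitespace-separated field)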
metafiles="checksums.txt metadata.json CHANGELOG.md artifacts.json"
for f in $metafiles; do

View File

@@ -2,7 +2,7 @@
set -euo pipefail
go install github.com/goreleaser/goreleaser/v2@v2.8.2
go install github.com/goreleaser/goreleaser/v2@v2.11.0
if [ ! -z "${harbor_username:-}" ]; then
DOCKER_FILE=~/.docker/config.json

View File

@@ -1,3 +1,36 @@
// Package tracing provides OpenTelemetry distributed tracing setup with OTLP export support.
//
// This package handles the complete OpenTelemetry SDK initialization including:
// - Trace provider configuration with batching and resource detection
// - Log provider setup for structured log export via OTLP
// - Automatic resource discovery (service name, version, host, container, process info)
// - Support for both gRPC and HTTP OTLP exporters with TLS configuration
// - Propagation context setup for distributed tracing across services
// - Graceful shutdown handling for all telemetry components
//
// The package supports various deployment scenarios:
// - Development: Local OTLP collectors or observability backends
// - Production: Secure OTLP export with mutual TLS authentication
// - Container environments: Automatic container and Kubernetes resource detection
//
// Configuration is primarily handled via standard OpenTelemetry environment variables:
// - OTEL_SERVICE_NAME: Service identification
// - OTEL_EXPORTER_OTLP_PROTOCOL: Protocol selection (grpc, http/protobuf)
// - OTEL_TRACES_EXPORTER: Exporter type (otlp, autoexport)
// - OTEL_RESOURCE_ATTRIBUTES: Additional resource attributes
//
// Example usage:
//
// cfg := &tracing.TracerConfig{
// ServiceName: "my-service",
// Environment: "production",
// Endpoint: "https://otlp.example.com:4317",
// }
// shutdown, err := tracing.InitTracer(ctx, cfg)
// if err != nil {
// log.Fatal(err)
// }
// defer shutdown(ctx)
package tracing
// todo, review:
@@ -5,26 +38,23 @@ package tracing
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"log/slog"
"os"
"slices"
"time"
"go.ntppool.org/common/logger"
"go.ntppool.org/common/internal/tracerconfig"
"go.ntppool.org/common/version"
"google.golang.org/grpc/credentials"
"go.opentelemetry.io/contrib/exporters/autoexport"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
logglobal "go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/propagation"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
@@ -34,49 +64,106 @@ import (
const (
// svcNameKey is the environment variable from which the service name is read.
svcNameKey = "OTEL_SERVICE_NAME"
otelExporterOTLPProtoEnvKey = "OTEL_EXPORTER_OTLP_PROTOCOL"
otelExporterOTLPTracesProtoEnvKey = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"
)
var errInvalidOTLPProtocol = errors.New("invalid OTLP protocol - should be one of ['grpc', 'http/protobuf']")
// createOTLPLogExporter creates an OTLP log exporter using the provided configuration.
// This function is used as the LogExporterFactory for the tracerconfig bridge.
func createOTLPLogExporter(ctx context.Context, cfg *tracerconfig.Config) (sdklog.Exporter, error) {
return tracerconfig.CreateOTLPLogExporter(ctx, cfg)
}
// createOTLPMetricExporter creates an OTLP metric exporter using the provided configuration.
// This function is used as the MetricExporterFactory for the tracerconfig bridge.
func createOTLPMetricExporter(ctx context.Context, cfg *tracerconfig.Config) (sdkmetric.Exporter, error) {
return tracerconfig.CreateOTLPMetricExporter(ctx, cfg)
}
// createOTLPTraceExporter creates an OTLP trace exporter using the provided configuration.
// This function is used as the TraceExporterFactory for the tracerconfig bridge.
func createOTLPTraceExporter(ctx context.Context, cfg *tracerconfig.Config) (sdktrace.SpanExporter, error) {
return tracerconfig.CreateOTLPTraceExporter(ctx, cfg)
}
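// These wrappers adapt the tracerconfig constructors to the factory
// signatures registered via tracerconfig.Store below, which lets the logger
// and metrics packages create exporters from the stored configuration
// without importing this package.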
// https://github.com/open-telemetry/opentelemetry-go/blob/main/exporters/otlp/otlptrace/otlptracehttp/example_test.go
// TpShutdownFunc represents a function that gracefully shuts down telemetry providers.
// It should be called during application shutdown to ensure all telemetry data is flushed
// and exporters are properly closed. The context can be used to set shutdown timeouts.
type TpShutdownFunc func(ctx context.Context) error
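// A typical call site gives the shutdown function its own bounded context so
// a slow exporter cannot stall process exit; the 5-second timeout here is
// illustrative:
//
// defer func() {
//     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//     defer cancel()
//     if err := shutdown(ctx); err != nil {
//         slog.Warn("telemetry shutdown", "err", err)
//     }
// }()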
// Tracer returns the configured OpenTelemetry tracer for the NTP Pool project.
// This tracer should be used for creating spans and distributed tracing throughout
// the application. It uses the global tracer provider set up by InitTracer/SetupSDK.
func Tracer() trace.Tracer {
traceProvider := otel.GetTracerProvider()
return traceProvider.Tracer("ntppool-tracer")
}
// Start creates a new span with the given name and options using the configured tracer.
// This is a convenience function that wraps the standard OpenTelemetry span creation.
// It returns a new context containing the span and the span itself for further configuration.
//
// The returned context should be used for downstream operations to maintain trace correlation.
func Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return Tracer().Start(ctx, spanName, opts...)
}
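// A minimal sketch of the intended call pattern (doWork is hypothetical):
//
// func handle(ctx context.Context) error {
//     ctx, span := tracing.Start(ctx, "handle")
//     defer span.End()
//     return doWork(ctx)
// }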
type GetClientCertificate func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
// GetClientCertificate is an alias for the type defined in tracerconfig.
// This maintains backward compatibility for existing code.
type GetClientCertificate = tracerconfig.GetClientCertificate
// TracerConfig provides configuration options for OpenTelemetry tracing setup.
// It supplements standard OpenTelemetry environment variables with additional
// NTP Pool-specific configuration including TLS settings for secure OTLP export.
type TracerConfig struct {
ServiceName string
Environment string
Endpoint string
EndpointURL string
ServiceName string // Service name for resource identification (overrides OTEL_SERVICE_NAME)
Environment string // Deployment environment (development, staging, production)
Endpoint string // OTLP endpoint hostname/port (e.g., "otlp.example.com:4317")
EndpointURL string // Complete OTLP endpoint URL (e.g., "https://otlp.example.com:4317/v1/traces")
CertificateProvider GetClientCertificate
RootCAs *x509.CertPool
CertificateProvider GetClientCertificate // Client certificate provider for mutual TLS
RootCAs *x509.CertPool // CA certificate pool for server verification
}
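// A mutual-TLS configuration sketch; the certificate file paths are
// illustrative and error handling is elided:
//
// cert, _ := tls.LoadX509KeyPair("client.crt", "client.key")
// caPEM, _ := os.ReadFile("ca.pem")
// pool := x509.NewCertPool()
// pool.AppendCertsFromPEM(caPEM)
//
// cfg := &tracing.TracerConfig{
//     ServiceName: "my-service",
//     Endpoint:    "otlp.example.com:4317",
//     CertificateProvider: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
//         return &cert, nil
//     },
//     RootCAs: pool,
// }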
// InitTracer initializes the OpenTelemetry SDK with the provided configuration.
// This is the main entry point for setting up distributed tracing in applications.
//
// The function configures trace and log providers, sets up OTLP exporters,
// and returns a shutdown function that must be called during application termination.
//
// Returns a shutdown function and an error. The shutdown function should be called
// with a context that has an appropriate timeout for graceful shutdown.
func InitTracer(ctx context.Context, cfg *TracerConfig) (TpShutdownFunc, error) {
// todo: setup environment from cfg
return SetupSDK(ctx, cfg)
}
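// With an empty TracerConfig, the standard OpenTelemetry environment
// variables drive the setup instead (values illustrative):
//
// OTEL_SERVICE_NAME=my-service \
// OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf \
// OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp.example.com:4318 \
// ./my-service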
// SetupSDK performs the complete OpenTelemetry SDK initialization including resource
// discovery, exporter configuration, provider setup, and shutdown function creation.
//
// The function automatically discovers system resources (service info, host, container,
// process details) and configures both trace and log exporters. It supports multiple
// OTLP protocols (gRPC, HTTP) and handles TLS configuration for secure deployments.
//
// The returned shutdown function coordinates graceful shutdown of all telemetry
// components in the reverse order of their initialization.
func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc, err error) {
if cfg == nil {
cfg = &TracerConfig{}
}
log := logger.Setup()
// Store configuration for use by logger and metrics packages via bridge
bridgeConfig := &tracerconfig.Config{
ServiceName: cfg.ServiceName,
Environment: cfg.Environment,
Endpoint: cfg.Endpoint,
EndpointURL: cfg.EndpointURL,
CertificateProvider: cfg.CertificateProvider,
RootCAs: cfg.RootCAs,
}
tracerconfig.Store(ctx, bridgeConfig, createOTLPLogExporter, createOTLPMetricExporter, createOTLPTraceExporter)
log := slog.Default()
if serviceName := os.Getenv(svcNameKey); len(serviceName) == 0 {
if len(cfg.ServiceName) > 0 {
@@ -117,13 +204,21 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
var shutdownFuncs []func(context.Context) error
shutdown = func(ctx context.Context) error {
// Force flush the global logger provider before shutting down anything else
if loggerProvider := global.GetLoggerProvider(); loggerProvider != nil {
if sdkProvider, ok := loggerProvider.(*sdklog.LoggerProvider); ok {
if flushErr := sdkProvider.ForceFlush(ctx); flushErr != nil {
log.Warn("logger provider force flush failed", "err", flushErr)
}
}
}
var err error
// Shut down the providers first and the exporters after,
// i.e. in the reverse of the order they were set up.
slices.Reverse(shutdownFuncs)
for _, fn := range shutdownFuncs {
// log.Warn("shutting down", "fn", fn)
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
@@ -145,9 +240,9 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
switch os.Getenv("OTEL_TRACES_EXPORTER") {
case "":
spanExporter, err = newOLTPExporter(ctx, cfg)
spanExporter, err = createOTLPTraceExporter(ctx, bridgeConfig)
case "otlp":
spanExporter, err = newOLTPExporter(ctx, cfg)
spanExporter, err = createOTLPTraceExporter(ctx, bridgeConfig)
default:
// log.Debug("OTEL_TRACES_EXPORTER", "fallback", os.Getenv("OTEL_TRACES_EXPORTER"))
spanExporter, err = autoexport.NewSpanExporter(ctx)
@@ -158,13 +253,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
}
shutdownFuncs = append(shutdownFuncs, spanExporter.Shutdown)
logExporter, err := autoexport.NewLogExporter(ctx)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, logExporter.Shutdown)
// Set up trace provider.
tracerProvider, err := newTraceProvider(spanExporter, res)
if err != nil {
@@ -174,19 +262,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
otel.SetTracerProvider(tracerProvider)
logProvider := sdklog.NewLoggerProvider(sdklog.WithResource(res),
sdklog.WithProcessor(
sdklog.NewBatchProcessor(logExporter, sdklog.WithExportBufferSize(10)),
),
)
logglobal.SetLoggerProvider(logProvider)
shutdownFuncs = append(shutdownFuncs, func(ctx context.Context) error {
logProvider.ForceFlush(ctx)
return logProvider.Shutdown(ctx)
},
)
if err != nil {
handleErr(err)
return
@@ -195,74 +270,6 @@ func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc,
return
}
func newOLTPExporter(ctx context.Context, cfg *TracerConfig) (sdktrace.SpanExporter, error) {
log := logger.Setup()
var tlsConfig *tls.Config
if cfg.CertificateProvider != nil {
tlsConfig = &tls.Config{
GetClientCertificate: cfg.CertificateProvider,
RootCAs: cfg.RootCAs,
}
}
proto := os.Getenv(otelExporterOTLPTracesProtoEnvKey)
if proto == "" {
proto = os.Getenv(otelExporterOTLPProtoEnvKey)
}
// Fall back to the default, http/protobuf.
if proto == "" {
proto = "http/protobuf"
}
var client otlptrace.Client
switch proto {
case "grpc":
opts := []otlptracegrpc.Option{
otlptracegrpc.WithCompressor("gzip"),
}
if tlsConfig != nil {
opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
if len(cfg.Endpoint) > 0 {
log.Info("adding option", "Endpoint", cfg.Endpoint)
opts = append(opts, otlptracegrpc.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
log.Info("adding option", "EndpointURL", cfg.EndpointURL)
opts = append(opts, otlptracegrpc.WithEndpointURL(cfg.EndpointURL))
}
client = otlptracegrpc.NewClient(opts...)
case "http/protobuf", "http/json":
opts := []otlptracehttp.Option{
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
}
if tlsConfig != nil {
opts = append(opts, otlptracehttp.WithTLSClientConfig(tlsConfig))
}
if len(cfg.Endpoint) > 0 {
opts = append(opts, otlptracehttp.WithEndpoint(cfg.Endpoint))
}
if len(cfg.EndpointURL) > 0 {
opts = append(opts, otlptracehttp.WithEndpointURL(cfg.EndpointURL))
}
client = otlptracehttp.NewClient(opts...)
default:
return nil, errInvalidOTLPProtocol
}
exporter, err := otlptrace.New(ctx, client)
if err != nil {
log.ErrorContext(ctx, "creating OTLP trace exporter", "err", err)
}
return exporter, err
}
func newTraceProvider(traceExporter sdktrace.SpanExporter, res *resource.Resource) (*sdktrace.TracerProvider, error) {
traceProvider := sdktrace.NewTracerProvider(
sdktrace.WithResource(res),

View File

@@ -1,3 +1,17 @@
// Package types provides shared data structures for the NTP Pool project.
//
// This package contains common types used across different NTP Pool services
// for data exchange, logging, and database operations. The types are designed
// to support JSON serialization for API responses and SQL database storage
// with automatic marshaling/unmarshaling.
//
// Current types include:
// - LogScoreAttributes: NTP server scoring metadata for monitoring and analysis
//
// All types implement appropriate interfaces for:
// - JSON serialization (json.Marshaler/json.Unmarshaler)
// - SQL database storage (database/sql/driver.Valuer/sql.Scanner)
// - String representation for logging and debugging
package types
import (
@@ -6,17 +20,26 @@ import (
"errors"
)
// LogScoreAttributes contains metadata about NTP server scoring and monitoring results.
// This structure captures both NTP protocol-specific information (leap, stratum) and
// operational data (errors, warnings, response status) for analysis and alerting.
//
// The type supports JSON serialization for API responses and database storage
// via the database/sql/driver interfaces. Fields use omitempty tags to minimize
// JSON payload size when values are at their zero state.
type LogScoreAttributes struct {
Leap int8 `json:"leap,omitempty"`
Stratum int8 `json:"stratum,omitempty"`
NoResponse bool `json:"no_response,omitempty"`
Error string `json:"error,omitempty"`
Warning string `json:"warning,omitempty"`
Leap int8 `json:"leap,omitempty"` // NTP leap indicator (0=no warning, 1=+1s, 2=-1s, 3=unsynchronized)
Stratum int8 `json:"stratum,omitempty"` // NTP stratum level (1=primary, 2-15=secondary, 16=unsynchronized)
NoResponse bool `json:"no_response,omitempty"` // True if server failed to respond to NTP queries
Error string `json:"error,omitempty"` // Error message if scoring failed
Warning string `json:"warning,omitempty"` // Warning message for non-fatal issues
FromLSID int `json:"from_ls_id,omitempty"`
FromSSID int `json:"from_ss_id,omitempty"`
FromLSID int `json:"from_ls_id,omitempty"` // Source log server ID for traceability
FromSSID int `json:"from_ss_id,omitempty"` // Source scoring system ID for traceability
}
// String returns a JSON representation of the LogScoreAttributes for logging and debugging.
// Returns an empty string if JSON marshaling fails.
func (lsa *LogScoreAttributes) String() string {
b, err := json.Marshal(lsa)
if err != nil {
@@ -25,10 +48,17 @@ func (lsa *LogScoreAttributes) String() string {
return string(b)
}
// Value implements the database/sql/driver.Valuer interface for database storage.
// It serializes the LogScoreAttributes to JSON for storage in SQL databases.
// Returns the JSON bytes or an error if marshaling fails.
func (lsa *LogScoreAttributes) Value() (driver.Value, error) {
return json.Marshal(lsa)
}
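// Round-trip sketch for the Valuer/Scanner pair (Scan follows below); the
// table and column names are hypothetical:
//
// attrs := &LogScoreAttributes{Stratum: 2, Warning: "offset high"}
// _, err := db.ExecContext(ctx,
//     "UPDATE log_scores SET attributes = ? WHERE id = ?", attrs, id)
//
// var out LogScoreAttributes
// err = db.QueryRowContext(ctx,
//     "SELECT attributes FROM log_scores WHERE id = ?", id).Scan(&out)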
// Scan implements the database/sql.Scanner interface for reading from SQL databases.
// It deserializes JSON data from the database back into LogScoreAttributes.
// Supports both []byte and string input types, with nil values treated as no-op.
// Returns an error if the input type is unsupported or JSON unmarshaling fails.
func (lsa *LogScoreAttributes) Scan(value any) error {
var source []byte
_t := LogScoreAttributes{}

View File

@@ -10,8 +10,18 @@
// -X go.ntppool.org/common/version.buildTime=2023-01-01T00:00:00Z \
// -X go.ntppool.org/common/version.gitVersion=abc123"
//
// The package also automatically extracts build information from Go's debug.BuildInfo
// when available, providing fallback values for VCS time and revision.
// Build time supports both Unix epoch timestamps and RFC3339 format:
//
// # Unix epoch (simpler, recommended)
// go build -ldflags "-X go.ntppool.org/common/version.buildTime=$(date +%s)"
//
// # RFC3339 format
// go build -ldflags "-X go.ntppool.org/common/version.buildTime=$(date -u +%Y-%m-%dT%H:%M:%SZ)"
//
// Both formats are automatically converted to RFC3339 for consistent output. The buildTime
// parameter takes priority over Git commit time. If buildTime is not specified, the package
// automatically extracts build information from Go's debug.BuildInfo when available,
// providing fallback values for VCS time and revision.
package version
import (
@@ -19,7 +29,9 @@ import (
"log/slog"
"runtime"
"runtime/debug"
"strconv"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
@@ -30,7 +42,7 @@ import (
// If not set, defaults to "dev-snapshot". The version should follow semantic versioning.
var (
VERSION string // Semantic version (e.g., "1.0.0" or "v1.0.0")
buildTime string // Build timestamp (RFC3339 format)
buildTime string // Build timestamp (Unix epoch or RFC3339, normalized to RFC3339)
gitVersion string // Git commit hash
gitModified bool // Whether the working tree was modified during build
)
@@ -38,6 +50,28 @@ var (
// info holds the consolidated version information extracted from build variables and debug.BuildInfo.
var info Info
// parseBuildTime converts a build time string to RFC3339 format.
// Supports both Unix epoch timestamps (numeric strings) and RFC3339 format.
// Returns the input unchanged if it cannot be parsed as either format.
func parseBuildTime(s string) string {
if s == "" {
return s
}
// Try parsing as Unix epoch timestamp (numeric string)
if epoch, err := strconv.ParseInt(s, 10, 64); err == nil {
return time.Unix(epoch, 0).UTC().Format(time.RFC3339)
}
// Try parsing as RFC3339 to validate format
if _, err := time.Parse(time.RFC3339, s); err == nil {
return s // Already in RFC3339 format
}
// Return original string if neither format works (graceful fallback)
return s
}
// Info represents structured version and build information.
// This struct is used for JSON serialization and programmatic access to build metadata.
type Info struct {
@@ -48,6 +82,7 @@ type Info struct {
}
func init() {
buildTime = parseBuildTime(buildTime)
info.BuildTime = buildTime
info.GitRev = gitVersion
@@ -67,9 +102,9 @@ func init() {
switch h.Key {
case "vcs.time":
if len(buildTime) == 0 {
buildTime = h.Value
buildTime = parseBuildTime(h.Value)
info.BuildTime = buildTime
}
info.BuildTime = h.Value
case "vcs.revision":
// https://blog.carlmjohnson.net/post/2023/golang-git-hash-how-to/
// todo: use BuildInfo.Main.Version if revision is empty

View File

@@ -309,3 +309,106 @@ func BenchmarkCheckVersionDevSnapshot(b *testing.B) {
_ = CheckVersion(version, minimum)
}
}
func TestParseBuildTime(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{
name: "Unix epoch timestamp",
input: "1672531200", // 2023-01-01T00:00:00Z
expected: "2023-01-01T00:00:00Z",
},
{
name: "Unix epoch zero",
input: "0",
expected: "1970-01-01T00:00:00Z",
},
{
name: "Valid RFC3339 format",
input: "2023-12-25T15:30:45Z",
expected: "2023-12-25T15:30:45Z",
},
{
name: "RFC3339 with timezone",
input: "2023-12-25T10:30:45-05:00",
expected: "2023-12-25T10:30:45-05:00",
},
{
name: "Empty string",
input: "",
expected: "",
},
{
name: "Invalid format - return unchanged",
input: "not-a-date",
expected: "not-a-date",
},
{
name: "Invalid timestamp - return unchanged",
input: "invalid-timestamp",
expected: "invalid-timestamp",
},
{
name: "Partial date - return unchanged",
input: "2023-01-01",
expected: "2023-01-01",
},
{
name: "Negative epoch - should work",
input: "-1",
expected: "1969-12-31T23:59:59Z",
},
{
name: "Large epoch timestamp",
input: "4102444800", // 2100-01-01T00:00:00Z
expected: "2100-01-01T00:00:00Z",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := parseBuildTime(tt.input)
if result != tt.expected {
t.Errorf("parseBuildTime(%q) = %q, expected %q", tt.input, result, tt.expected)
}
})
}
}
func TestParseBuildTimeConsistency(t *testing.T) {
// Test that calling parseBuildTime multiple times with the same input returns the same result
testInputs := []string{
"1672531200",
"2023-01-01T00:00:00Z",
"invalid-date",
"",
}
for _, input := range testInputs {
result1 := parseBuildTime(input)
result2 := parseBuildTime(input)
if result1 != result2 {
t.Errorf("parseBuildTime(%q) not consistent: %q != %q", input, result1, result2)
}
}
}
func BenchmarkParseBuildTime(b *testing.B) {
inputs := []string{
"1672531200", // Unix epoch
"2023-01-01T00:00:00Z", // RFC3339
"invalid-timestamp", // Invalid
"", // Empty
}
for _, input := range inputs {
b.Run(input, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = parseBuildTime(input)
}
})
}
}

View File

@@ -1,3 +1,27 @@
// Package fastlyxff provides Fastly CDN IP range management for trusted proxy handling.
//
// This package parses Fastly's public IP ranges JSON file and generates Echo framework
// trust options for proper client IP extraction from X-Forwarded-For headers.
// It's designed specifically for services deployed behind Fastly's CDN that need
// to identify real client IPs for logging, rate limiting, and security purposes.
//
// Fastly publishes their edge server IP ranges in a JSON format that this package
// consumes to automatically configure trusted proxy ranges. This ensures that
// X-Forwarded-For headers are only trusted when they originate from legitimate
// Fastly edge servers.
//
// Key features:
// - Automatic parsing of Fastly's IP ranges JSON format
// - Support for both IPv4 and IPv6 address ranges
// - Echo framework integration via TrustOption generation
// - CIDR notation parsing and validation
//
// The JSON file typically contains IP ranges in this format:
//
// {
// "addresses": ["23.235.32.0/20", "43.249.72.0/22", ...],
// "ipv6_addresses": ["2a04:4e40::/32", "2a04:4e42::/32", ...]
// }
package fastlyxff
import (
@@ -9,15 +33,29 @@ import (
"github.com/labstack/echo/v4"
)
// FastlyXFF represents Fastly's published IP ranges for their CDN edge servers.
// This structure matches the JSON format provided by Fastly for their public IP ranges.
// It contains separate lists for IPv4 and IPv6 CIDR ranges.
type FastlyXFF struct {
IPv4 []string `json:"addresses"`
IPv6 []string `json:"ipv6_addresses"`
IPv4 []string `json:"addresses"` // IPv4 CIDR ranges (e.g., "23.235.32.0/20")
IPv6 []string `json:"ipv6_addresses"` // IPv6 CIDR ranges (e.g., "2a04:4e40::/32")
}
// TrustedNets holds parsed network prefixes for efficient IP range checking.
// This type is currently unused but reserved for future optimizations
// where frequent IP range lookups might benefit from pre-parsed prefixes.
type TrustedNets struct {
prefixes []netip.Prefix
prefixes []netip.Prefix // Parsed network prefixes for efficient lookups
}
// New loads and parses Fastly IP ranges from a JSON file.
// The file should contain Fastly's published IP ranges in their standard JSON format.
//
// Parameters:
// - fileName: Path to the Fastly IP ranges JSON file
//
// Returns the parsed FastlyXFF structure or an error if the file cannot be
// read or the JSON format is invalid.
func New(fileName string) (*FastlyXFF, error) {
b, err := os.ReadFile(fileName)
if err != nil {
@@ -34,6 +72,19 @@ func New(fileName string) (*FastlyXFF, error) {
return &d, nil
}
// EchoTrustOption converts Fastly IP ranges into Echo framework trust options.
// This method generates trust configurations that tell Echo to accept X-Forwarded-For
// headers only from Fastly's edge servers, ensuring accurate client IP extraction.
//
// The generated trust options should be used with Echo's IP extractor:
//
// options, err := fastlyRanges.EchoTrustOption()
// if err != nil {
// return err
// }
// e.IPExtractor = echo.ExtractIPFromXFFHeader(options...)
//
// Returns a slice of Echo trust options or an error if any CIDR range cannot be parsed.
func (xff *FastlyXFF) EchoTrustOption() ([]echo.TrustOption, error) {
ranges := []echo.TrustOption{}