Compare commits

6 Commits

Author | SHA1 | Date
---|---|---
 | da13a371b4 |
 | a1a5a6b8be |
 | 96afb77844 |
 | c372d79d1d |
 | b5141d6a70 |
 | 694f8ba1d3 |
@@ -1,3 +1,14 @@
// Package apitls provides TLS certificate management with automatic renewal support.
//
// This package handles TLS certificate provisioning and management for secure
// inter-service communication within the NTP Pool project infrastructure.
// It provides both server and client certificate management through the
// CertificateProvider interface and includes a trusted CA certificate pool
// for validating certificates.
//
// The package integrates with certman for automatic certificate renewal
// and includes embedded CA certificates for establishing trust relationships
// between services.
package apitls

import (
@@ -13,11 +24,32 @@ import (
//go:embed ca.pem
var caBytes []byte

// CertificateProvider defines the interface for providing TLS certificates
// for both server and client connections. Implementations should handle
// certificate retrieval, caching, and renewal as needed.
//
// This interface supports both server-side certificate provisioning
// (via GetCertificate) and client-side certificate authentication
// (via GetClientCertificate).
type CertificateProvider interface {
    // GetCertificate retrieves a server certificate based on the client hello information.
    // This method is typically used in tls.Config.GetCertificate for server-side TLS.
    GetCertificate(hello *tls.ClientHelloInfo) (*tls.Certificate, error)

    // GetClientCertificate retrieves a client certificate for mutual TLS authentication.
    // This method is used in tls.Config.GetClientCertificate for client-side TLS.
    GetClientCertificate(certRequestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error)
}

// CAPool returns a certificate pool containing trusted CA certificates
// for validating TLS connections within the NTP Pool infrastructure.
//
// The CA certificates are embedded in the binary and include the trusted
// certificate authorities used for inter-service communication.
// This pool should be used in tls.Config.RootCAs for client connections
// or tls.Config.ClientCAs for server connections requiring client certificates.
//
// Returns an error if the embedded CA certificates cannot be parsed or loaded.
func CAPool() (*x509.CertPool, error) {
    capool := x509.NewCertPool()
    if !capool.AppendCertsFromPEM(caBytes) {
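As a hedged illustration of how the pieces above fit together, a client-side tls.Config might be assembled as in the sketch below; only CAPool and the CertificateProvider interface come from this diff, while "provider" is an assumed implementation (for example one backed by certman).

    // Sketch: build a client tls.Config from the embedded CA pool and a
    // CertificateProvider. "provider" is assumed, not defined in this diff.
    pool, err := apitls.CAPool()
    if err != nil {
        log.Fatal(err)
    }
    tlsConfig := &tls.Config{
        RootCAs:              pool,                          // trust the embedded CA certificates
        GetClientCertificate: provider.GetClientCertificate, // client certificate for mutual TLS
    }
    _ = tlsConfig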
@@ -1,5 +1,18 @@
// Package config provides NTP Pool specific
// configuration tools.
// Package config provides environment-based configuration management for NTP Pool services.
//
// This package handles configuration loading from environment variables and provides
// utilities for constructing URLs for web and management interfaces. It supports
// deployment-specific settings including hostname configuration, TLS settings,
// and deployment modes.
//
// Configuration is loaded automatically from environment variables:
//   - deployment_mode: The deployment environment (devel, production, etc.)
//   - manage_hostname: Hostname for management interface
//   - web_hostname: Comma-separated list of web hostnames (first is primary)
//   - manage_tls: Enable TLS for management interface (yes, no, true, false)
//   - web_tls: Enable TLS for web interface (yes, no, true, false)
//
// The package includes code generation for accessor methods using the accessory tool.
package config

import (
@@ -13,6 +26,9 @@ import (

//go:generate go tool github.com/masaushi/accessory -type Config

// Config holds environment-based configuration for NTP Pool services.
// It manages hostnames, TLS settings, and deployment modes loaded from
// environment variables. The struct includes code-generated accessor methods.
type Config struct {
    deploymentMode string `accessor:"getter"`

@@ -26,6 +42,16 @@ type Config struct {
    valid bool `accessor:"getter"`
}

// New creates a new Config instance by loading configuration from environment variables.
// It automatically parses hostnames, TLS settings, and deployment mode from the environment.
// The configuration is considered valid if at least one web hostname is provided.
//
// Environment variables used:
//   - deployment_mode: Deployment environment identifier
//   - manage_hostname: Management interface hostname
//   - web_hostname: Comma-separated web hostnames (first becomes primary)
//   - manage_tls: Management interface TLS setting
//   - web_tls: Web interface TLS setting
func New() *Config {
    c := Config{}
    c.deploymentMode = os.Getenv("deployment_mode")
@@ -46,10 +72,26 @@ func New() *Config {
    return &c
}

// WebURL constructs a complete URL for the web interface using the primary web hostname.
// It automatically selects HTTP or HTTPS based on the web_tls configuration setting.
//
// Parameters:
//   - path: URL path component (should start with "/")
//   - query: Optional URL query parameters (can be nil)
//
// Returns a complete URL string suitable for web interface requests.
func (c *Config) WebURL(path string, query *url.Values) string {
    return baseURL(c.webHostname, c.webTLS, path, query)
}

// ManageURL constructs a complete URL for the management interface using the management hostname.
// It automatically selects HTTP or HTTPS based on the manage_tls configuration setting.
//
// Parameters:
//   - path: URL path component (should start with "/")
//   - query: Optional URL query parameters (can be nil)
//
// Returns a complete URL string suitable for management interface requests.
func (c *Config) ManageURL(path string, query *url.Values) string {
    return baseURL(c.manageHostname, c.webTLS, path, query)
}
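A hedged sketch of typical use follows, with illustrative environment values; the exact URL formatting comes from the unexported baseURL helper, which this diff does not show, so the commented output is an assumption about its shape.

    os.Setenv("web_hostname", "www.ntppool.org")
    os.Setenv("web_tls", "yes")
    cfg := config.New()

    q := url.Values{}
    q.Set("page", "2")
    // Expected shape: https://www.ntppool.org/manage/servers?page=2
    fmt.Println(cfg.WebURL("/manage/servers", &q))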
@@ -1,3 +1,19 @@
// Package depenv provides deployment environment management for NTP Pool services.
//
// This package handles different deployment environments (development, test, production)
// and provides environment-specific configuration including API endpoints, management URLs,
// and monitoring domains. It supports string-based environment identification and
// automatic URL construction for various service endpoints.
//
// The package defines three main deployment environments:
//   - DeployDevel: Development environment with dev-specific endpoints
//   - DeployTest: Test/beta environment for staging
//   - DeployProd: Production environment with live endpoints
//
// Environment detection supports both short and long forms:
//   - "dev" or "devel" → DeployDevel
//   - "test" or "beta" → DeployTest
//   - "prod" → DeployProd
package depenv

import (
@@ -24,14 +40,27 @@ var apiServers = map[DeploymentEnvironment]string{
// }

const (
    // DeployUndefined represents an unrecognized or unset deployment environment.
    DeployUndefined DeploymentEnvironment = iota
    // DeployDevel represents the development environment.
    DeployDevel
    // DeployTest represents the test/beta environment.
    DeployTest
    // DeployProd represents the production environment.
    DeployProd
)

// DeploymentEnvironment represents a deployment environment type.
// It provides methods for environment-specific URL construction and
// supports text marshaling/unmarshaling for configuration files.
type DeploymentEnvironment uint8

// DeploymentEnvironmentFromString parses a string into a DeploymentEnvironment.
// It supports both short and long forms of environment names:
//   - "dev" or "devel" → DeployDevel
//   - "test" or "beta" → DeployTest
//   - "prod" → DeployProd
//   - any other value → DeployUndefined
func DeploymentEnvironmentFromString(s string) DeploymentEnvironment {
    switch s {
    case "devel", "dev":
@@ -45,6 +74,8 @@ func DeploymentEnvironmentFromString(s string) DeploymentEnvironment {
    }
}

// String returns the canonical string representation of the deployment environment.
// Returns "prod", "test", "devel", or panics for invalid environments.
func (d DeploymentEnvironment) String() string {
    switch d {
    case DeployProd:
@@ -58,6 +89,9 @@ func (d DeploymentEnvironment) String() string {
    }
}

// APIHost returns the API server URL for this deployment environment.
// It first checks the API_HOST environment variable for overrides,
// then falls back to the environment-specific default API endpoint.
func (d DeploymentEnvironment) APIHost() string {
    if apiHost := os.Getenv("API_HOST"); apiHost != "" {
        return apiHost
@@ -65,14 +99,26 @@ func (d DeploymentEnvironment) APIHost() string {
    return apiServers[d]
}

// ManageURL constructs a management interface URL for this deployment environment.
// It combines the environment-specific management server base URL with the provided path.
//
// The path parameter should start with "/" for proper URL construction.
func (d DeploymentEnvironment) ManageURL(path string) string {
    return manageServers[d] + path
}

// MonitorDomain returns the monitoring domain for this deployment environment.
// The domain follows the pattern: {environment}.mon.ntppool.dev
// For example: "devel.mon.ntppool.dev" for the development environment.
func (d DeploymentEnvironment) MonitorDomain() string {
    return d.String() + ".mon.ntppool.dev"
}

// UnmarshalText implements the encoding.TextUnmarshaler interface.
// It allows DeploymentEnvironment to be unmarshaled from configuration files
// and other text-based formats. Empty strings are treated as valid (no-op).
//
// Returns an error if the text represents an invalid deployment environment.
func (d *DeploymentEnvironment) UnmarshalText(text []byte) error {
    s := string(text)
    if s == "" {
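A short, hedged usage sketch under the names documented above; the fallback to DeployDevel is an assumption made only for the example, not package behavior.

    env := depenv.DeploymentEnvironmentFromString(os.Getenv("deployment_mode"))
    if env == depenv.DeployUndefined {
        env = depenv.DeployDevel // example-only fallback for local development
    }
    fmt.Println(env.APIHost())       // API_HOST override or the per-environment default
    fmt.Println(env.MonitorDomain()) // e.g. "devel.mon.ntppool.dev"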
database/config.go (new file, 61 lines)
@@ -0,0 +1,61 @@
package database

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// Config represents the database configuration structure
type Config struct {
    MySQL DBConfig `yaml:"mysql"`
}

// DBConfig represents the MySQL database configuration
type DBConfig struct {
    DSN    string `default:"" flag:"dsn" usage:"Database DSN"`
    User   string `default:"" flag:"user"`
    Pass   string `default:"" flag:"pass"`
    DBName string // Optional database name override
}

// ConfigOptions allows customization of database opening behavior
type ConfigOptions struct {
    // ConfigFiles is a list of config file paths to search for database configuration
    ConfigFiles []string

    // EnablePoolMonitoring enables connection pool metrics collection
    EnablePoolMonitoring bool

    // PrometheusRegisterer for metrics collection. If nil, no metrics are collected.
    PrometheusRegisterer prometheus.Registerer

    // Connection pool settings
    MaxOpenConns    int
    MaxIdleConns    int
    ConnMaxLifetime time.Duration
}

// DefaultConfigOptions returns the standard configuration options used by the API package
func DefaultConfigOptions() ConfigOptions {
    return ConfigOptions{
        ConfigFiles:          []string{"database.yaml", "/vault/secrets/database.yaml"},
        EnablePoolMonitoring: true,
        PrometheusRegisterer: prometheus.DefaultRegisterer,
        MaxOpenConns:         25,
        MaxIdleConns:         10,
        ConnMaxLifetime:      3 * time.Minute,
    }
}

// MonitorConfigOptions returns configuration options optimized for the Monitor package
func MonitorConfigOptions() ConfigOptions {
    return ConfigOptions{
        ConfigFiles:          []string{"database.yaml", "/vault/secrets/database.yaml"},
        EnablePoolMonitoring: false, // Monitor doesn't need metrics
        PrometheusRegisterer: nil,   // No Prometheus dependency
        MaxOpenConns:         10,
        MaxIdleConns:         5,
        ConnMaxLifetime:      3 * time.Minute,
    }
}
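A hedged sketch of how a service might start from these defaults and adjust them before opening a connection; the config file path and the larger pool size are placeholders, not values from this diff.

    opts := database.DefaultConfigOptions()
    opts.ConfigFiles = []string{"/etc/myservice/database.yaml"} // placeholder path
    opts.MaxOpenConns = 50                                      // placeholder tuning for a busier service

    db, err := database.OpenDB(ctx, opts)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()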
database/config_test.go (new file, 81 lines)
@@ -0,0 +1,81 @@
package database

import (
    "testing"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

func TestDefaultConfigOptions(t *testing.T) {
    opts := DefaultConfigOptions()

    // Verify expected defaults for API package
    if opts.MaxOpenConns != 25 {
        t.Errorf("Expected MaxOpenConns=25, got %d", opts.MaxOpenConns)
    }
    if opts.MaxIdleConns != 10 {
        t.Errorf("Expected MaxIdleConns=10, got %d", opts.MaxIdleConns)
    }
    if opts.ConnMaxLifetime != 3*time.Minute {
        t.Errorf("Expected ConnMaxLifetime=3m, got %v", opts.ConnMaxLifetime)
    }
    if !opts.EnablePoolMonitoring {
        t.Error("Expected EnablePoolMonitoring=true")
    }
    if opts.PrometheusRegisterer != prometheus.DefaultRegisterer {
        t.Error("Expected PrometheusRegisterer to be DefaultRegisterer")
    }
    if len(opts.ConfigFiles) == 0 {
        t.Error("Expected ConfigFiles to be non-empty")
    }
}

func TestMonitorConfigOptions(t *testing.T) {
    opts := MonitorConfigOptions()

    // Verify expected defaults for Monitor package
    if opts.MaxOpenConns != 10 {
        t.Errorf("Expected MaxOpenConns=10, got %d", opts.MaxOpenConns)
    }
    if opts.MaxIdleConns != 5 {
        t.Errorf("Expected MaxIdleConns=5, got %d", opts.MaxIdleConns)
    }
    if opts.ConnMaxLifetime != 3*time.Minute {
        t.Errorf("Expected ConnMaxLifetime=3m, got %v", opts.ConnMaxLifetime)
    }
    if opts.EnablePoolMonitoring {
        t.Error("Expected EnablePoolMonitoring=false")
    }
    if opts.PrometheusRegisterer != nil {
        t.Error("Expected PrometheusRegisterer to be nil")
    }
    if len(opts.ConfigFiles) == 0 {
        t.Error("Expected ConfigFiles to be non-empty")
    }
}

func TestConfigStructures(t *testing.T) {
    // Test that configuration structures can be created and populated
    config := Config{
        MySQL: DBConfig{
            DSN:    "user:pass@tcp(localhost:3306)/dbname",
            User:   "testuser",
            Pass:   "testpass",
            DBName: "testdb",
        },
    }

    if config.MySQL.DSN == "" {
        t.Error("Expected DSN to be set")
    }
    if config.MySQL.User != "testuser" {
        t.Errorf("Expected User='testuser', got '%s'", config.MySQL.User)
    }
    if config.MySQL.Pass != "testpass" {
        t.Errorf("Expected Pass='testpass', got '%s'", config.MySQL.Pass)
    }
    if config.MySQL.DBName != "testdb" {
        t.Errorf("Expected DBName='testdb', got '%s'", config.MySQL.DBName)
    }
}
database/connector.go (new file, 88 lines)
@@ -0,0 +1,88 @@
package database

import (
    "context"
    "database/sql/driver"
    "errors"
    "fmt"
    "os"

    "github.com/go-sql-driver/mysql"
    "gopkg.in/yaml.v3"
)

// from https://github.com/Boostport/dynamic-database-config

// CreateConnectorFunc is a function that creates a database connector
type CreateConnectorFunc func() (driver.Connector, error)

// Driver implements the sql/driver interface with dynamic configuration
type Driver struct {
    CreateConnectorFunc CreateConnectorFunc
}

// Driver returns the driver instance
func (d Driver) Driver() driver.Driver {
    return d
}

// Connect creates a new database connection using the dynamic connector
func (d Driver) Connect(ctx context.Context) (driver.Conn, error) {
    connector, err := d.CreateConnectorFunc()
    if err != nil {
        return nil, fmt.Errorf("error creating connector from function: %w", err)
    }

    return connector.Connect(ctx)
}

// Open is not supported for dynamic configuration
func (d Driver) Open(name string) (driver.Conn, error) {
    return nil, errors.New("open is not supported")
}

// createConnector creates a connector function that reads configuration from a file
func createConnector(configFile string) CreateConnectorFunc {
    return func() (driver.Connector, error) {
        dbFile, err := os.Open(configFile)
        if err != nil {
            return nil, err
        }
        defer dbFile.Close()

        dec := yaml.NewDecoder(dbFile)
        cfg := Config{}

        err = dec.Decode(&cfg)
        if err != nil {
            return nil, err
        }

        dsn := cfg.MySQL.DSN
        if len(dsn) == 0 {
            dsn = os.Getenv("DATABASE_DSN")
            if len(dsn) == 0 {
                return nil, fmt.Errorf("dsn config in database.yaml or DATABASE_DSN environment variable required")
            }
        }

        dbcfg, err := mysql.ParseDSN(dsn)
        if err != nil {
            return nil, err
        }

        if user := cfg.MySQL.User; len(user) > 0 {
            dbcfg.User = user
        }

        if pass := cfg.MySQL.Pass; len(pass) > 0 {
            dbcfg.Passwd = pass
        }

        if name := cfg.MySQL.DBName; len(name) > 0 {
            dbcfg.DBName = name
        }

        return mysql.NewConnector(dbcfg)
    }
}
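For reference, a hedged sketch (as a Go comment) of the YAML shape createConnector expects; the key names follow the yaml tag on Config and yaml.v3's default lowercasing of the DBConfig field names, and the values are placeholders.

    // database.yaml (placeholder values):
    //
    //	mysql:
    //	  dsn: "user:pass@tcp(db.example.com:3306)/ntppool"
    //	  user: "optional-user-override"
    //	  pass: "optional-password-override"
    //
    // If dsn is empty, the DATABASE_DSN environment variable is consulted instead.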
database/integration_test.go (new file, 117 lines)
@@ -0,0 +1,117 @@
package database

import (
    "context"
    "database/sql"
    "testing"
)

// Mock types for testing SQLC integration patterns
type mockQueries struct {
    db DBTX
}

type mockQueriesTx struct {
    *mockQueries
    tx *sql.Tx
}

// Mock the Begin method pattern that SQLC generates
func (q *mockQueries) Begin(ctx context.Context) (*mockQueriesTx, error) {
    // This would normally be: tx, err := q.db.(*sql.DB).BeginTx(ctx, nil)
    // For our test, we return a mock
    return &mockQueriesTx{mockQueries: q, tx: nil}, nil
}

func (qtx *mockQueriesTx) Commit(ctx context.Context) error {
    return nil // Mock implementation
}

func (qtx *mockQueriesTx) Rollback(ctx context.Context) error {
    return nil // Mock implementation
}

// This test verifies that our common database interfaces are compatible with SQLC-generated code
func TestSQLCIntegration(t *testing.T) {
    // Test that SQLC's DBTX interface matches our DBTX interface
    t.Run("DBTX Interface Compatibility", func(t *testing.T) {
        // Test interface compatibility by assignment without execution
        var ourDBTX DBTX

        // Test with sql.DB (should implement DBTX)
        var db *sql.DB
        ourDBTX = db // This will compile only if interfaces are compatible
        _ = ourDBTX  // Use the variable to avoid "unused" warning

        // Test with sql.Tx (should implement DBTX)
        var tx *sql.Tx
        ourDBTX = tx // This will compile only if interfaces are compatible
        _ = ourDBTX  // Use the variable to avoid "unused" warning

        // If we reach here, interfaces are compatible
        t.Log("DBTX interface is compatible with sql.DB and sql.Tx")
    })

    t.Run("Transaction Interface Compatibility", func(t *testing.T) {
        // This test verifies our transaction interfaces work with SQLC patterns
        // We can't define methods inside a function, so we test interface compatibility

        // Verify our DB interface is compatible with what SQLC expects
        var dbInterface DB[*mockQueriesTx]
        var mockDB *mockQueries = &mockQueries{}
        dbInterface = mockDB

        // Test that our transaction helper can work with this pattern
        err := WithTransaction(context.Background(), dbInterface, func(ctx context.Context, qtx *mockQueriesTx) error {
            // This would be where you'd call SQLC-generated query methods
            return nil
        })
        if err != nil {
            t.Errorf("Transaction helper failed: %v", err)
        }
    })
}

// Test that demonstrates how the common package would be used with real SQLC patterns
func TestRealWorldUsagePattern(t *testing.T) {
    // This test shows how a package would typically use our common database code

    t.Run("Database Opening Pattern", func(t *testing.T) {
        // Test that our configuration options work as expected
        opts := DefaultConfigOptions()

        // Modify for test environment (no actual database connection)
        opts.ConfigFiles = []string{}   // No config files for unit test
        opts.PrometheusRegisterer = nil // No metrics for unit test

        // This would normally open a database: db, err := OpenDB(ctx, opts)
        // For our unit test, we just verify the options are reasonable
        if opts.MaxOpenConns <= 0 {
            t.Error("MaxOpenConns should be positive")
        }
        if opts.MaxIdleConns <= 0 {
            t.Error("MaxIdleConns should be positive")
        }
        if opts.ConnMaxLifetime <= 0 {
            t.Error("ConnMaxLifetime should be positive")
        }
    })

    t.Run("Monitor Package Configuration", func(t *testing.T) {
        opts := MonitorConfigOptions()

        // Verify monitor-specific settings
        if opts.EnablePoolMonitoring {
            t.Error("Monitor package should not enable pool monitoring")
        }
        if opts.PrometheusRegisterer != nil {
            t.Error("Monitor package should not have Prometheus registerer")
        }
        if opts.MaxOpenConns != 10 {
            t.Errorf("Expected MaxOpenConns=10 for monitor, got %d", opts.MaxOpenConns)
        }
        if opts.MaxIdleConns != 5 {
            t.Errorf("Expected MaxIdleConns=5 for monitor, got %d", opts.MaxIdleConns)
        }
    })
}
database/interfaces.go (new file, 34 lines)
@@ -0,0 +1,34 @@
package database

import (
    "context"
    "database/sql"
)

// DBTX matches the interface expected by SQLC-generated code
// This interface is implemented by both *sql.DB and *sql.Tx
type DBTX interface {
    ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
    PrepareContext(context.Context, string) (*sql.Stmt, error)
    QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
    QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}

// BaseQuerier provides basic query functionality
// This interface should be implemented by package-specific Queries types
type BaseQuerier interface {
    WithTx(tx *sql.Tx) BaseQuerier
}

// BaseQuerierTx provides transaction functionality
// This interface should be implemented by package-specific Queries types
type BaseQuerierTx interface {
    BaseQuerier
    Begin(ctx context.Context) (BaseQuerierTx, error)
    Commit(ctx context.Context) error
    Rollback(ctx context.Context) error
}

// TransactionFunc represents a function that operates within a database transaction
// This is used by the shared transaction helpers in transaction.go
type TransactionFunc[Q any] func(ctx context.Context, q Q) error
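Because DBTX mirrors the context-aware method set of the standard library, both connection and transaction types satisfy it; a compile-time assertion makes the claim in the comment above concrete.

    // Compile-time assertions: *sql.DB and *sql.Tx both implement DBTX.
    var (
        _ DBTX = (*sql.DB)(nil)
        _ DBTX = (*sql.Tx)(nil)
    )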
database/metrics.go (new file, 93 lines)
@@ -0,0 +1,93 @@
package database

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// DatabaseMetrics holds the Prometheus metrics for database connection pool monitoring
type DatabaseMetrics struct {
    ConnectionsOpen         prometheus.Gauge
    ConnectionsIdle         prometheus.Gauge
    ConnectionsInUse        prometheus.Gauge
    ConnectionsWaitCount    prometheus.Counter
    ConnectionsWaitDuration prometheus.Histogram
}

// NewDatabaseMetrics creates a new set of database metrics and registers them
func NewDatabaseMetrics(registerer prometheus.Registerer) *DatabaseMetrics {
    metrics := &DatabaseMetrics{
        ConnectionsOpen: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "database_connections_open",
            Help: "Number of open database connections",
        }),
        ConnectionsIdle: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "database_connections_idle",
            Help: "Number of idle database connections",
        }),
        ConnectionsInUse: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "database_connections_in_use",
            Help: "Number of database connections in use",
        }),
        ConnectionsWaitCount: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "database_connections_wait_count_total",
            Help: "Total number of times a connection had to wait",
        }),
        ConnectionsWaitDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
            Name:    "database_connections_wait_duration_seconds",
            Help:    "Time spent waiting for a database connection",
            Buckets: prometheus.DefBuckets,
        }),
    }

    if registerer != nil {
        registerer.MustRegister(
            metrics.ConnectionsOpen,
            metrics.ConnectionsIdle,
            metrics.ConnectionsInUse,
            metrics.ConnectionsWaitCount,
            metrics.ConnectionsWaitDuration,
        )
    }

    return metrics
}

// monitorConnectionPool runs a background goroutine to collect connection pool metrics
func monitorConnectionPool(ctx context.Context, db *sql.DB, registerer prometheus.Registerer) {
    if registerer == nil {
        return // No metrics collection if no registerer provided
    }

    metrics := NewDatabaseMetrics(registerer)
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            stats := db.Stats()

            metrics.ConnectionsOpen.Set(float64(stats.OpenConnections))
            metrics.ConnectionsIdle.Set(float64(stats.Idle))
            metrics.ConnectionsInUse.Set(float64(stats.InUse))
            metrics.ConnectionsWaitCount.Add(float64(stats.WaitCount))

            if stats.WaitDuration > 0 {
                metrics.ConnectionsWaitDuration.Observe(stats.WaitDuration.Seconds())
            }

            // Log connection pool stats for high usage or waiting
            if stats.OpenConnections > 20 || stats.WaitCount > 0 {
                fmt.Printf("Connection pool stats: open=%d idle=%d in_use=%d wait_count=%d wait_duration=%s\n",
                    stats.OpenConnections, stats.Idle, stats.InUse, stats.WaitCount, stats.WaitDuration)
            }
        }
    }
}
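A hedged sketch of exercising these metrics against an isolated registry (useful in tests; production code normally passes prometheus.DefaultRegisterer through ConfigOptions). The db variable is assumed to be an *sql.DB opened elsewhere.

    reg := prometheus.NewRegistry()
    m := NewDatabaseMetrics(reg)

    stats := db.Stats() // db: an *sql.DB opened elsewhere
    m.ConnectionsOpen.Set(float64(stats.OpenConnections))
    m.ConnectionsIdle.Set(float64(stats.Idle))
    m.ConnectionsInUse.Set(float64(stats.InUse))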
database/pool.go (new file, 78 lines)
@@ -0,0 +1,78 @@
package database

import (
    "context"
    "database/sql"
    "fmt"
    "os"

    "go.ntppool.org/common/logger"
)

// OpenDB opens a database connection with the specified configuration options
func OpenDB(ctx context.Context, options ConfigOptions) (*sql.DB, error) {
    log := logger.Setup()

    configFile, err := findConfigFile(options.ConfigFiles)
    if err != nil {
        return nil, err
    }

    dbconn := sql.OpenDB(Driver{
        CreateConnectorFunc: createConnector(configFile),
    })

    // Set connection pool parameters
    dbconn.SetConnMaxLifetime(options.ConnMaxLifetime)
    dbconn.SetMaxOpenConns(options.MaxOpenConns)
    dbconn.SetMaxIdleConns(options.MaxIdleConns)

    err = dbconn.Ping()
    if err != nil {
        log.Error("could not connect to database", "err", err)
        return nil, err
    }

    // Start optional connection pool monitoring
    if options.EnablePoolMonitoring && options.PrometheusRegisterer != nil {
        go monitorConnectionPool(ctx, dbconn, options.PrometheusRegisterer)
    }

    return dbconn, nil
}

// OpenDBWithConfigFile opens a database connection using an explicit config file path
// This is a convenience function for API package compatibility
func OpenDBWithConfigFile(ctx context.Context, configFile string) (*sql.DB, error) {
    options := DefaultConfigOptions()
    options.ConfigFiles = []string{configFile}
    return OpenDB(ctx, options)
}

// OpenDBMonitor opens a database connection with monitor-specific defaults
// This is a convenience function for Monitor package compatibility
func OpenDBMonitor() (*sql.DB, error) {
    options := MonitorConfigOptions()
    return OpenDB(context.Background(), options)
}

// findConfigFile searches for the first existing config file from the list
func findConfigFile(configFiles []string) (string, error) {
    var firstErr error

    for _, configFile := range configFiles {
        if configFile == "" {
            continue
        }
        if _, err := os.Stat(configFile); err == nil {
            return configFile, nil
        } else if firstErr == nil {
            firstErr = err
        }
    }

    if firstErr != nil {
        return "", fmt.Errorf("no config file found: %w", firstErr)
    }
    return "", fmt.Errorf("no valid config files provided")
}
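A hedged sketch of the three entry points above; error handling is trimmed and the explicit file path is simply one of the defaults listed in DefaultConfigOptions.

    // Search the default config locations and enable pool metrics (API-style services).
    db, err := database.OpenDB(ctx, database.DefaultConfigOptions())

    // Or point at one explicit config file.
    db, err = database.OpenDBWithConfigFile(ctx, "/vault/secrets/database.yaml")

    // Monitor-style services: smaller pool, no Prometheus dependency.
    db, err = database.OpenDBMonitor()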
database/transaction.go (new file, 69 lines)
@@ -0,0 +1,69 @@
package database

import (
    "context"
    "fmt"

    "go.ntppool.org/common/logger"
)

// DB interface for database operations that can begin transactions
type DB[Q any] interface {
    Begin(ctx context.Context) (Q, error)
}

// TX interface for transaction operations
type TX interface {
    Commit(ctx context.Context) error
    Rollback(ctx context.Context) error
}

// WithTransaction executes a function within a database transaction
// Handles proper rollback on error and commit on success
func WithTransaction[Q TX](ctx context.Context, db DB[Q], fn func(ctx context.Context, q Q) error) error {
    tx, err := db.Begin(ctx)
    if err != nil {
        return fmt.Errorf("failed to begin transaction: %w", err)
    }

    var committed bool
    defer func() {
        if !committed {
            if rbErr := tx.Rollback(ctx); rbErr != nil {
                // Log rollback error but don't override original error
                log := logger.FromContext(ctx)
                log.ErrorContext(ctx, "failed to rollback transaction", "error", rbErr)
            }
        }
    }()

    if err := fn(ctx, tx); err != nil {
        return err
    }

    err = tx.Commit(ctx)
    committed = true // set regardless of Commit's result so the deferred Rollback is skipped
    if err != nil {
        return fmt.Errorf("failed to commit transaction: %w", err)
    }

    return nil
}

// WithReadOnlyTransaction executes a read-only function within a transaction
// Always rolls back at the end (for consistent read isolation)
func WithReadOnlyTransaction[Q TX](ctx context.Context, db DB[Q], fn func(ctx context.Context, q Q) error) error {
    tx, err := db.Begin(ctx)
    if err != nil {
        return fmt.Errorf("failed to begin read-only transaction: %w", err)
    }

    defer func() {
        if rbErr := tx.Rollback(ctx); rbErr != nil {
            log := logger.FromContext(ctx)
            log.ErrorContext(ctx, "failed to rollback read-only transaction", "error", rbErr)
        }
    }()

    return fn(ctx, tx)
}
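A hedged sketch of calling WithTransaction with an SQLC-style query wrapper; New, Queries, and QueriesTx are illustrative names for generated code in a consuming package, not part of this diff.

    q := New(db) // hypothetical SQLC-generated constructor; q implements database.DB[*QueriesTx]
    err := database.WithTransaction(ctx, q, func(ctx context.Context, qtx *QueriesTx) error {
        // Call SQLC-generated query methods on qtx here;
        // returning an error rolls the transaction back, returning nil commits it.
        return nil
    })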
database/transaction_base.go (new file, 69 lines)
@@ -0,0 +1,69 @@
package database

import (
    "context"
    "database/sql"
    "fmt"

    "go.ntppool.org/common/logger"
)

// Shared interface definitions that both packages use identically
type BaseBeginner interface {
    Begin(context.Context) (sql.Tx, error)
}

type BaseTx interface {
    BaseBeginner
    Commit(ctx context.Context) error
    Rollback(ctx context.Context) error
}

// BeginTransactionForQuerier contains the shared Begin() logic from both packages
func BeginTransactionForQuerier(ctx context.Context, db DBTX) (DBTX, error) {
    if sqlDB, ok := db.(*sql.DB); ok {
        tx, err := sqlDB.BeginTx(ctx, &sql.TxOptions{})
        if err != nil {
            return nil, err
        }
        return tx, nil
    } else {
        // Handle transaction case
        if beginner, ok := db.(BaseBeginner); ok {
            tx, err := beginner.Begin(ctx)
            if err != nil {
                return nil, err
            }
            return &tx, nil
        }
        return nil, fmt.Errorf("database connection does not support transactions")
    }
}

// CommitTransactionForQuerier contains the shared Commit() logic from both packages
func CommitTransactionForQuerier(ctx context.Context, db DBTX) error {
    if sqlTx, ok := db.(*sql.Tx); ok {
        return sqlTx.Commit()
    }

    tx, ok := db.(BaseTx)
    if !ok {
        log := logger.FromContext(ctx)
        log.ErrorContext(ctx, "could not get a Tx", "type", fmt.Sprintf("%T", db))
        return sql.ErrTxDone
    }
    return tx.Commit(ctx)
}

// RollbackTransactionForQuerier contains the shared Rollback() logic from both packages
func RollbackTransactionForQuerier(ctx context.Context, db DBTX) error {
    if sqlTx, ok := db.(*sql.Tx); ok {
        return sqlTx.Rollback()
    }

    tx, ok := db.(BaseTx)
    if !ok {
        return sql.ErrTxDone
    }
    return tx.Rollback(ctx)
}
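A hedged sketch of how a package-specific Queries type might delegate to these shared helpers; Queries and its db field are illustrative names for code in a consuming package, not definitions from this diff.

    // Hypothetical wrapper: delegate Begin/Commit/Rollback to the shared helpers.
    func (q *Queries) Begin(ctx context.Context) (*Queries, error) {
        tx, err := BeginTransactionForQuerier(ctx, q.db)
        if err != nil {
            return nil, err
        }
        return &Queries{db: tx}, nil
    }

    func (q *Queries) Commit(ctx context.Context) error {
        return CommitTransactionForQuerier(ctx, q.db)
    }

    func (q *Queries) Rollback(ctx context.Context) error {
        return RollbackTransactionForQuerier(ctx, q.db)
    }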
database/transaction_test.go (new file, 157 lines)
@@ -0,0 +1,157 @@
package database

import (
    "context"
    "errors"
    "testing"
)

// Mock implementations for testing
type mockDB struct {
    beginError error
    txMock     *mockTX
}

func (m *mockDB) Begin(ctx context.Context) (*mockTX, error) {
    if m.beginError != nil {
        return nil, m.beginError
    }
    return m.txMock, nil
}

type mockTX struct {
    commitError    error
    rollbackError  error
    commitCalled   bool
    rollbackCalled bool
}

func (m *mockTX) Commit(ctx context.Context) error {
    m.commitCalled = true
    return m.commitError
}

func (m *mockTX) Rollback(ctx context.Context) error {
    m.rollbackCalled = true
    return m.rollbackError
}

func TestWithTransaction_Success(t *testing.T) {
    tx := &mockTX{}
    db := &mockDB{txMock: tx}

    var functionCalled bool
    err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        functionCalled = true
        if q != tx {
            t.Error("Expected transaction to be passed to function")
        }
        return nil
    })
    if err != nil {
        t.Errorf("Expected no error, got %v", err)
    }
    if !functionCalled {
        t.Error("Expected function to be called")
    }
    if !tx.commitCalled {
        t.Error("Expected commit to be called")
    }
    if tx.rollbackCalled {
        t.Error("Expected rollback NOT to be called on success")
    }
}

func TestWithTransaction_FunctionError(t *testing.T) {
    tx := &mockTX{}
    db := &mockDB{txMock: tx}

    expectedError := errors.New("function error")
    err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        return expectedError
    })

    if err != expectedError {
        t.Errorf("Expected error %v, got %v", expectedError, err)
    }
    if tx.commitCalled {
        t.Error("Expected commit NOT to be called on function error")
    }
    if !tx.rollbackCalled {
        t.Error("Expected rollback to be called on function error")
    }
}

func TestWithTransaction_BeginError(t *testing.T) {
    expectedError := errors.New("begin error")
    db := &mockDB{beginError: expectedError}

    err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        t.Error("Function should not be called when Begin fails")
        return nil
    })

    if err == nil || !errors.Is(err, expectedError) {
        t.Errorf("Expected wrapped begin error, got %v", err)
    }
}

func TestWithTransaction_CommitError(t *testing.T) {
    commitError := errors.New("commit error")
    tx := &mockTX{commitError: commitError}
    db := &mockDB{txMock: tx}

    err := WithTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        return nil
    })

    if err == nil || !errors.Is(err, commitError) {
        t.Errorf("Expected wrapped commit error, got %v", err)
    }
    if !tx.commitCalled {
        t.Error("Expected commit to be called")
    }
    if tx.rollbackCalled {
        t.Error("Expected rollback NOT to be called when commit fails")
    }
}

func TestWithReadOnlyTransaction_Success(t *testing.T) {
    tx := &mockTX{}
    db := &mockDB{txMock: tx}

    var functionCalled bool
    err := WithReadOnlyTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        functionCalled = true
        return nil
    })
    if err != nil {
        t.Errorf("Expected no error, got %v", err)
    }
    if !functionCalled {
        t.Error("Expected function to be called")
    }
    if tx.commitCalled {
        t.Error("Expected commit NOT to be called in read-only transaction")
    }
    if !tx.rollbackCalled {
        t.Error("Expected rollback to be called in read-only transaction")
    }
}

func TestWithReadOnlyTransaction_FunctionError(t *testing.T) {
    tx := &mockTX{}
    db := &mockDB{txMock: tx}

    expectedError := errors.New("function error")
    err := WithReadOnlyTransaction(context.Background(), db, func(ctx context.Context, q *mockTX) error {
        return expectedError
    })

    if err != expectedError {
        t.Errorf("Expected error %v, got %v", expectedError, err)
    }
    if !tx.rollbackCalled {
        t.Error("Expected rollback to be called")
    }
}
ekko/ekko.go (66 changed lines)
@@ -1,3 +1,32 @@
// Package ekko provides an enhanced Echo web framework wrapper with pre-configured middleware.
//
// This package wraps the Echo web framework with a comprehensive middleware stack including:
//   - OpenTelemetry distributed tracing with request context propagation
//   - Prometheus metrics collection with per-service subsystems
//   - Structured logging with trace ID correlation
//   - Security headers (HSTS, content security policy)
//   - Gzip compression for response optimization
//   - Recovery middleware with detailed error logging
//   - HTTP/2 support with H2C (HTTP/2 Cleartext) capability
//
// The package uses functional options pattern for flexible configuration
// and supports graceful shutdown with configurable timeouts. It's designed
// as the standard web service foundation for NTP Pool project services.
//
// Example usage:
//
//	ekko, err := ekko.New("myservice",
//		ekko.WithPort(8080),
//		ekko.WithPrometheus(prometheus.DefaultRegisterer),
//		ekko.WithEchoSetup(func(e *echo.Echo) error {
//			e.GET("/health", healthHandler)
//			return nil
//		}),
//	)
//	if err != nil {
//		log.Fatal(err)
//	}
//	err = ekko.Start(ctx)
package ekko

import (
@@ -20,6 +49,25 @@ import (
    "golang.org/x/sync/errgroup"
)

// New creates a new Ekko instance with the specified service name and functional options.
// The name parameter is used for OpenTelemetry service identification, Prometheus metrics
// subsystem naming, and server identification headers.
//
// Default configuration includes:
//   - 60 second write timeout
//   - 30 second read header timeout
//   - HTTP/2 support with H2C
//   - Standard middleware stack (tracing, metrics, logging, security)
//
// Use functional options to customize behavior:
//   - WithPort(): Set server port (required for Start())
//   - WithPrometheus(): Enable Prometheus metrics
//   - WithEchoSetup(): Configure routes and handlers
//   - WithLogFilters(): Filter access logs
//   - WithOtelMiddleware(): Custom OpenTelemetry middleware
//   - WithWriteTimeout(): Custom write timeout
//   - WithReadHeaderTimeout(): Custom read header timeout
//   - WithGzipConfig(): Custom gzip compression settings
func New(name string, options ...func(*Ekko)) (*Ekko, error) {
    ek := &Ekko{
        writeTimeout: 60 * time.Second,
@@ -32,13 +80,25 @@ func New(name string, options ...func(*Ekko)) (*Ekko, error) {
    return ek, nil
}

// Setup Echo; only intended for testing
// SetupEcho creates and configures an Echo instance without starting the server.
// This method is primarily intended for testing scenarios where you need access
// to the configured Echo instance without starting the HTTP server.
//
// The returned Echo instance includes all configured middleware and routes
// but requires manual server lifecycle management.
func (ek *Ekko) SetupEcho(ctx context.Context) (*echo.Echo, error) {
    return ek.setup(ctx)
}

// Setup Echo and start the server. Will return if the http server
// returns or the context is done.
// Start creates the Echo instance and starts the HTTP server with graceful shutdown support.
// The server runs until either an error occurs or the provided context is cancelled.
//
// The server supports HTTP/2 with H2C (HTTP/2 Cleartext) and includes a 5-second
// graceful shutdown timeout when the context is cancelled. Server configuration
// (port, timeouts, middleware) must be set via functional options during New().
//
// Returns an error if server startup fails or if shutdown doesn't complete within
// the timeout period. Returns nil for clean shutdown via context cancellation.
func (ek *Ekko) Start(ctx context.Context) error {
    log := logger.Setup()

@@ -9,6 +9,9 @@ import (
    slogecho "github.com/samber/slog-echo"
)

// Ekko represents an enhanced Echo web server with pre-configured middleware stack.
// It encapsulates server configuration, middleware options, and lifecycle management
// for NTP Pool web services. Use New() with functional options to configure.
type Ekko struct {
    name string
    prom prometheus.Registerer
@@ -22,50 +25,76 @@ type Ekko struct {
    readHeaderTimeout time.Duration
}

// RouteFn defines a function type for configuring Echo routes and handlers.
// It receives a configured Echo instance and should register all application
// routes, middleware, and handlers. Return an error to abort server startup.
type RouteFn func(e *echo.Echo) error

// WithPort sets the HTTP server port. This option is required when using Start().
// The port should be available and the process should have permission to bind to it.
func WithPort(port int) func(*Ekko) {
    return func(ek *Ekko) {
        ek.port = port
    }
}

// WithPrometheus enables Prometheus metrics collection using the provided registerer.
// Metrics include HTTP request duration, request count, and response size histograms.
// The service name is used as the metrics subsystem for namespacing.
func WithPrometheus(reg prometheus.Registerer) func(*Ekko) {
    return func(ek *Ekko) {
        ek.prom = reg
    }
}

// WithEchoSetup configures application routes and handlers via a setup function.
// The provided function receives the configured Echo instance after all middleware
// is applied and should register routes, custom middleware, and handlers.
func WithEchoSetup(rfn RouteFn) func(*Ekko) {
    return func(ek *Ekko) {
        ek.routeFn = rfn
    }
}

// WithLogFilters configures access log filtering to reduce log noise.
// Filters can exclude specific paths, methods, or status codes from access logs.
// Useful for excluding health checks, metrics endpoints, and other high-frequency requests.
func WithLogFilters(f []slogecho.Filter) func(*Ekko) {
    return func(ek *Ekko) {
        ek.logFilters = f
    }
}

// WithOtelMiddleware replaces the default OpenTelemetry middleware with a custom implementation.
// The default middleware provides distributed tracing for all requests. Use this option
// when you need custom trace configuration or want to disable tracing entirely.
func WithOtelMiddleware(mw echo.MiddlewareFunc) func(*Ekko) {
    return func(ek *Ekko) {
        ek.otelmiddleware = mw
    }
}

// WithWriteTimeout configures the HTTP server write timeout.
// This is the maximum duration before timing out writes of the response.
// Default is 60 seconds. Should be longer than expected response generation time.
func WithWriteTimeout(t time.Duration) func(*Ekko) {
    return func(ek *Ekko) {
        ek.writeTimeout = t
    }
}

// WithReadHeaderTimeout configures the HTTP server read header timeout.
// This is the amount of time allowed to read request headers.
// Default is 30 seconds. Should be sufficient for slow clients and large headers.
func WithReadHeaderTimeout(t time.Duration) func(*Ekko) {
    return func(ek *Ekko) {
        ek.readHeaderTimeout = t
    }
}

// WithGzipConfig provides custom gzip compression configuration.
// By default, gzip compression is enabled with standard settings.
// Use this option to customize compression level, skip patterns, or disable compression.
func WithGzipConfig(gzipConfig *middleware.GzipConfig) func(*Ekko) {
    return func(ek *Ekko) {
        ek.gzipConfig = gzipConfig
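Since Start returns when its context ends, a hedged sketch of wiring graceful shutdown to SIGINT/SIGTERM looks like the following; the service name and port are placeholders.

    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    ek, err := ekko.New("myservice", ekko.WithPort(8080))
    if err != nil {
        log.Fatal(err)
    }
    if err := ek.Start(ctx); err != nil {
        log.Fatal(err) // nil on a clean, context-driven shutdown
    }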
go.mod (5 changed lines)
@@ -4,10 +4,12 @@ go 1.23.5

require (
    github.com/abh/certman v0.4.0
    github.com/go-sql-driver/mysql v1.9.3
    github.com/labstack/echo-contrib v0.17.2
    github.com/labstack/echo/v4 v4.13.3
    github.com/oklog/ulid/v2 v2.1.0
    github.com/prometheus/client_golang v1.20.5
    github.com/prometheus/client_model v0.6.1
    github.com/remychantenay/slog-otel v1.3.2
    github.com/samber/slog-echo v1.14.8
    github.com/samber/slog-multi v1.2.4
@@ -28,9 +30,11 @@ require (
    golang.org/x/net v0.33.0
    golang.org/x/sync v0.10.0
    google.golang.org/grpc v1.69.2
    gopkg.in/yaml.v3 v3.0.1
)

require (
    filippo.io/edwards25519 v1.1.0 // indirect
    github.com/beorn7/perks v1.0.1 // indirect
    github.com/cenkalti/backoff/v4 v4.3.0 // indirect
    github.com/cespare/xxhash/v2 v2.3.0 // indirect
@@ -47,7 +51,6 @@ require (
    github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
    github.com/pierrec/lz4/v4 v4.1.22 // indirect
    github.com/pkg/errors v0.9.1 // indirect
    github.com/prometheus/client_model v0.6.1 // indirect
    github.com/prometheus/common v0.61.0 // indirect
    github.com/prometheus/procfs v0.15.1 // indirect
    github.com/samber/lo v1.47.0 // indirect
go.sum (12 changed lines)
@@ -1,3 +1,5 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/abh/certman v0.4.0 h1:XHoDtb0YyRQPclaHMrBDlKTVZpNjTK6vhB0S3Bd/Sbs=
github.com/abh/certman v0.4.0/go.mod h1:x8QhpKVZifmV1Hdiwdg9gLo2GMPAxezz1s3zrVnPs+I=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -17,6 +19,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -30,6 +34,10 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/labstack/echo-contrib v0.17.2 h1:K1zivqmtcC70X9VdBFdLomjPDEVHlrcAObqmuFj1c6w=
@@ -65,6 +73,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/remychantenay/slog-otel v1.3.2 h1:ZBx8qnwfLJ6e18Vba4e9Xp9B7khTmpIwFsU1sAmActw=
github.com/remychantenay/slog-otel v1.3.2/go.mod h1:gKW4tQ8cGOKoA+bi7wtYba/tcJ6Tc9XyQ/EW8gHA/2E=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
@@ -211,6 +221,8 @@ google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
@@ -1,3 +1,32 @@
// Package kafconn provides a Kafka client wrapper with TLS support for secure log streaming.
//
// This package handles Kafka connections with mutual TLS authentication for the NTP Pool
// project's log streaming infrastructure. It provides factories for creating Kafka readers
// and writers with automatic broker discovery, TLS configuration, and connection management.
//
// The package is designed specifically for the NTP Pool pipeline infrastructure and includes
// hardcoded bootstrap servers and group configurations. It uses certman for automatic
// certificate renewal and provides compression and batching optimizations.
//
// Key features:
// - Mutual TLS authentication with automatic certificate renewal
// - Broker discovery and connection pooling
// - Reader and writer factory methods with optimized configurations
// - LZ4 compression for efficient data transfer
// - Configurable batch sizes and load balancing
//
// Example usage:
//
// tlsSetup := kafconn.TLSSetup{
// CA: "/path/to/ca.pem",
// Cert: "/path/to/client.pem",
// Key: "/path/to/client.key",
// }
// kafka, err := kafconn.NewKafka(ctx, tlsSetup)
// if err != nil {
// log.Fatal(err)
// }
// writer, err := kafka.NewWriter("logs")
package kafconn

import (
@@ -24,12 +53,17 @@ const (
// kafkaMinBatchSize = 1000
)

// TLSSetup contains file paths for TLS certificate configuration.
// All fields are required for establishing secure Kafka connections.
type TLSSetup struct {
CA string
Key string
Cert string
CA string // Path to CA certificate file for server verification
Key string // Path to client private key file
Cert string // Path to client certificate file
}

// Kafka represents a configured Kafka client with TLS support.
// It manages connections, brokers, and provides factory methods for readers and writers.
// The client handles broker discovery, connection pooling, and TLS configuration automatically.
type Kafka struct {
tls TLSSetup

@@ -116,6 +150,19 @@ func (k *Kafka) kafkaTransport(ctx context.Context) (*kafka.Transport, error) {
return transport, nil
}

// NewKafka creates a new Kafka client with TLS configuration and establishes initial connections.
// It performs broker discovery, validates TLS certificates, and prepares the client for creating
// readers and writers.
//
// The function validates TLS configuration, establishes a connection to the bootstrap server,
// discovers all available brokers, and configures transport layers for optimal performance.
//
// Parameters:
// - ctx: Context for connection establishment and timeouts
// - tls: TLS configuration with paths to CA, certificate, and key files
//
// Returns a configured Kafka client ready for creating readers and writers, or an error
// if TLS setup fails, connection cannot be established, or broker discovery fails.
func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) {
l := log.New(os.Stdout, "kafka: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds)

@@ -171,6 +218,12 @@ func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) {
return k, nil
}

// NewReader creates a new Kafka reader with the client's broker list and TLS configuration.
// The provided config is enhanced with the discovered brokers and configured dialer.
// The reader supports automatic offset management, consumer group coordination, and reconnection.
//
// The caller should configure the reader's Topic, GroupID, and other consumer-specific settings
// in the provided config. The client automatically sets Brokers and Dialer fields.
func (k *Kafka) NewReader(config kafka.ReaderConfig) *kafka.Reader {
config.Brokers = k.brokerAddrs()
config.Dialer = k.dialer
@@ -186,6 +239,16 @@ func (k *Kafka) brokerAddrs() []string {
return addrs
}

// NewWriter creates a new Kafka writer for the specified topic with optimized configuration.
// The writer uses LZ4 compression, least-bytes load balancing, and batching for performance.
//
// Configuration includes:
// - Batch size: 2000 messages for efficient throughput
// - Compression: LZ4 for fast compression with good ratios
// - Balancer: LeastBytes for optimal partition distribution
// - Transport: TLS-configured transport with connection pooling
//
// The writer is ready for immediate use and handles connection management automatically.
func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
// https://pkg.go.dev/github.com/segmentio/kafka-go#Writer
w := &kafka.Writer{
@@ -202,6 +265,12 @@ func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
return w, nil
}

// CheckPartitions verifies that the Kafka connection can read partition metadata.
// This method is useful for health checks and connection validation.
//
// Returns an error if partition metadata cannot be retrieved, which typically
// indicates connection problems, authentication failures, or broker unavailability.
// Logs a warning if no partitions are available but does not return an error.
func (k *Kafka) CheckPartitions() error {
partitions, err := k.conn.ReadPartitions()
if err != nil {
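For reference, a minimal sketch of consuming messages with the NewReader factory documented above. The topic, consumer group, package name, and the go.ntppool.org/common/kafconn import path are assumptions for illustration, not values taken from this diff.

package logstream

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"

	"go.ntppool.org/common/kafconn" // assumed import path
)

func consume(ctx context.Context, k *kafconn.Kafka) error {
	// The client fills in Brokers and Dialer; the caller supplies consumer settings.
	r := k.NewReader(kafka.ReaderConfig{
		Topic:   "logs",        // hypothetical topic
		GroupID: "example-app", // hypothetical consumer group
	})
	defer r.Close()

	for {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			return err // context cancellation or broker error
		}
		log.Printf("offset %d: %s", m.Offset, m.Value)
	}
}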
logger/logger.go
@@ -1,3 +1,25 @@
// Package logger provides structured logging with OpenTelemetry trace integration.
//
// This package offers multiple logging configurations for different deployment scenarios:
// - Text logging to stderr with optional timestamp removal for systemd
// - OTLP (OpenTelemetry Protocol) logging for observability pipelines
// - Multi-logger setup that outputs to both text and OTLP simultaneously
// - Context-aware logging with trace ID correlation
//
// The package automatically detects systemd environments and adjusts timestamp handling
// accordingly. It supports debug level configuration via environment variables and
// provides compatibility bridges for legacy logging interfaces.
//
// Key features:
// - Automatic OpenTelemetry trace and span ID inclusion in log entries
// - Configurable log levels via DEBUG environment variable (with optional prefix)
// - Systemd-compatible output (no timestamps when INVOCATION_ID is present)
// - Thread-safe logger setup with sync.Once protection
// - Context propagation for request-scoped logging
//
// Environment variables:
// - DEBUG: Enable debug level logging (configurable prefix via ConfigPrefix)
// - INVOCATION_ID: Systemd detection for timestamp handling
package logger

import (
@@ -13,6 +35,9 @@ import (
"go.opentelemetry.io/contrib/bridges/otelslog"
)

// ConfigPrefix allows customizing the environment variable prefix for configuration.
// When set, environment variables like DEBUG become {ConfigPrefix}_DEBUG.
// This enables multiple services to have independent logging configuration.
var ConfigPrefix = ""

var (
@@ -67,10 +92,15 @@ func setupOtlpLogger() *slog.Logger {
return otlpLogger
}

// SetupMultiLogger will setup and make default a logger that
// logs as described in Setup() as well as an OLTP logger.
// The "multi logger" is made the default the first time
// this function is called
// SetupMultiLogger creates a logger that outputs to both text (stderr) and OTLP simultaneously.
// This is useful for services that need both human-readable logs and structured observability data.
//
// The multi-logger combines:
// - Text handler: Stderr output with OpenTelemetry trace integration
// - OTLP handler: Structured logs sent via OpenTelemetry Protocol
//
// On first call, this logger becomes the default logger returned by Setup().
// The function is thread-safe and uses sync.Once to ensure single initialization.
func SetupMultiLogger() *slog.Logger {
setupMulti.Do(func() {
textHandler := Setup().Handler()
@@ -89,28 +119,38 @@ func SetupMultiLogger() *slog.Logger {
return multiLogger
}

// SetupOLTP configures and returns a logger sending logs
// via OpenTelemetry (configured via the tracing package).
// SetupOLTP creates a logger that sends structured logs via OpenTelemetry Protocol.
// This logger is designed for observability pipelines and log aggregation systems.
//
// This was made to work with Loki + Grafana that makes it
// hard to view the log attributes in the UI, so the log
// message is formatted similarly to the text logger. The
// attributes are duplicated as OLTP attributes in the
// log messages. https://github.com/grafana/loki/issues/14788
// The OTLP logger formats log messages similarly to the text logger for better
// compatibility with Loki + Grafana, while still providing structured attributes.
// Log attributes are available both in the message format and as OTLP attributes.
//
// This logger does not become the default logger and must be used explicitly.
// It requires OpenTelemetry tracing configuration to be set up via the tracing package.
//
// See: https://github.com/grafana/loki/issues/14788 for formatting rationale.
func SetupOLTP() *slog.Logger {
return setupOtlpLogger()
}

// Setup returns an slog.Logger configured for text formatting
// to stderr.
// OpenTelemetry trace_id and span_id's are logged as attributes
// when available.
// When the application is running under systemd timestamps are
// omitted. On first call the slog default logger is set to this
// logger as well.
// Setup creates and returns the standard text logger for the application.
// This is the primary logging function that most applications should use.
//
// If SetupMultiLogger has been called Setup() will return
// the "multi logger"
// Features:
// - Text formatting to stderr with human-readable output
// - Automatic OpenTelemetry trace_id and span_id inclusion when available
// - Systemd compatibility: omits timestamps when INVOCATION_ID environment variable is present
// - Debug level support via DEBUG environment variable (respects ConfigPrefix)
// - Thread-safe initialization with sync.Once
//
// On first call, this logger becomes the slog default logger. If SetupMultiLogger()
// has been called previously, Setup() returns the multi-logger instead of the text logger.
//
// The logger automatically detects execution context:
// - Systemd: Removes timestamps (systemd adds its own)
// - Debug mode: Enables debug level logging based on environment variables
// - OpenTelemetry: Includes trace correlation when tracing is active
func Setup() *slog.Logger {
setupText.Do(func() {
h := setupStdErrHandler()
@@ -129,15 +169,33 @@ func Setup() *slog.Logger {

type loggerKey struct{}

// NewContext adds the logger to the context. Use this
// to for example make a request specific logger available
// to other functions through the context
// NewContext stores a logger in the context for request-scoped logging.
// This enables passing request-specific loggers (e.g., with request IDs,
// user context, or other correlation data) through the call stack.
//
// Use this to create context-aware logging where different parts of the
// application can access the same enriched logger instance.
//
// Example:
//
// log := slog.With("request_id", requestID)
// ctx = logger.NewContext(ctx, log)
// // Pass ctx to downstream functions
func NewContext(ctx context.Context, l *slog.Logger) context.Context {
return context.WithValue(ctx, loggerKey{}, l)
}

// FromContext retrieves a logger from the context. If there is none,
// it returns the default logger
// FromContext retrieves a logger from the context.
// If no logger is stored in the context, it returns the default logger from Setup().
//
// This function provides a safe way to access context-scoped loggers without
// needing to check for nil values. It ensures that logging is always available,
// falling back to the application's standard logger configuration.
//
// Example:
//
// log := logger.FromContext(ctx)
// log.Info("processing request") // Uses context logger or default
func FromContext(ctx context.Context) *slog.Logger {
if l, ok := ctx.Value(loggerKey{}).(*slog.Logger); ok {
return l
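A short sketch of how Setup, ConfigPrefix, NewContext, and FromContext fit together, following the documentation above. The import path go.ntppool.org/common/logger, the prefix, and the attribute values are assumptions for illustration.

package main

import (
	"context"
	"log/slog"

	"go.ntppool.org/common/logger" // assumed import path
)

func main() {
	logger.ConfigPrefix = "MYAPP" // debug level is then read from MYAPP_DEBUG

	log := logger.Setup() // text logger to stderr; becomes the slog default
	ctx := logger.NewContext(context.Background(), log.With("component", "startup"))

	run(ctx)
}

func run(ctx context.Context) {
	// Falls back to the Setup() logger if the context carries none.
	log := logger.FromContext(ctx)
	log.Info("service starting", slog.String("mode", "devel"))
}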
@@ -5,12 +5,24 @@ import (
"log/slog"
)

// stdLoggerish provides a bridge between legacy log interfaces and slog.
// It implements common logging methods (Println, Printf, Fatalf) that
// delegate to structured logging with a consistent key prefix.
type stdLoggerish struct {
key string
log *slog.Logger
f func(string, ...any)
key string // Prefix key for all log messages
log *slog.Logger // Underlying structured logger
f func(string, ...any) // Log function (Info or Debug level)
}

// NewStdLog creates a legacy-compatible logger that bridges to structured logging.
// This is useful for third-party libraries that expect a standard log.Logger interface.
//
// Parameters:
// - key: Prefix added to all log messages for identification
// - debug: If true, logs at debug level; otherwise logs at info level
// - log: Underlying slog.Logger (uses Setup() if nil)
//
// The returned logger implements Println, Printf, and Fatalf methods.
func NewStdLog(key string, debug bool, log *slog.Logger) *stdLoggerish {
if log == nil {
log = Setup()
@@ -27,14 +39,18 @@ func NewStdLog(key string, debug bool, log *slog.Logger) *stdLoggerish {
return sl
}

// Println logs the arguments using the configured log level with the instance key.
func (l stdLoggerish) Println(msg ...any) {
l.f(l.key, "msg", msg)
}

// Printf logs a formatted message using the configured log level with the instance key.
func (l stdLoggerish) Printf(msg string, args ...any) {
l.f(l.key, "msg", fmt.Sprintf(msg, args...))
}

// Fatalf logs a formatted error message and panics.
// Note: This implementation panics instead of calling os.Exit for testability.
func (l stdLoggerish) Fatalf(msg string, args ...any) {
l.log.Error(l.key, "msg", fmt.Sprintf(msg, args...))
panic("fatal error") // todo: does this make sense at all?
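A minimal sketch of passing the NewStdLog bridge to a dependency that expects a Printf-style logger, per the documentation above. The dependency interface, package layout, and import path are illustrative assumptions.

package main

import "go.ntppool.org/common/logger" // assumed import path

// printfLogger is the kind of minimal interface many third-party libraries accept.
type printfLogger interface {
	Printf(format string, args ...any)
}

func startWorker(l printfLogger) {
	l.Printf("worker started with %d threads", 4)
}

func main() {
	// The key "worker" prefixes every message; debug=false logs at info level,
	// and a nil logger falls back to logger.Setup().
	bridge := logger.NewStdLog("worker", false, nil)
	startWorker(bridge)
}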
@@ -1,17 +0,0 @@
package logger

type Error struct {
Msg string
Data []any
}

func NewError(msg string, data ...any) *Error {
return &Error{
Msg: msg,
Data: data,
}
}

func (e *Error) Error() string {
return "not implemented"
}
@@ -15,7 +15,7 @@ mkdir -p $DIR

BASE=https://geodns.bitnames.com/${BASE}/builds/${BUILD}

files=`curl -sSf ${BASE}/checksums.txt | awk '{print $2}'`
files=`curl -sSf ${BASE}/checksums.txt | sed 's/^[a-f0-9]*[[:space:]]*//'`
metafiles="checksums.txt metadata.json CHANGELOG.md artifacts.json"

for f in $metafiles; do
@@ -2,7 +2,7 @@

set -euo pipefail

go install github.com/goreleaser/goreleaser/v2@v2.8.2
go install github.com/goreleaser/goreleaser/v2@v2.11.0

if [ ! -z "${harbor_username:-}" ]; then
DOCKER_FILE=~/.docker/config.json
@@ -1,3 +1,36 @@
// Package tracing provides OpenTelemetry distributed tracing setup with OTLP export support.
//
// This package handles the complete OpenTelemetry SDK initialization including:
// - Trace provider configuration with batching and resource detection
// - Log provider setup for structured log export via OTLP
// - Automatic resource discovery (service name, version, host, container, process info)
// - Support for both gRPC and HTTP OTLP exporters with TLS configuration
// - Propagation context setup for distributed tracing across services
// - Graceful shutdown handling for all telemetry components
//
// The package supports various deployment scenarios:
// - Development: Local OTLP collectors or observability backends
// - Production: Secure OTLP export with mutual TLS authentication
// - Container environments: Automatic container and Kubernetes resource detection
//
// Configuration is primarily handled via standard OpenTelemetry environment variables:
// - OTEL_SERVICE_NAME: Service identification
// - OTEL_EXPORTER_OTLP_PROTOCOL: Protocol selection (grpc, http/protobuf)
// - OTEL_TRACES_EXPORTER: Exporter type (otlp, autoexport)
// - OTEL_RESOURCE_ATTRIBUTES: Additional resource attributes
//
// Example usage:
//
// cfg := &tracing.TracerConfig{
// ServiceName: "my-service",
// Environment: "production",
// Endpoint: "https://otlp.example.com:4317",
// }
// shutdown, err := tracing.InitTracer(ctx, cfg)
// if err != nil {
// log.Fatal(err)
// }
// defer shutdown(ctx)
package tracing

// todo, review:
@@ -43,34 +76,68 @@ var errInvalidOTLPProtocol = errors.New("invalid OTLP protocol - should be one o

// https://github.com/open-telemetry/opentelemetry-go/blob/main/exporters/otlp/otlptrace/otlptracehttp/example_test.go

// TpShutdownFunc represents a function that gracefully shuts down telemetry providers.
// It should be called during application shutdown to ensure all telemetry data is flushed
// and exporters are properly closed. The context can be used to set shutdown timeouts.
type TpShutdownFunc func(ctx context.Context) error

// Tracer returns the configured OpenTelemetry tracer for the NTP Pool project.
// This tracer should be used for creating spans and distributed tracing throughout
// the application. It uses the global tracer provider set up by InitTracer/SetupSDK.
func Tracer() trace.Tracer {
traceProvider := otel.GetTracerProvider()
return traceProvider.Tracer("ntppool-tracer")
}

// Start creates a new span with the given name and options using the configured tracer.
// This is a convenience function that wraps the standard OpenTelemetry span creation.
// It returns a new context containing the span and the span itself for further configuration.
//
// The returned context should be used for downstream operations to maintain trace correlation.
func Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return Tracer().Start(ctx, spanName, opts...)
}

// GetClientCertificate defines a function type for providing client certificates for mutual TLS.
// This is used when exporting telemetry data to secured OTLP endpoints that require
// client certificate authentication.
type GetClientCertificate func(*tls.CertificateRequestInfo) (*tls.Certificate, error)

// TracerConfig provides configuration options for OpenTelemetry tracing setup.
// It supplements standard OpenTelemetry environment variables with additional
// NTP Pool-specific configuration including TLS settings for secure OTLP export.
type TracerConfig struct {
ServiceName string
Environment string
Endpoint string
EndpointURL string
ServiceName string // Service name for resource identification (overrides OTEL_SERVICE_NAME)
Environment string // Deployment environment (development, staging, production)
Endpoint string // OTLP endpoint hostname/port (e.g., "otlp.example.com:4317")
EndpointURL string // Complete OTLP endpoint URL (e.g., "https://otlp.example.com:4317/v1/traces")

CertificateProvider GetClientCertificate
RootCAs *x509.CertPool
CertificateProvider GetClientCertificate // Client certificate provider for mutual TLS
RootCAs *x509.CertPool // CA certificate pool for server verification
}

// InitTracer initializes the OpenTelemetry SDK with the provided configuration.
// This is the main entry point for setting up distributed tracing in applications.
//
// The function configures trace and log providers, sets up OTLP exporters,
// and returns a shutdown function that must be called during application termination.
//
// Returns a shutdown function and an error. The shutdown function should be called
// with a context that has an appropriate timeout for graceful shutdown.
func InitTracer(ctx context.Context, cfg *TracerConfig) (TpShutdownFunc, error) {
// todo: setup environment from cfg
return SetupSDK(ctx, cfg)
}

// SetupSDK performs the complete OpenTelemetry SDK initialization including resource
// discovery, exporter configuration, provider setup, and shutdown function creation.
//
// The function automatically discovers system resources (service info, host, container,
// process details) and configures both trace and log exporters. It supports multiple
// OTLP protocols (gRPC, HTTP) and handles TLS configuration for secure deployments.
//
// The returned shutdown function coordinates graceful shutdown of all telemetry
// components in the reverse order of their initialization.
func SetupSDK(ctx context.Context, cfg *TracerConfig) (shutdown TpShutdownFunc, err error) {
if cfg == nil {
cfg = &TracerConfig{}
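A brief sketch of span creation with the Start helper documented above, assuming the SDK was initialized with InitTracer as in the package example. The package name, function names, attribute key, and the go.ntppool.org/common/tracing import path are illustrative assumptions.

package scorer

import (
	"context"

	"go.opentelemetry.io/otel/attribute"

	"go.ntppool.org/common/tracing" // assumed import path
)

func scoreServer(ctx context.Context, host string) error {
	// Start returns a derived context so child spans nest under this one.
	ctx, span := tracing.Start(ctx, "scoreServer")
	defer span.End()

	span.SetAttributes(attribute.String("ntp.host", host))

	return probe(ctx, host)
}

func probe(ctx context.Context, host string) error {
	_, span := tracing.Start(ctx, "probe")
	defer span.End()

	_ = host // placeholder for the actual NTP query
	return nil
}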
@@ -1,3 +1,17 @@
// Package types provides shared data structures for the NTP Pool project.
//
// This package contains common types used across different NTP Pool services
// for data exchange, logging, and database operations. The types are designed
// to support JSON serialization for API responses and SQL database storage
// with automatic marshaling/unmarshaling.
//
// Current types include:
// - LogScoreAttributes: NTP server scoring metadata for monitoring and analysis
//
// All types implement appropriate interfaces for:
// - JSON serialization (json.Marshaler/json.Unmarshaler)
// - SQL database storage (database/sql/driver.Valuer/sql.Scanner)
// - String representation for logging and debugging
package types

import (
@@ -6,17 +20,26 @@ import (
"errors"
)

// LogScoreAttributes contains metadata about NTP server scoring and monitoring results.
// This structure captures both NTP protocol-specific information (leap, stratum) and
// operational data (errors, warnings, response status) for analysis and alerting.
//
// The type supports JSON serialization for API responses and database storage
// via the database/sql/driver interfaces. Fields use omitempty tags to minimize
// JSON payload size when values are at their zero state.
type LogScoreAttributes struct {
Leap int8 `json:"leap,omitempty"`
Stratum int8 `json:"stratum,omitempty"`
NoResponse bool `json:"no_response,omitempty"`
Error string `json:"error,omitempty"`
Warning string `json:"warning,omitempty"`
Leap int8 `json:"leap,omitempty"` // NTP leap indicator (0=no warning, 1=+1s, 2=-1s, 3=unsynchronized)
Stratum int8 `json:"stratum,omitempty"` // NTP stratum level (1=primary, 2-15=secondary, 16=unsynchronized)
NoResponse bool `json:"no_response,omitempty"` // True if server failed to respond to NTP queries
Error string `json:"error,omitempty"` // Error message if scoring failed
Warning string `json:"warning,omitempty"` // Warning message for non-fatal issues

FromLSID int `json:"from_ls_id,omitempty"`
FromSSID int `json:"from_ss_id,omitempty"`
FromLSID int `json:"from_ls_id,omitempty"` // Source log server ID for traceability
FromSSID int `json:"from_ss_id,omitempty"` // Source scoring system ID for traceability
}

// String returns a JSON representation of the LogScoreAttributes for logging and debugging.
// Returns an empty string if JSON marshaling fails.
func (lsa *LogScoreAttributes) String() string {
b, err := json.Marshal(lsa)
if err != nil {
@@ -25,10 +48,17 @@ func (lsa *LogScoreAttributes) String() string {
return string(b)
}

// Value implements the database/sql/driver.Valuer interface for database storage.
// It serializes the LogScoreAttributes to JSON for storage in SQL databases.
// Returns the JSON bytes or an error if marshaling fails.
func (lsa *LogScoreAttributes) Value() (driver.Value, error) {
return json.Marshal(lsa)
}

// Scan implements the database/sql.Scanner interface for reading from SQL databases.
// It deserializes JSON data from the database back into LogScoreAttributes.
// Supports both []byte and string input types, with nil values treated as no-op.
// Returns an error if the input type is unsupported or JSON unmarshaling fails.
func (lsa *LogScoreAttributes) Scan(value any) error {
var source []byte
_t := LogScoreAttributes{}
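A minimal sketch of round-tripping LogScoreAttributes through database/sql using the Valuer and Scanner implementations documented above. The table, column names, package name, and the go.ntppool.org/common/types import path are placeholders, not values from this repository.

package scoredb

import (
	"database/sql"

	"go.ntppool.org/common/types" // assumed import path
)

func saveAndLoad(db *sql.DB, id int64) (*types.LogScoreAttributes, error) {
	attrs := &types.LogScoreAttributes{
		Stratum: 2,
		Warning: "high offset", // illustrative value
	}

	// Value() serializes the struct to JSON before the driver stores it.
	if _, err := db.Exec(`UPDATE log_scores SET attributes = ? WHERE id = ?`, attrs, id); err != nil {
		return nil, err
	}

	// Scan() parses the stored JSON back into the struct.
	out := &types.LogScoreAttributes{}
	err := db.QueryRow(`SELECT attributes FROM log_scores WHERE id = ?`, id).Scan(out)
	return out, err
}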
@@ -1,3 +1,27 @@
// Package fastlyxff provides Fastly CDN IP range management for trusted proxy handling.
//
// This package parses Fastly's public IP ranges JSON file and generates Echo framework
// trust options for proper client IP extraction from X-Forwarded-For headers.
// It's designed specifically for services deployed behind Fastly's CDN that need
// to identify real client IPs for logging, rate limiting, and security purposes.
//
// Fastly publishes their edge server IP ranges in a JSON format that this package
// consumes to automatically configure trusted proxy ranges. This ensures that
// X-Forwarded-For headers are only trusted when they originate from legitimate
// Fastly edge servers.
//
// Key features:
// - Automatic parsing of Fastly's IP ranges JSON format
// - Support for both IPv4 and IPv6 address ranges
// - Echo framework integration via TrustOption generation
// - CIDR notation parsing and validation
//
// The JSON file typically contains IP ranges in this format:
//
// {
// "addresses": ["23.235.32.0/20", "43.249.72.0/22", ...],
// "ipv6_addresses": ["2a04:4e40::/32", "2a04:4e42::/32", ...]
// }
package fastlyxff

import (
@@ -9,15 +33,29 @@ import (
"github.com/labstack/echo/v4"
)

// FastlyXFF represents Fastly's published IP ranges for their CDN edge servers.
// This structure matches the JSON format provided by Fastly for their public IP ranges.
// It contains separate lists for IPv4 and IPv6 CIDR ranges.
type FastlyXFF struct {
IPv4 []string `json:"addresses"`
IPv6 []string `json:"ipv6_addresses"`
IPv4 []string `json:"addresses"` // IPv4 CIDR ranges (e.g., "23.235.32.0/20")
IPv6 []string `json:"ipv6_addresses"` // IPv6 CIDR ranges (e.g., "2a04:4e40::/32")
}

// TrustedNets holds parsed network prefixes for efficient IP range checking.
// This type is currently unused but reserved for future optimizations
// where frequent IP range lookups might benefit from pre-parsed prefixes.
type TrustedNets struct {
prefixes []netip.Prefix
prefixes []netip.Prefix // Parsed network prefixes for efficient lookups
}

// New loads and parses Fastly IP ranges from a JSON file.
// The file should contain Fastly's published IP ranges in their standard JSON format.
//
// Parameters:
// - fileName: Path to the Fastly IP ranges JSON file
//
// Returns the parsed FastlyXFF structure or an error if the file cannot be
// read or the JSON format is invalid.
func New(fileName string) (*FastlyXFF, error) {
b, err := os.ReadFile(fileName)
if err != nil {
@@ -34,6 +72,19 @@ func New(fileName string) (*FastlyXFF, error) {
return &d, nil
}

// EchoTrustOption converts Fastly IP ranges into Echo framework trust options.
// This method generates trust configurations that tell Echo to accept X-Forwarded-For
// headers only from Fastly's edge servers, ensuring accurate client IP extraction.
//
// The generated trust options should be used with Echo's IP extractor:
//
// options, err := fastlyRanges.EchoTrustOption()
// if err != nil {
// return err
// }
// e.IPExtractor = echo.ExtractIPFromXFFHeader(options...)
//
// Returns a slice of Echo trust options or an error if any CIDR range cannot be parsed.
func (xff *FastlyXFF) EchoTrustOption() ([]echo.TrustOption, error) {
ranges := []echo.TrustOption{}
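A sketch of wiring the Fastly ranges into an Echo server, following the EchoTrustOption documentation above. The JSON file path, listen address, and the go.ntppool.org/common/fastlyxff import path are assumptions for illustration.

package main

import (
	"log"

	"github.com/labstack/echo/v4"

	"go.ntppool.org/common/fastlyxff" // assumed import path
)

func main() {
	ranges, err := fastlyxff.New("fastly-ips.json") // placeholder path
	if err != nil {
		log.Fatal(err)
	}

	options, err := ranges.EchoTrustOption()
	if err != nil {
		log.Fatal(err)
	}

	e := echo.New()
	// X-Forwarded-For is only trusted when the hop is a Fastly edge address.
	e.IPExtractor = echo.ExtractIPFromXFFHeader(options...)

	e.Logger.Fatal(e.Start(":8080"))
}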