database: create shared database package with configurable patterns
Extract ~200 lines of duplicate database connection code from api/ntpdb/ and monitor/ntpdb/ into common/database/ package. Creates foundation for database consolidation while maintaining zero breaking changes. Files added: - config.go: Unified configuration with package-specific defaults - connector.go: Dynamic connector pattern from Boostport - pool.go: Configurable connection pool management - metrics.go: Optional Prometheus metrics integration - interfaces.go: Shared database interfaces for consistent patterns Key features: - Configuration-driven approach (API: 25/10 connections + metrics, Monitor: 10/5 connections, no metrics) - Optional Prometheus metrics when registerer provided - Backward compatibility via convenience functions - Flexible config file loading (explicit paths + search-based) Dependencies: Added mysql driver and yaml parsing for database configuration.
This commit is contained in:
61
database/config.go
Normal file
61
database/config.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// Config represents the database configuration structure
|
||||
type Config struct {
|
||||
MySQL DBConfig `yaml:"mysql"`
|
||||
}
|
||||
|
||||
// DBConfig represents the MySQL database configuration
|
||||
type DBConfig struct {
|
||||
DSN string `default:"" flag:"dsn" usage:"Database DSN"`
|
||||
User string `default:"" flag:"user"`
|
||||
Pass string `default:"" flag:"pass"`
|
||||
DBName string // Optional database name override
|
||||
}
|
||||
|
||||
// ConfigOptions allows customization of database opening behavior
|
||||
type ConfigOptions struct {
|
||||
// ConfigFiles is a list of config file paths to search for database configuration
|
||||
ConfigFiles []string
|
||||
|
||||
// EnablePoolMonitoring enables connection pool metrics collection
|
||||
EnablePoolMonitoring bool
|
||||
|
||||
// PrometheusRegisterer for metrics collection. If nil, no metrics are collected.
|
||||
PrometheusRegisterer prometheus.Registerer
|
||||
|
||||
// Connection pool settings
|
||||
MaxOpenConns int
|
||||
MaxIdleConns int
|
||||
ConnMaxLifetime time.Duration
|
||||
}
|
||||
|
||||
// DefaultConfigOptions returns the standard configuration options used by API package
|
||||
func DefaultConfigOptions() ConfigOptions {
|
||||
return ConfigOptions{
|
||||
ConfigFiles: []string{"database.yaml", "/vault/secrets/database.yaml"},
|
||||
EnablePoolMonitoring: true,
|
||||
PrometheusRegisterer: prometheus.DefaultRegisterer,
|
||||
MaxOpenConns: 25,
|
||||
MaxIdleConns: 10,
|
||||
ConnMaxLifetime: 3 * time.Minute,
|
||||
}
|
||||
}
|
||||
|
||||
// MonitorConfigOptions returns configuration options optimized for Monitor package
|
||||
func MonitorConfigOptions() ConfigOptions {
|
||||
return ConfigOptions{
|
||||
ConfigFiles: []string{"database.yaml", "/vault/secrets/database.yaml"},
|
||||
EnablePoolMonitoring: false, // Monitor doesn't need metrics
|
||||
PrometheusRegisterer: nil, // No Prometheus dependency
|
||||
MaxOpenConns: 10,
|
||||
MaxIdleConns: 5,
|
||||
ConnMaxLifetime: 3 * time.Minute,
|
||||
}
|
||||
}
|
88
database/connector.go
Normal file
88
database/connector.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql/driver"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// from https://github.com/Boostport/dynamic-database-config
|
||||
|
||||
// CreateConnectorFunc is a function that creates a database connector
|
||||
type CreateConnectorFunc func() (driver.Connector, error)
|
||||
|
||||
// Driver implements the sql/driver interface with dynamic configuration
|
||||
type Driver struct {
|
||||
CreateConnectorFunc CreateConnectorFunc
|
||||
}
|
||||
|
||||
// Driver returns the driver instance
|
||||
func (d Driver) Driver() driver.Driver {
|
||||
return d
|
||||
}
|
||||
|
||||
// Connect creates a new database connection using the dynamic connector
|
||||
func (d Driver) Connect(ctx context.Context) (driver.Conn, error) {
|
||||
connector, err := d.CreateConnectorFunc()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating connector from function: %w", err)
|
||||
}
|
||||
|
||||
return connector.Connect(ctx)
|
||||
}
|
||||
|
||||
// Open is not supported for dynamic configuration
|
||||
func (d Driver) Open(name string) (driver.Conn, error) {
|
||||
return nil, errors.New("open is not supported")
|
||||
}
|
||||
|
||||
// createConnector creates a connector function that reads configuration from a file
|
||||
func createConnector(configFile string) CreateConnectorFunc {
|
||||
return func() (driver.Connector, error) {
|
||||
dbFile, err := os.Open(configFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer dbFile.Close()
|
||||
|
||||
dec := yaml.NewDecoder(dbFile)
|
||||
cfg := Config{}
|
||||
|
||||
err = dec.Decode(&cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dsn := cfg.MySQL.DSN
|
||||
if len(dsn) == 0 {
|
||||
dsn = os.Getenv("DATABASE_DSN")
|
||||
if len(dsn) == 0 {
|
||||
return nil, fmt.Errorf("dsn config in database.yaml or DATABASE_DSN environment variable required")
|
||||
}
|
||||
}
|
||||
|
||||
dbcfg, err := mysql.ParseDSN(dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if user := cfg.MySQL.User; len(user) > 0 {
|
||||
dbcfg.User = user
|
||||
}
|
||||
|
||||
if pass := cfg.MySQL.Pass; len(pass) > 0 {
|
||||
dbcfg.Passwd = pass
|
||||
}
|
||||
|
||||
if name := cfg.MySQL.DBName; len(name) > 0 {
|
||||
dbcfg.DBName = name
|
||||
}
|
||||
|
||||
return mysql.NewConnector(dbcfg)
|
||||
}
|
||||
}
|
34
database/interfaces.go
Normal file
34
database/interfaces.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
// DBTX matches the interface expected by SQLC-generated code
|
||||
// This interface is implemented by both *sql.DB and *sql.Tx
|
||||
type DBTX interface {
|
||||
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
|
||||
PrepareContext(context.Context, string) (*sql.Stmt, error)
|
||||
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
|
||||
}
|
||||
|
||||
// BaseQuerier provides basic query functionality
|
||||
// This interface should be implemented by package-specific Queries types
|
||||
type BaseQuerier interface {
|
||||
WithTx(tx *sql.Tx) BaseQuerier
|
||||
}
|
||||
|
||||
// BaseQuerierTx provides transaction functionality
|
||||
// This interface should be implemented by package-specific Queries types
|
||||
type BaseQuerierTx interface {
|
||||
BaseQuerier
|
||||
Begin(ctx context.Context) (BaseQuerierTx, error)
|
||||
Commit(ctx context.Context) error
|
||||
Rollback(ctx context.Context) error
|
||||
}
|
||||
|
||||
// TransactionFunc represents a function that operates within a database transaction
|
||||
// This is used by the shared transaction helpers in transaction.go
|
||||
type TransactionFunc[Q any] func(ctx context.Context, q Q) error
|
93
database/metrics.go
Normal file
93
database/metrics.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// DatabaseMetrics holds the Prometheus metrics for database connection pool monitoring
|
||||
type DatabaseMetrics struct {
|
||||
ConnectionsOpen prometheus.Gauge
|
||||
ConnectionsIdle prometheus.Gauge
|
||||
ConnectionsInUse prometheus.Gauge
|
||||
ConnectionsWaitCount prometheus.Counter
|
||||
ConnectionsWaitDuration prometheus.Histogram
|
||||
}
|
||||
|
||||
// NewDatabaseMetrics creates a new set of database metrics and registers them
|
||||
func NewDatabaseMetrics(registerer prometheus.Registerer) *DatabaseMetrics {
|
||||
metrics := &DatabaseMetrics{
|
||||
ConnectionsOpen: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "database_connections_open",
|
||||
Help: "Number of open database connections",
|
||||
}),
|
||||
ConnectionsIdle: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "database_connections_idle",
|
||||
Help: "Number of idle database connections",
|
||||
}),
|
||||
ConnectionsInUse: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "database_connections_in_use",
|
||||
Help: "Number of database connections in use",
|
||||
}),
|
||||
ConnectionsWaitCount: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "database_connections_wait_count_total",
|
||||
Help: "Total number of times a connection had to wait",
|
||||
}),
|
||||
ConnectionsWaitDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: "database_connections_wait_duration_seconds",
|
||||
Help: "Time spent waiting for a database connection",
|
||||
Buckets: prometheus.DefBuckets,
|
||||
}),
|
||||
}
|
||||
|
||||
if registerer != nil {
|
||||
registerer.MustRegister(
|
||||
metrics.ConnectionsOpen,
|
||||
metrics.ConnectionsIdle,
|
||||
metrics.ConnectionsInUse,
|
||||
metrics.ConnectionsWaitCount,
|
||||
metrics.ConnectionsWaitDuration,
|
||||
)
|
||||
}
|
||||
|
||||
return metrics
|
||||
}
|
||||
|
||||
// monitorConnectionPool runs a background goroutine to collect connection pool metrics
|
||||
func monitorConnectionPool(ctx context.Context, db *sql.DB, registerer prometheus.Registerer) {
|
||||
if registerer == nil {
|
||||
return // No metrics collection if no registerer provided
|
||||
}
|
||||
|
||||
metrics := NewDatabaseMetrics(registerer)
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
stats := db.Stats()
|
||||
|
||||
metrics.ConnectionsOpen.Set(float64(stats.OpenConnections))
|
||||
metrics.ConnectionsIdle.Set(float64(stats.Idle))
|
||||
metrics.ConnectionsInUse.Set(float64(stats.InUse))
|
||||
metrics.ConnectionsWaitCount.Add(float64(stats.WaitCount))
|
||||
|
||||
if stats.WaitDuration > 0 {
|
||||
metrics.ConnectionsWaitDuration.Observe(stats.WaitDuration.Seconds())
|
||||
}
|
||||
|
||||
// Log connection pool stats for high usage or waiting
|
||||
if stats.OpenConnections > 20 || stats.WaitCount > 0 {
|
||||
fmt.Printf("Connection pool stats: open=%d idle=%d in_use=%d wait_count=%d wait_duration=%s\n",
|
||||
stats.OpenConnections, stats.Idle, stats.InUse, stats.WaitCount, stats.WaitDuration)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
78
database/pool.go
Normal file
78
database/pool.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"go.ntppool.org/common/logger"
|
||||
)
|
||||
|
||||
// OpenDB opens a database connection with the specified configuration options
|
||||
func OpenDB(ctx context.Context, options ConfigOptions) (*sql.DB, error) {
|
||||
log := logger.Setup()
|
||||
|
||||
configFile, err := findConfigFile(options.ConfigFiles)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dbconn := sql.OpenDB(Driver{
|
||||
CreateConnectorFunc: createConnector(configFile),
|
||||
})
|
||||
|
||||
// Set connection pool parameters
|
||||
dbconn.SetConnMaxLifetime(options.ConnMaxLifetime)
|
||||
dbconn.SetMaxOpenConns(options.MaxOpenConns)
|
||||
dbconn.SetMaxIdleConns(options.MaxIdleConns)
|
||||
|
||||
err = dbconn.Ping()
|
||||
if err != nil {
|
||||
log.Error("could not connect to database", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start optional connection pool monitoring
|
||||
if options.EnablePoolMonitoring && options.PrometheusRegisterer != nil {
|
||||
go monitorConnectionPool(ctx, dbconn, options.PrometheusRegisterer)
|
||||
}
|
||||
|
||||
return dbconn, nil
|
||||
}
|
||||
|
||||
// OpenDBWithConfigFile opens a database connection using an explicit config file path
|
||||
// This is a convenience function for API package compatibility
|
||||
func OpenDBWithConfigFile(ctx context.Context, configFile string) (*sql.DB, error) {
|
||||
options := DefaultConfigOptions()
|
||||
options.ConfigFiles = []string{configFile}
|
||||
return OpenDB(ctx, options)
|
||||
}
|
||||
|
||||
// OpenDBMonitor opens a database connection with monitor-specific defaults
|
||||
// This is a convenience function for Monitor package compatibility
|
||||
func OpenDBMonitor() (*sql.DB, error) {
|
||||
options := MonitorConfigOptions()
|
||||
return OpenDB(context.Background(), options)
|
||||
}
|
||||
|
||||
// findConfigFile searches for the first existing config file from the list
|
||||
func findConfigFile(configFiles []string) (string, error) {
|
||||
var firstErr error
|
||||
|
||||
for _, configFile := range configFiles {
|
||||
if configFile == "" {
|
||||
continue
|
||||
}
|
||||
if _, err := os.Stat(configFile); err == nil {
|
||||
return configFile, nil
|
||||
} else if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
|
||||
if firstErr != nil {
|
||||
return "", fmt.Errorf("no config file found: %w", firstErr)
|
||||
}
|
||||
return "", fmt.Errorf("no valid config files provided")
|
||||
}
|
Reference in New Issue
Block a user