fix(database): correct metrics and improve error handling

- Fix metrics double-counting: track deltas for WaitCount/WaitDuration
  instead of adding cumulative values each tick
- Replace fmt.Printf with structured logging in pool monitor
- Add PoolOptions validation (MaxConns > 0, MinConns >= 0)
- Warn when DATABASE_URI overrides non-default PoolOptions
- Improve findAndParseConfig to report all tried files and errors
- Remove dead code in pgdb/config.go (unreachable host default)
- Fix errcheck lint issues for file.Close() calls
- Add context parameter to OpenDBMonitor() (breaking change)
This commit is contained in:
2025-11-29 12:56:49 -08:00
parent 283d3936f6
commit 94b718a925
6 changed files with 70 additions and 34 deletions

View File

@@ -50,7 +50,7 @@ func createConnector(configFile string) CreateConnectorFunc {
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer dbFile.Close() defer func() { _ = dbFile.Close() }()
dec := yaml.NewDecoder(dbFile) dec := yaml.NewDecoder(dbFile)
cfg := Config{} cfg := Config{}

View File

@@ -7,9 +7,7 @@ import (
) )
// Mock types for testing SQLC integration patterns // Mock types for testing SQLC integration patterns
type mockQueries struct { type mockQueries struct{}
db DBTX
}
type mockQueriesTx struct { type mockQueriesTx struct {
*mockQueries *mockQueries
@@ -58,7 +56,7 @@ func TestSQLCIntegration(t *testing.T) {
// Verify our DB interface is compatible with what SQLC expects // Verify our DB interface is compatible with what SQLC expects
var dbInterface DB[*mockQueriesTx] var dbInterface DB[*mockQueriesTx]
var mockDB *mockQueries = &mockQueries{} mockDB := &mockQueries{}
dbInterface = mockDB dbInterface = mockDB
// Test that our transaction helper can work with this pattern // Test that our transaction helper can work with this pattern

View File

@@ -3,10 +3,10 @@ package database
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"go.ntppool.org/common/logger"
) )
// DatabaseMetrics holds the Prometheus metrics for database connection pool monitoring // DatabaseMetrics holds the Prometheus metrics for database connection pool monitoring
@@ -16,6 +16,10 @@ type DatabaseMetrics struct {
ConnectionsInUse prometheus.Gauge ConnectionsInUse prometheus.Gauge
ConnectionsWaitCount prometheus.Counter ConnectionsWaitCount prometheus.Counter
ConnectionsWaitDuration prometheus.Histogram ConnectionsWaitDuration prometheus.Histogram
// Track last values for delta calculation (cumulative stats from sql.DBStats)
lastWaitCount int64
lastWaitDuration time.Duration
} }
// NewDatabaseMetrics creates a new set of database metrics and registers them // NewDatabaseMetrics creates a new set of database metrics and registers them
@@ -67,26 +71,44 @@ func monitorConnectionPool(ctx context.Context, db *sql.DB, registerer prometheu
ticker := time.NewTicker(30 * time.Second) ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop() defer ticker.Stop()
log := logger.FromContext(ctx)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.InfoContext(ctx, "database connection pool monitor stopped")
return return
case <-ticker.C: case <-ticker.C:
stats := db.Stats() stats := db.Stats()
// Update gauge metrics (current state)
metrics.ConnectionsOpen.Set(float64(stats.OpenConnections)) metrics.ConnectionsOpen.Set(float64(stats.OpenConnections))
metrics.ConnectionsIdle.Set(float64(stats.Idle)) metrics.ConnectionsIdle.Set(float64(stats.Idle))
metrics.ConnectionsInUse.Set(float64(stats.InUse)) metrics.ConnectionsInUse.Set(float64(stats.InUse))
metrics.ConnectionsWaitCount.Add(float64(stats.WaitCount))
if stats.WaitDuration > 0 { // Update counter with delta (WaitCount is cumulative in sql.DBStats)
metrics.ConnectionsWaitDuration.Observe(stats.WaitDuration.Seconds()) waitCountDelta := stats.WaitCount - metrics.lastWaitCount
if waitCountDelta > 0 {
metrics.ConnectionsWaitCount.Add(float64(waitCountDelta))
metrics.lastWaitCount = stats.WaitCount
}
// Update histogram with delta (WaitDuration is cumulative in sql.DBStats)
waitDurationDelta := stats.WaitDuration - metrics.lastWaitDuration
if waitDurationDelta > 0 {
metrics.ConnectionsWaitDuration.Observe(waitDurationDelta.Seconds())
metrics.lastWaitDuration = stats.WaitDuration
} }
// Log connection pool stats for high usage or waiting // Log connection pool stats for high usage or waiting
if stats.OpenConnections > 20 || stats.WaitCount > 0 { if stats.OpenConnections > 20 || waitCountDelta > 0 {
fmt.Printf("Connection pool stats: open=%d idle=%d in_use=%d wait_count=%d wait_duration=%s\n", log.WarnContext(ctx, "high database connection usage",
stats.OpenConnections, stats.Idle, stats.InUse, stats.WaitCount, stats.WaitDuration) "open", stats.OpenConnections,
"idle", stats.Idle,
"in_use", stats.InUse,
"wait_count", stats.WaitCount,
"wait_duration", stats.WaitDuration,
)
} }
} }
} }

View File

@@ -27,15 +27,10 @@ func CreatePoolConfig(cfg *database.PostgresConfig) (*pgxpool.Config, error) {
"require": true, "verify-ca": true, "verify-full": true, "require": true, "verify-ca": true, "verify-full": true,
} }
if cfg.SSLMode != "" && !validSSLModes[cfg.SSLMode] { if cfg.SSLMode != "" && !validSSLModes[cfg.SSLMode] {
return nil, fmt.Errorf("postgres: invalid sslmode: %s", cfg.SSLMode) return nil, fmt.Errorf("postgres: invalid sslmode: %s (valid: disable, allow, prefer, require, verify-ca, verify-full)", cfg.SSLMode)
}
// Set defaults
host := cfg.Host
if host == "" {
host = "localhost"
} }
// Apply defaults for optional fields (host is validated as required above)
port := cfg.Port port := cfg.Port
if port == 0 { if port == 0 {
port = 5432 port = 5432
@@ -49,7 +44,7 @@ func CreatePoolConfig(cfg *database.PostgresConfig) (*pgxpool.Config, error) {
// Build connection string WITHOUT password // Build connection string WITHOUT password
// We'll set the password separately in the config // We'll set the password separately in the config
connString := fmt.Sprintf("host=%s port=%d user=%s dbname=%s sslmode=%s", connString := fmt.Sprintf("host=%s port=%d user=%s dbname=%s sslmode=%s",
host, port, cfg.User, cfg.Name, sslmode) cfg.Host, port, cfg.User, cfg.Name, sslmode)
// Parse the connection string // Parse the connection string
poolConfig, err := pgxpool.ParseConfig(connString) poolConfig, err := pgxpool.ParseConfig(connString)

View File

@@ -2,12 +2,14 @@ package pgdb
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"os" "os"
"time" "time"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"go.ntppool.org/common/database" "go.ntppool.org/common/database"
"go.ntppool.org/common/logger"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
@@ -61,6 +63,19 @@ func DefaultPoolOptions() PoolOptions {
// can be specified in the URI query string and PoolOptions are ignored. // can be specified in the URI query string and PoolOptions are ignored.
// When using config files, PoolOptions are applied. // When using config files, PoolOptions are applied.
func OpenPool(ctx context.Context, options PoolOptions) (*pgxpool.Pool, error) { func OpenPool(ctx context.Context, options PoolOptions) (*pgxpool.Pool, error) {
log := logger.FromContext(ctx)
// Validate PoolOptions
if options.MaxConns <= 0 {
return nil, fmt.Errorf("pgdb: MaxConns must be positive, got: %d", options.MaxConns)
}
if options.MinConns < 0 {
return nil, fmt.Errorf("pgdb: MinConns must be non-negative, got: %d", options.MinConns)
}
if options.MinConns > options.MaxConns {
return nil, fmt.Errorf("pgdb: MinConns (%d) cannot exceed MaxConns (%d)", options.MinConns, options.MaxConns)
}
var poolConfig *pgxpool.Config var poolConfig *pgxpool.Config
var err error var err error
@@ -70,7 +85,15 @@ func OpenPool(ctx context.Context, options PoolOptions) (*pgxpool.Pool, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse DATABASE_URI: %w", err) return nil, fmt.Errorf("failed to parse DATABASE_URI: %w", err)
} }
// Pool settings from URI are used; PoolOptions ignored
// Log when PoolOptions differ from defaults (they will be ignored)
defaults := DefaultPoolOptions()
if options.MaxConns != defaults.MaxConns || options.MinConns != defaults.MinConns {
log.WarnContext(ctx, "DATABASE_URI is set; PoolOptions are ignored (use URI query parameters for pool settings)",
"ignored_max_conns", options.MaxConns,
"ignored_min_conns", options.MinConns,
)
}
} else { } else {
// Fall back to config file approach // Fall back to config file approach
pgCfg, err := findAndParseConfig(options.ConfigFiles) pgCfg, err := findAndParseConfig(options.ConfigFiles)
@@ -99,7 +122,7 @@ func OpenPool(ctx context.Context, options PoolOptions) (*pgxpool.Pool, error) {
// Test the connection // Test the connection
if err := pool.Ping(ctx); err != nil { if err := pool.Ping(ctx); err != nil {
pool.Close() pool.Close() // pgxpool.Pool.Close() doesn't return an error
return nil, fmt.Errorf("failed to ping database: %w", err) return nil, fmt.Errorf("failed to ping database: %w", err)
} }
@@ -116,35 +139,33 @@ func OpenPoolWithConfigFile(ctx context.Context, configFile string) (*pgxpool.Po
// findAndParseConfig searches for and parses the first existing config file // findAndParseConfig searches for and parses the first existing config file
func findAndParseConfig(configFiles []string) (*database.PostgresConfig, error) { func findAndParseConfig(configFiles []string) (*database.PostgresConfig, error) {
var firstErr error var errs []error
var triedFiles []string
for _, configFile := range configFiles { for _, configFile := range configFiles {
if configFile == "" { if configFile == "" {
continue continue
} }
triedFiles = append(triedFiles, configFile)
// Check if file exists // Check if file exists
if _, err := os.Stat(configFile); err != nil { if _, err := os.Stat(configFile); err != nil {
if firstErr == nil { errs = append(errs, fmt.Errorf("%s: %w", configFile, err))
firstErr = err
}
continue continue
} }
// Try to read and parse the file // Try to read and parse the file
pgCfg, err := parseConfigFile(configFile) pgCfg, err := parseConfigFile(configFile)
if err != nil { if err != nil {
if firstErr == nil { errs = append(errs, fmt.Errorf("%s: %w", configFile, err))
firstErr = err
}
continue continue
} }
return pgCfg, nil return pgCfg, nil
} }
if firstErr != nil { if len(errs) > 0 {
return nil, fmt.Errorf("no config file found: %w", firstErr) return nil, fmt.Errorf("no valid config file found (tried: %v): %w", triedFiles, errors.Join(errs...))
} }
return nil, fmt.Errorf("no valid config files provided") return nil, fmt.Errorf("no valid config files provided")
} }
@@ -155,7 +176,7 @@ func parseConfigFile(configFile string) (*database.PostgresConfig, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open config file: %w", err) return nil, fmt.Errorf("failed to open config file: %w", err)
} }
defer file.Close() defer func() { _ = file.Close() }()
dec := yaml.NewDecoder(file) dec := yaml.NewDecoder(file)
cfg := database.Config{} cfg := database.Config{}

View File

@@ -51,9 +51,9 @@ func OpenDBWithConfigFile(ctx context.Context, configFile string) (*sql.DB, erro
// OpenDBMonitor opens a database connection with monitor-specific defaults // OpenDBMonitor opens a database connection with monitor-specific defaults
// This is a convenience function for Monitor package compatibility // This is a convenience function for Monitor package compatibility
func OpenDBMonitor() (*sql.DB, error) { func OpenDBMonitor(ctx context.Context) (*sql.DB, error) {
options := MonitorConfigOptions() options := MonitorConfigOptions()
return OpenDB(context.Background(), options) return OpenDB(ctx, options)
} }
// findConfigFile searches for the first existing config file from the list // findConfigFile searches for the first existing config file from the list