Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6420d0b174 |
@@ -23,13 +23,6 @@ func (c *Config) WebHostname() string {
|
|||||||
return c.webHostname
|
return c.webHostname
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) PoolDomain() string {
|
|
||||||
if c == nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
return c.poolDomain
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Config) Valid() bool {
|
func (c *Config) Valid() bool {
|
||||||
if c == nil {
|
if c == nil {
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ func createConnector(configFile string) CreateConnectorFunc {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer func() { _ = dbFile.Close() }()
|
defer dbFile.Close()
|
||||||
|
|
||||||
dec := yaml.NewDecoder(dbFile)
|
dec := yaml.NewDecoder(dbFile)
|
||||||
cfg := Config{}
|
cfg := Config{}
|
||||||
|
|||||||
@@ -7,7 +7,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Mock types for testing SQLC integration patterns
|
// Mock types for testing SQLC integration patterns
|
||||||
type mockQueries struct{}
|
type mockQueries struct {
|
||||||
|
db DBTX
|
||||||
|
}
|
||||||
|
|
||||||
type mockQueriesTx struct {
|
type mockQueriesTx struct {
|
||||||
*mockQueries
|
*mockQueries
|
||||||
@@ -56,7 +58,7 @@ func TestSQLCIntegration(t *testing.T) {
|
|||||||
|
|
||||||
// Verify our DB interface is compatible with what SQLC expects
|
// Verify our DB interface is compatible with what SQLC expects
|
||||||
var dbInterface DB[*mockQueriesTx]
|
var dbInterface DB[*mockQueriesTx]
|
||||||
mockDB := &mockQueries{}
|
var mockDB *mockQueries = &mockQueries{}
|
||||||
dbInterface = mockDB
|
dbInterface = mockDB
|
||||||
|
|
||||||
// Test that our transaction helper can work with this pattern
|
// Test that our transaction helper can work with this pattern
|
||||||
|
|||||||
@@ -3,10 +3,10 @@ package database
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"go.ntppool.org/common/logger"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// DatabaseMetrics holds the Prometheus metrics for database connection pool monitoring
|
// DatabaseMetrics holds the Prometheus metrics for database connection pool monitoring
|
||||||
@@ -16,10 +16,6 @@ type DatabaseMetrics struct {
|
|||||||
ConnectionsInUse prometheus.Gauge
|
ConnectionsInUse prometheus.Gauge
|
||||||
ConnectionsWaitCount prometheus.Counter
|
ConnectionsWaitCount prometheus.Counter
|
||||||
ConnectionsWaitDuration prometheus.Histogram
|
ConnectionsWaitDuration prometheus.Histogram
|
||||||
|
|
||||||
// Track last values for delta calculation (cumulative stats from sql.DBStats)
|
|
||||||
lastWaitCount int64
|
|
||||||
lastWaitDuration time.Duration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabaseMetrics creates a new set of database metrics and registers them
|
// NewDatabaseMetrics creates a new set of database metrics and registers them
|
||||||
@@ -71,44 +67,26 @@ func monitorConnectionPool(ctx context.Context, db *sql.DB, registerer prometheu
|
|||||||
ticker := time.NewTicker(30 * time.Second)
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
log := logger.FromContext(ctx)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.InfoContext(ctx, "database connection pool monitor stopped")
|
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
stats := db.Stats()
|
stats := db.Stats()
|
||||||
|
|
||||||
// Update gauge metrics (current state)
|
|
||||||
metrics.ConnectionsOpen.Set(float64(stats.OpenConnections))
|
metrics.ConnectionsOpen.Set(float64(stats.OpenConnections))
|
||||||
metrics.ConnectionsIdle.Set(float64(stats.Idle))
|
metrics.ConnectionsIdle.Set(float64(stats.Idle))
|
||||||
metrics.ConnectionsInUse.Set(float64(stats.InUse))
|
metrics.ConnectionsInUse.Set(float64(stats.InUse))
|
||||||
|
metrics.ConnectionsWaitCount.Add(float64(stats.WaitCount))
|
||||||
|
|
||||||
// Update counter with delta (WaitCount is cumulative in sql.DBStats)
|
if stats.WaitDuration > 0 {
|
||||||
waitCountDelta := stats.WaitCount - metrics.lastWaitCount
|
metrics.ConnectionsWaitDuration.Observe(stats.WaitDuration.Seconds())
|
||||||
if waitCountDelta > 0 {
|
|
||||||
metrics.ConnectionsWaitCount.Add(float64(waitCountDelta))
|
|
||||||
metrics.lastWaitCount = stats.WaitCount
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update histogram with delta (WaitDuration is cumulative in sql.DBStats)
|
|
||||||
waitDurationDelta := stats.WaitDuration - metrics.lastWaitDuration
|
|
||||||
if waitDurationDelta > 0 {
|
|
||||||
metrics.ConnectionsWaitDuration.Observe(waitDurationDelta.Seconds())
|
|
||||||
metrics.lastWaitDuration = stats.WaitDuration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log connection pool stats for high usage or waiting
|
// Log connection pool stats for high usage or waiting
|
||||||
if stats.OpenConnections > 20 || waitCountDelta > 0 {
|
if stats.OpenConnections > 20 || stats.WaitCount > 0 {
|
||||||
log.WarnContext(ctx, "high database connection usage",
|
fmt.Printf("Connection pool stats: open=%d idle=%d in_use=%d wait_count=%d wait_duration=%s\n",
|
||||||
"open", stats.OpenConnections,
|
stats.OpenConnections, stats.Idle, stats.InUse, stats.WaitCount, stats.WaitDuration)
|
||||||
"idle", stats.Idle,
|
|
||||||
"in_use", stats.InUse,
|
|
||||||
"wait_count", stats.WaitCount,
|
|
||||||
"wait_duration", stats.WaitDuration,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,17 +75,13 @@ sslmode: prefer
|
|||||||
|
|
||||||
### PoolOptions
|
### PoolOptions
|
||||||
|
|
||||||
Defaults match [pgxpool defaults](https://pkg.go.dev/github.com/jackc/pgx/v5/pgxpool#Config):
|
|
||||||
|
|
||||||
- `ConfigFiles` - List of config file paths to search (default: `database.yaml`, `/vault/secrets/database.yaml`)
|
- `ConfigFiles` - List of config file paths to search (default: `database.yaml`, `/vault/secrets/database.yaml`)
|
||||||
- `MinConns` - Minimum connections (default: 0)
|
- `MinConns` - Minimum connections (default: 0)
|
||||||
- `MaxConns` - Maximum connections (default: 4)
|
- `MaxConns` - Maximum connections (default: 25)
|
||||||
- `MaxConnLifetime` - Connection lifetime (default: 1 hour)
|
- `MaxConnLifetime` - Connection lifetime (default: 1 hour)
|
||||||
- `MaxConnIdleTime` - Idle timeout (default: 30 minutes)
|
- `MaxConnIdleTime` - Idle timeout (default: 30 minutes)
|
||||||
- `HealthCheckPeriod` - Health check interval (default: 1 minute)
|
- `HealthCheckPeriod` - Health check interval (default: 1 minute)
|
||||||
|
|
||||||
For higher connection limits, set via `PoolOptions` or URI query parameter `?pool_max_conns=25`.
|
|
||||||
|
|
||||||
### PostgreSQL Config Fields
|
### PostgreSQL Config Fields
|
||||||
|
|
||||||
- `host` - Database host (required)
|
- `host` - Database host (required)
|
||||||
@@ -97,34 +93,8 @@ For higher connection limits, set via `PoolOptions` or URI query parameter `?poo
|
|||||||
|
|
||||||
## Environment Variables
|
## Environment Variables
|
||||||
|
|
||||||
- `DATABASE_URI` - PostgreSQL connection URI (takes precedence over config files)
|
|
||||||
- `DATABASE_CONFIG_FILE` - Override config file location
|
- `DATABASE_CONFIG_FILE` - Override config file location
|
||||||
|
|
||||||
### URI Format
|
|
||||||
|
|
||||||
Standard PostgreSQL URI format:
|
|
||||||
```
|
|
||||||
postgresql://user:password@host:port/database?sslmode=require&pool_max_conns=10
|
|
||||||
```
|
|
||||||
|
|
||||||
Pool settings can be included in the URI query string:
|
|
||||||
- `pool_max_conns`, `pool_min_conns`
|
|
||||||
- `pool_max_conn_lifetime`, `pool_max_conn_idle_time`
|
|
||||||
- `pool_health_check_period`
|
|
||||||
|
|
||||||
When using `DATABASE_URI`, pool settings come from the URI. Since `PoolOptions` defaults match pgxpool defaults, behavior is consistent whether using URI or config files.
|
|
||||||
|
|
||||||
Example with CloudNativePG:
|
|
||||||
```yaml
|
|
||||||
# Mount the secret's 'uri' key as DATABASE_URI
|
|
||||||
env:
|
|
||||||
- name: DATABASE_URI
|
|
||||||
valueFrom:
|
|
||||||
secretKeyRef:
|
|
||||||
name: mydb-app
|
|
||||||
key: uri
|
|
||||||
```
|
|
||||||
|
|
||||||
## When to Use
|
## When to Use
|
||||||
|
|
||||||
**Use `pgdb.OpenPool()`** (this package) when:
|
**Use `pgdb.OpenPool()`** (this package) when:
|
||||||
|
|||||||
@@ -27,10 +27,15 @@ func CreatePoolConfig(cfg *database.PostgresConfig) (*pgxpool.Config, error) {
|
|||||||
"require": true, "verify-ca": true, "verify-full": true,
|
"require": true, "verify-ca": true, "verify-full": true,
|
||||||
}
|
}
|
||||||
if cfg.SSLMode != "" && !validSSLModes[cfg.SSLMode] {
|
if cfg.SSLMode != "" && !validSSLModes[cfg.SSLMode] {
|
||||||
return nil, fmt.Errorf("postgres: invalid sslmode: %s (valid: disable, allow, prefer, require, verify-ca, verify-full)", cfg.SSLMode)
|
return nil, fmt.Errorf("postgres: invalid sslmode: %s", cfg.SSLMode)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set defaults
|
||||||
|
host := cfg.Host
|
||||||
|
if host == "" {
|
||||||
|
host = "localhost"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply defaults for optional fields (host is validated as required above)
|
|
||||||
port := cfg.Port
|
port := cfg.Port
|
||||||
if port == 0 {
|
if port == 0 {
|
||||||
port = 5432
|
port = 5432
|
||||||
@@ -44,7 +49,7 @@ func CreatePoolConfig(cfg *database.PostgresConfig) (*pgxpool.Config, error) {
|
|||||||
// Build connection string WITHOUT password
|
// Build connection string WITHOUT password
|
||||||
// We'll set the password separately in the config
|
// We'll set the password separately in the config
|
||||||
connString := fmt.Sprintf("host=%s port=%d user=%s dbname=%s sslmode=%s",
|
connString := fmt.Sprintf("host=%s port=%d user=%s dbname=%s sslmode=%s",
|
||||||
cfg.Host, port, cfg.User, cfg.Name, sslmode)
|
host, port, cfg.User, cfg.Name, sslmode)
|
||||||
|
|
||||||
// Parse the connection string
|
// Parse the connection string
|
||||||
poolConfig, err := pgxpool.ParseConfig(connString)
|
poolConfig, err := pgxpool.ParseConfig(connString)
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package pgdb
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
@@ -12,47 +11,38 @@ import (
|
|||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PoolOptions configures pgxpool connection behavior.
|
// PoolOptions configures pgxpool connection behavior
|
||||||
//
|
|
||||||
// Default values match pgxpool defaults from github.com/jackc/pgx/v5/pgxpool.
|
|
||||||
// See: https://pkg.go.dev/github.com/jackc/pgx/v5/pgxpool#Config
|
|
||||||
//
|
|
||||||
// To customize pool settings, either:
|
|
||||||
// - Modify PoolOptions before calling OpenPool (for config file mode)
|
|
||||||
// - Use URI query parameters like ?pool_max_conns=25 (for DATABASE_URI mode)
|
|
||||||
type PoolOptions struct {
|
type PoolOptions struct {
|
||||||
// ConfigFiles is a list of config file paths to search for database configuration
|
// ConfigFiles is a list of config file paths to search for database configuration
|
||||||
ConfigFiles []string
|
ConfigFiles []string
|
||||||
|
|
||||||
// MinConns is the minimum number of connections in the pool.
|
// MinConns is the minimum number of connections in the pool
|
||||||
// Default: 0 (matches pgxpool default)
|
// Default: 0 (no minimum)
|
||||||
MinConns int32
|
MinConns int32
|
||||||
|
|
||||||
// MaxConns is the maximum number of connections in the pool.
|
// MaxConns is the maximum number of connections in the pool
|
||||||
// Default: 4 (matches pgxpool default)
|
// Default: 25
|
||||||
// For higher concurrency, increase via PoolOptions or URI ?pool_max_conns=N
|
|
||||||
MaxConns int32
|
MaxConns int32
|
||||||
|
|
||||||
// MaxConnLifetime is the maximum lifetime of a connection.
|
// MaxConnLifetime is the maximum lifetime of a connection
|
||||||
// Default: 1 hour (matches pgxpool default)
|
// Default: 1 hour
|
||||||
MaxConnLifetime time.Duration
|
MaxConnLifetime time.Duration
|
||||||
|
|
||||||
// MaxConnIdleTime is the maximum idle time of a connection.
|
// MaxConnIdleTime is the maximum idle time of a connection
|
||||||
// Default: 30 minutes (matches pgxpool default)
|
// Default: 30 minutes
|
||||||
MaxConnIdleTime time.Duration
|
MaxConnIdleTime time.Duration
|
||||||
|
|
||||||
// HealthCheckPeriod is how often to check connection health.
|
// HealthCheckPeriod is how often to check connection health
|
||||||
// Default: 1 minute (matches pgxpool default)
|
// Default: 1 minute
|
||||||
HealthCheckPeriod time.Duration
|
HealthCheckPeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultPoolOptions returns defaults matching pgxpool.
|
// DefaultPoolOptions returns sensible defaults for pgxpool
|
||||||
// See https://pkg.go.dev/github.com/jackc/pgx/v5/pgxpool#Config for pgxpool defaults.
|
|
||||||
func DefaultPoolOptions() PoolOptions {
|
func DefaultPoolOptions() PoolOptions {
|
||||||
return PoolOptions{
|
return PoolOptions{
|
||||||
ConfigFiles: GetConfigFiles(),
|
ConfigFiles: getConfigFiles(),
|
||||||
MinConns: 0,
|
MinConns: 0,
|
||||||
MaxConns: 4,
|
MaxConns: 25,
|
||||||
MaxConnLifetime: time.Hour,
|
MaxConnLifetime: time.Hour,
|
||||||
MaxConnIdleTime: 30 * time.Minute,
|
MaxConnIdleTime: 30 * time.Minute,
|
||||||
HealthCheckPeriod: time.Minute,
|
HealthCheckPeriod: time.Minute,
|
||||||
@@ -61,59 +51,26 @@ func DefaultPoolOptions() PoolOptions {
|
|||||||
|
|
||||||
// OpenPool opens a native pgx connection pool with the specified configuration
|
// OpenPool opens a native pgx connection pool with the specified configuration
|
||||||
// This is the primary and recommended way to connect to PostgreSQL
|
// This is the primary and recommended way to connect to PostgreSQL
|
||||||
//
|
|
||||||
// Configuration precedence (highest to lowest):
|
|
||||||
// 1. DATABASE_URI environment variable (pool settings can be included in URI)
|
|
||||||
// 2. DATABASE_CONFIG_FILE environment variable (YAML)
|
|
||||||
// 3. Default config files (database.yaml, /vault/secrets/database.yaml)
|
|
||||||
//
|
|
||||||
// When using DATABASE_URI, pool settings (pool_max_conns, pool_min_conns, etc.)
|
|
||||||
// can be specified in the URI query string and PoolOptions are ignored.
|
|
||||||
// When using config files, PoolOptions are applied.
|
|
||||||
func OpenPool(ctx context.Context, options PoolOptions) (*pgxpool.Pool, error) {
|
func OpenPool(ctx context.Context, options PoolOptions) (*pgxpool.Pool, error) {
|
||||||
// Validate PoolOptions
|
// Find and read config file
|
||||||
if options.MaxConns <= 0 {
|
pgCfg, err := findAndParseConfig(options.ConfigFiles)
|
||||||
return nil, fmt.Errorf("pgdb: MaxConns must be positive, got: %d", options.MaxConns)
|
if err != nil {
|
||||||
}
|
return nil, err
|
||||||
if options.MinConns < 0 {
|
|
||||||
return nil, fmt.Errorf("pgdb: MinConns must be non-negative, got: %d", options.MinConns)
|
|
||||||
}
|
|
||||||
if options.MinConns > options.MaxConns {
|
|
||||||
return nil, fmt.Errorf("pgdb: MinConns (%d) cannot exceed MaxConns (%d)", options.MinConns, options.MaxConns)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var poolConfig *pgxpool.Config
|
// Create pool config from PostgreSQL config
|
||||||
var err error
|
poolConfig, err := CreatePoolConfig(pgCfg)
|
||||||
|
if err != nil {
|
||||||
// Check DATABASE_URI environment variable first (highest priority)
|
return nil, err
|
||||||
// When using DATABASE_URI, pool settings come from URI query parameters
|
|
||||||
// (e.g., ?pool_max_conns=25). PoolOptions are not applied since our defaults
|
|
||||||
// match pgxpool defaults.
|
|
||||||
if uri := os.Getenv("DATABASE_URI"); uri != "" {
|
|
||||||
poolConfig, err = pgxpool.ParseConfig(uri)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse DATABASE_URI: %w", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Fall back to config file approach
|
|
||||||
pgCfg, _, err := FindConfig(options.ConfigFiles)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
poolConfig, err = CreatePoolConfig(pgCfg)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply pool-specific settings from PoolOptions (config files don't support these)
|
|
||||||
poolConfig.MinConns = options.MinConns
|
|
||||||
poolConfig.MaxConns = options.MaxConns
|
|
||||||
poolConfig.MaxConnLifetime = options.MaxConnLifetime
|
|
||||||
poolConfig.MaxConnIdleTime = options.MaxConnIdleTime
|
|
||||||
poolConfig.HealthCheckPeriod = options.HealthCheckPeriod
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply pool-specific settings
|
||||||
|
poolConfig.MinConns = options.MinConns
|
||||||
|
poolConfig.MaxConns = options.MaxConns
|
||||||
|
poolConfig.MaxConnLifetime = options.MaxConnLifetime
|
||||||
|
poolConfig.MaxConnIdleTime = options.MaxConnIdleTime
|
||||||
|
poolConfig.HealthCheckPeriod = options.HealthCheckPeriod
|
||||||
|
|
||||||
// Create the pool
|
// Create the pool
|
||||||
pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
|
pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -122,7 +79,7 @@ func OpenPool(ctx context.Context, options PoolOptions) (*pgxpool.Pool, error) {
|
|||||||
|
|
||||||
// Test the connection
|
// Test the connection
|
||||||
if err := pool.Ping(ctx); err != nil {
|
if err := pool.Ping(ctx); err != nil {
|
||||||
pool.Close() // pgxpool.Pool.Close() doesn't return an error
|
pool.Close()
|
||||||
return nil, fmt.Errorf("failed to ping database: %w", err)
|
return nil, fmt.Errorf("failed to ping database: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,54 +94,39 @@ func OpenPoolWithConfigFile(ctx context.Context, configFile string) (*pgxpool.Po
|
|||||||
return OpenPool(ctx, options)
|
return OpenPool(ctx, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindConfig searches for and parses the first existing config file.
|
// findAndParseConfig searches for and parses the first existing config file
|
||||||
// Returns the PostgresConfig, the path to the config file used, and any error.
|
func findAndParseConfig(configFiles []string) (*database.PostgresConfig, error) {
|
||||||
// If DATABASE_URI env var is set, returns nil config with empty path (use ParseURIConfig instead).
|
var firstErr error
|
||||||
func FindConfig(configFiles []string) (*database.PostgresConfig, string, error) {
|
|
||||||
// Check if DATABASE_URI takes precedence
|
|
||||||
if os.Getenv("DATABASE_URI") != "" {
|
|
||||||
return nil, "", nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var errs []error
|
|
||||||
var triedFiles []string
|
|
||||||
|
|
||||||
for _, configFile := range configFiles {
|
for _, configFile := range configFiles {
|
||||||
if configFile == "" {
|
if configFile == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
triedFiles = append(triedFiles, configFile)
|
|
||||||
|
|
||||||
// Check if file exists
|
// Check if file exists
|
||||||
if _, err := os.Stat(configFile); err != nil {
|
if _, err := os.Stat(configFile); err != nil {
|
||||||
errs = append(errs, fmt.Errorf("%s: %w", configFile, err))
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to read and parse the file
|
// Try to read and parse the file
|
||||||
pgCfg, err := parseConfigFile(configFile)
|
pgCfg, err := parseConfigFile(configFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs = append(errs, fmt.Errorf("%s: %w", configFile, err))
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
return pgCfg, configFile, nil
|
return pgCfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(errs) > 0 {
|
if firstErr != nil {
|
||||||
return nil, "", fmt.Errorf("no valid config file found (tried: %v): %w", triedFiles, errors.Join(errs...))
|
return nil, fmt.Errorf("no config file found: %w", firstErr)
|
||||||
}
|
}
|
||||||
return nil, "", fmt.Errorf("no valid config files provided")
|
return nil, fmt.Errorf("no valid config files provided")
|
||||||
}
|
|
||||||
|
|
||||||
// ParseURIConfig extracts connection info from DATABASE_URI environment variable.
|
|
||||||
// Returns nil if DATABASE_URI is not set.
|
|
||||||
func ParseURIConfig() (*pgxpool.Config, error) {
|
|
||||||
uri := os.Getenv("DATABASE_URI")
|
|
||||||
if uri == "" {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return pgxpool.ParseConfig(uri)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseConfigFile reads and parses a YAML config file
|
// parseConfigFile reads and parses a YAML config file
|
||||||
@@ -193,7 +135,7 @@ func parseConfigFile(configFile string) (*database.PostgresConfig, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to open config file: %w", err)
|
return nil, fmt.Errorf("failed to open config file: %w", err)
|
||||||
}
|
}
|
||||||
defer func() { _ = file.Close() }()
|
defer file.Close()
|
||||||
|
|
||||||
dec := yaml.NewDecoder(file)
|
dec := yaml.NewDecoder(file)
|
||||||
cfg := database.Config{}
|
cfg := database.Config{}
|
||||||
@@ -222,9 +164,8 @@ func parseConfigFile(configFile string) (*database.PostgresConfig, error) {
|
|||||||
return nil, fmt.Errorf("no PostgreSQL configuration found in %s", configFile)
|
return nil, fmt.Errorf("no PostgreSQL configuration found in %s", configFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetConfigFiles returns the list of config files to search for database configuration.
|
// getConfigFiles returns the list of config files to search
|
||||||
// Checks DATABASE_CONFIG_FILE env var first, otherwise returns default paths.
|
func getConfigFiles() []string {
|
||||||
func GetConfigFiles() []string {
|
|
||||||
if configFile := os.Getenv("DATABASE_CONFIG_FILE"); configFile != "" {
|
if configFile := os.Getenv("DATABASE_CONFIG_FILE"); configFile != "" {
|
||||||
return []string{configFile}
|
return []string{configFile}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +1,6 @@
|
|||||||
package pgdb
|
package pgdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -115,8 +112,8 @@ func TestDefaultPoolOptions(t *testing.T) {
|
|||||||
if opts.MinConns != 0 {
|
if opts.MinConns != 0 {
|
||||||
t.Errorf("Expected MinConns=0, got %d", opts.MinConns)
|
t.Errorf("Expected MinConns=0, got %d", opts.MinConns)
|
||||||
}
|
}
|
||||||
if opts.MaxConns != 4 {
|
if opts.MaxConns != 25 {
|
||||||
t.Errorf("Expected MaxConns=4 (pgxpool default), got %d", opts.MaxConns)
|
t.Errorf("Expected MaxConns=25, got %d", opts.MaxConns)
|
||||||
}
|
}
|
||||||
if opts.MaxConnLifetime != time.Hour {
|
if opts.MaxConnLifetime != time.Hour {
|
||||||
t.Errorf("Expected MaxConnLifetime=1h, got %v", opts.MaxConnLifetime)
|
t.Errorf("Expected MaxConnLifetime=1h, got %v", opts.MaxConnLifetime)
|
||||||
@@ -152,63 +149,3 @@ func TestCreatePoolConfigDefaults(t *testing.T) {
|
|||||||
t.Errorf("Expected default Port=5432, got %d", poolCfg.ConnConfig.Port)
|
t.Errorf("Expected default Port=5432, got %d", poolCfg.ConnConfig.Port)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOpenPoolWithDatabaseURI(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
|
|
||||||
// This test requires a running PostgreSQL instance
|
|
||||||
uri := os.Getenv("TEST_DATABASE_URI")
|
|
||||||
if uri == "" {
|
|
||||||
t.Skip("TEST_DATABASE_URI not set")
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
t.Setenv("DATABASE_URI", uri)
|
|
||||||
|
|
||||||
pool, err := OpenPool(ctx, DefaultPoolOptions())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("OpenPool failed: %v", err)
|
|
||||||
}
|
|
||||||
defer pool.Close()
|
|
||||||
|
|
||||||
// Verify connection works
|
|
||||||
var result int
|
|
||||||
err = pool.QueryRow(ctx, "SELECT 1").Scan(&result)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("query failed: %v", err)
|
|
||||||
}
|
|
||||||
if result != 1 {
|
|
||||||
t.Errorf("expected 1, got %d", result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDatabaseURIPrecedence(t *testing.T) {
|
|
||||||
// Test that DATABASE_URI takes precedence over config files
|
|
||||||
// We use localhost with a port that's unlikely to have postgres running
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Set DATABASE_URI to a parseable URI pointing to a non-listening port
|
|
||||||
t.Setenv("DATABASE_URI", "postgres://testuser:testpass@127.0.0.1:59999/testdb?connect_timeout=1")
|
|
||||||
|
|
||||||
// Set config files to a nonexistent path - if this were used, we'd get
|
|
||||||
// "config file not found" error instead of connection refused
|
|
||||||
opts := DefaultPoolOptions()
|
|
||||||
opts.ConfigFiles = []string{"/nonexistent/path/database.yaml"}
|
|
||||||
|
|
||||||
_, err := OpenPool(ctx, opts)
|
|
||||||
|
|
||||||
// Should fail with connection error (not config file error)
|
|
||||||
// This proves DATABASE_URI was used instead of config files
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("expected error, got nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
// The error should be about connection failure, not about missing config file
|
|
||||||
errStr := err.Error()
|
|
||||||
if strings.Contains(errStr, "config file") || strings.Contains(errStr, "no such file") {
|
|
||||||
t.Errorf("expected connection error, got config file error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -51,9 +51,9 @@ func OpenDBWithConfigFile(ctx context.Context, configFile string) (*sql.DB, erro
|
|||||||
|
|
||||||
// OpenDBMonitor opens a database connection with monitor-specific defaults
|
// OpenDBMonitor opens a database connection with monitor-specific defaults
|
||||||
// This is a convenience function for Monitor package compatibility
|
// This is a convenience function for Monitor package compatibility
|
||||||
func OpenDBMonitor(ctx context.Context) (*sql.DB, error) {
|
func OpenDBMonitor() (*sql.DB, error) {
|
||||||
options := MonitorConfigOptions()
|
options := MonitorConfigOptions()
|
||||||
return OpenDB(ctx, options)
|
return OpenDB(context.Background(), options)
|
||||||
}
|
}
|
||||||
|
|
||||||
// findConfigFile searches for the first existing config file from the list
|
// findConfigFile searches for the first existing config file from the list
|
||||||
|
|||||||
@@ -3,24 +3,6 @@
|
|||||||
// This package implements a dedicated metrics server that exposes application metrics
|
// This package implements a dedicated metrics server that exposes application metrics
|
||||||
// via HTTP. It uses a custom Prometheus registry to avoid conflicts with other metric
|
// via HTTP. It uses a custom Prometheus registry to avoid conflicts with other metric
|
||||||
// collectors and provides graceful shutdown capabilities.
|
// collectors and provides graceful shutdown capabilities.
|
||||||
//
|
|
||||||
// # Usage
|
|
||||||
//
|
|
||||||
// Create a new metrics server and register your metrics with its Registry():
|
|
||||||
//
|
|
||||||
// m := metricsserver.New()
|
|
||||||
// myCounter := prometheus.NewCounter(...)
|
|
||||||
// m.Registry().MustRegister(myCounter)
|
|
||||||
//
|
|
||||||
// When you need a Gatherer (for example, to pass to other libraries), use the Gatherer() method
|
|
||||||
// instead of prometheus.DefaultGatherer to ensure your custom metrics are collected:
|
|
||||||
//
|
|
||||||
// gatherer := m.Gatherer() // Returns the custom registry as a Gatherer
|
|
||||||
//
|
|
||||||
// To use the default Prometheus gatherer alongside your custom registry:
|
|
||||||
//
|
|
||||||
// m := metricsserver.NewWithDefaultGatherer()
|
|
||||||
// m.Gatherer() // Returns prometheus.DefaultGatherer
|
|
||||||
package metricsserver
|
package metricsserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -39,32 +21,15 @@ import (
|
|||||||
// Metrics provides a custom Prometheus registry and HTTP handlers for metrics exposure.
|
// Metrics provides a custom Prometheus registry and HTTP handlers for metrics exposure.
|
||||||
// It isolates application metrics from the default global registry.
|
// It isolates application metrics from the default global registry.
|
||||||
type Metrics struct {
|
type Metrics struct {
|
||||||
r *prometheus.Registry
|
r *prometheus.Registry
|
||||||
useDefaultGatherer bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new Metrics instance with a custom Prometheus registry.
|
// New creates a new Metrics instance with a custom Prometheus registry.
|
||||||
// Use this when you want isolated metrics that don't interfere with the default registry.
|
|
||||||
func New() *Metrics {
|
func New() *Metrics {
|
||||||
r := prometheus.NewRegistry()
|
r := prometheus.NewRegistry()
|
||||||
|
|
||||||
m := &Metrics{
|
m := &Metrics{
|
||||||
r: r,
|
r: r,
|
||||||
useDefaultGatherer: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewWithDefaultGatherer creates a new Metrics instance that uses the default Prometheus gatherer.
|
|
||||||
// This is useful when you want to expose metrics from the default registry alongside your custom metrics.
|
|
||||||
// The custom registry will still be available via Registry() but Gatherer() will return prometheus.DefaultGatherer.
|
|
||||||
func NewWithDefaultGatherer() *Metrics {
|
|
||||||
r := prometheus.NewRegistry()
|
|
||||||
|
|
||||||
m := &Metrics{
|
|
||||||
r: r,
|
|
||||||
useDefaultGatherer: true,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return m
|
return m
|
||||||
@@ -76,23 +41,6 @@ func (m *Metrics) Registry() *prometheus.Registry {
|
|||||||
return m.r
|
return m.r
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gatherer returns the Prometheus gatherer to use for collecting metrics.
|
|
||||||
// This returns the custom registry's Gatherer by default, ensuring your registered
|
|
||||||
// metrics are collected. If the instance was created with NewWithDefaultGatherer(),
|
|
||||||
// this returns prometheus.DefaultGatherer instead.
|
|
||||||
//
|
|
||||||
// Use this method when you need a prometheus.Gatherer interface, for example when
|
|
||||||
// configuring other libraries that need to collect metrics.
|
|
||||||
//
|
|
||||||
// IMPORTANT: Do not use prometheus.DefaultGatherer directly if you want to collect
|
|
||||||
// metrics registered with this instance's Registry(). Always use this Gatherer() method.
|
|
||||||
func (m *Metrics) Gatherer() prometheus.Gatherer {
|
|
||||||
if m.useDefaultGatherer {
|
|
||||||
return prometheus.DefaultGatherer
|
|
||||||
}
|
|
||||||
return m.r
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handler returns an HTTP handler for the /metrics endpoint with OpenMetrics support.
|
// Handler returns an HTTP handler for the /metrics endpoint with OpenMetrics support.
|
||||||
func (m *Metrics) Handler() http.Handler {
|
func (m *Metrics) Handler() http.Handler {
|
||||||
log := logger.NewStdLog("prom http", false, nil)
|
log := logger.NewStdLog("prom http", false, nil)
|
||||||
|
|||||||
@@ -67,47 +67,6 @@ func TestRegistry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGatherer(t *testing.T) {
|
|
||||||
metrics := New()
|
|
||||||
|
|
||||||
gatherer := metrics.Gatherer()
|
|
||||||
if gatherer == nil {
|
|
||||||
t.Fatal("Gatherer() returned nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register a test metric
|
|
||||||
counter := prometheus.NewCounter(prometheus.CounterOpts{
|
|
||||||
Name: "test_gatherer_counter",
|
|
||||||
Help: "A test counter for gatherer",
|
|
||||||
})
|
|
||||||
|
|
||||||
metrics.Registry().MustRegister(counter)
|
|
||||||
counter.Inc()
|
|
||||||
|
|
||||||
// Test that the gatherer collects our custom metric
|
|
||||||
metricFamilies, err := gatherer.Gather()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("failed to gather metrics: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
found := false
|
|
||||||
for _, mf := range metricFamilies {
|
|
||||||
if mf.GetName() == "test_gatherer_counter" {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !found {
|
|
||||||
t.Error("registered metric not found via Gatherer()")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify gatherer is the same as registry
|
|
||||||
if gatherer != metrics.r {
|
|
||||||
t.Error("Gatherer() should return the same object as the registry for custom registry mode")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandler(t *testing.T) {
|
func TestHandler(t *testing.T) {
|
||||||
metrics := New()
|
metrics := New()
|
||||||
|
|
||||||
@@ -253,45 +212,6 @@ func TestListenAndServeContextCancellation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewWithDefaultGatherer(t *testing.T) {
|
|
||||||
metrics := NewWithDefaultGatherer()
|
|
||||||
|
|
||||||
if metrics == nil {
|
|
||||||
t.Fatal("NewWithDefaultGatherer returned nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !metrics.useDefaultGatherer {
|
|
||||||
t.Error("useDefaultGatherer should be true")
|
|
||||||
}
|
|
||||||
|
|
||||||
gatherer := metrics.Gatherer()
|
|
||||||
if gatherer == nil {
|
|
||||||
t.Fatal("Gatherer() returned nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify it returns the default gatherer
|
|
||||||
if gatherer != prometheus.DefaultGatherer {
|
|
||||||
t.Error("Gatherer() should return prometheus.DefaultGatherer when useDefaultGatherer is true")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify the custom registry is still available and separate
|
|
||||||
if metrics.Registry() == nil {
|
|
||||||
t.Error("Registry() should still return a custom registry")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that registering in custom registry doesn't affect default gatherer check
|
|
||||||
counter := prometheus.NewCounter(prometheus.CounterOpts{
|
|
||||||
Name: "test_default_gatherer_counter",
|
|
||||||
Help: "A test counter",
|
|
||||||
})
|
|
||||||
metrics.Registry().MustRegister(counter)
|
|
||||||
|
|
||||||
// The gatherer should still be the default one, not our custom registry
|
|
||||||
if metrics.Gatherer() != prometheus.DefaultGatherer {
|
|
||||||
t.Error("Gatherer() should continue to return prometheus.DefaultGatherer")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Benchmark the metrics handler response time
|
// Benchmark the metrics handler response time
|
||||||
func BenchmarkMetricsHandler(b *testing.B) {
|
func BenchmarkMetricsHandler(b *testing.B) {
|
||||||
metrics := New()
|
metrics := New()
|
||||||
|
|||||||
Reference in New Issue
Block a user