Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
28d05d1d0e | |||
a774f92bf7 |
@ -1,6 +1,7 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@ -36,10 +37,20 @@ type ConfigOptions struct {
|
||||
ConnMaxLifetime time.Duration
|
||||
}
|
||||
|
||||
// getConfigFiles returns the list of config files to search for database configuration.
|
||||
// If DATABASE_CONFIG_FILE environment variable is set, it returns that single file.
|
||||
// Otherwise, it returns the default paths.
|
||||
func getConfigFiles() []string {
|
||||
if configFile := os.Getenv("DATABASE_CONFIG_FILE"); configFile != "" {
|
||||
return []string{configFile}
|
||||
}
|
||||
return []string{"database.yaml", "/vault/secrets/database.yaml"}
|
||||
}
|
||||
|
||||
// DefaultConfigOptions returns the standard configuration options used by API package
|
||||
func DefaultConfigOptions() ConfigOptions {
|
||||
return ConfigOptions{
|
||||
ConfigFiles: []string{"database.yaml", "/vault/secrets/database.yaml"},
|
||||
ConfigFiles: getConfigFiles(),
|
||||
EnablePoolMonitoring: true,
|
||||
PrometheusRegisterer: prometheus.DefaultRegisterer,
|
||||
MaxOpenConns: 25,
|
||||
@ -51,7 +62,7 @@ func DefaultConfigOptions() ConfigOptions {
|
||||
// MonitorConfigOptions returns configuration options optimized for Monitor package
|
||||
func MonitorConfigOptions() ConfigOptions {
|
||||
return ConfigOptions{
|
||||
ConfigFiles: []string{"database.yaml", "/vault/secrets/database.yaml"},
|
||||
ConfigFiles: getConfigFiles(),
|
||||
EnablePoolMonitoring: false, // Monitor doesn't need metrics
|
||||
PrometheusRegisterer: nil, // No Prometheus dependency
|
||||
MaxOpenConns: 10,
|
||||
|
@ -23,9 +23,8 @@ type bufferingExporter struct {
|
||||
// Real exporter (created when tracing is configured)
|
||||
exporter otellog.Exporter
|
||||
|
||||
// Thread-safe initialization
|
||||
initOnce sync.Once
|
||||
initErr error
|
||||
// Thread-safe initialization state (managed only by checkReadiness)
|
||||
initErr error
|
||||
|
||||
// Background checker
|
||||
stopChecker chan struct{}
|
||||
@ -48,20 +47,13 @@ func newBufferingExporter() *bufferingExporter {
|
||||
|
||||
// Export implements otellog.Exporter
|
||||
func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error {
|
||||
// Try initialization once
|
||||
e.initOnce.Do(func() {
|
||||
e.initErr = e.initialize()
|
||||
})
|
||||
// Check if exporter is ready (initialization handled by checkReadiness goroutine)
|
||||
e.mu.RLock()
|
||||
exporter := e.exporter
|
||||
e.mu.RUnlock()
|
||||
|
||||
// If initialization succeeded, use the exporter
|
||||
if e.initErr == nil {
|
||||
e.mu.RLock()
|
||||
exporter := e.exporter
|
||||
e.mu.RUnlock()
|
||||
|
||||
if exporter != nil {
|
||||
return exporter.Export(ctx, records)
|
||||
}
|
||||
if exporter != nil {
|
||||
return exporter.Export(ctx, records)
|
||||
}
|
||||
|
||||
// Not ready yet, buffer the records
|
||||
@ -117,24 +109,31 @@ func (e *bufferingExporter) bufferRecords(records []otellog.Record) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkReadiness periodically checks if tracing is configured
|
||||
// checkReadiness periodically attempts initialization until successful
|
||||
func (e *bufferingExporter) checkReadiness() {
|
||||
defer close(e.checkerDone)
|
||||
|
||||
ticker := time.NewTicker(1 * time.Second) // Reduced frequency since OTLP handles retries
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// If initialization failed, reset sync.Once to allow retry
|
||||
// The OTLP exporter will handle its own retry logic
|
||||
if e.initErr != nil {
|
||||
e.initOnce = sync.Once{}
|
||||
} else if e.exporter != nil {
|
||||
// Check if we already have a working exporter
|
||||
e.mu.RLock()
|
||||
hasExporter := e.exporter != nil
|
||||
e.mu.RUnlock()
|
||||
|
||||
if hasExporter {
|
||||
return // Exporter ready, checker no longer needed
|
||||
}
|
||||
|
||||
// Try to initialize
|
||||
err := e.initialize()
|
||||
e.mu.Lock()
|
||||
e.initErr = err
|
||||
e.mu.Unlock()
|
||||
|
||||
case <-e.stopChecker:
|
||||
return
|
||||
}
|
||||
@ -180,14 +179,21 @@ func (e *bufferingExporter) Shutdown(ctx context.Context) error {
|
||||
// Stop the readiness checker from continuing
|
||||
close(e.stopChecker)
|
||||
|
||||
// Give one final chance for TLS/tracing to become ready before fully shutting down
|
||||
e.initOnce.Do(func() {
|
||||
e.initErr = e.initialize()
|
||||
})
|
||||
|
||||
// Wait for readiness checker goroutine to complete
|
||||
<-e.checkerDone
|
||||
|
||||
// Give one final chance for TLS/tracing to become ready for buffer flushing
|
||||
e.mu.RLock()
|
||||
hasExporter := e.exporter != nil
|
||||
e.mu.RUnlock()
|
||||
|
||||
if !hasExporter {
|
||||
err := e.initialize()
|
||||
e.mu.Lock()
|
||||
e.initErr = err
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user