Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
28d05d1d0e | |||
a774f92bf7 |
@ -1,6 +1,7 @@
|
|||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@ -36,10 +37,20 @@ type ConfigOptions struct {
|
|||||||
ConnMaxLifetime time.Duration
|
ConnMaxLifetime time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getConfigFiles returns the list of config files to search for database configuration.
|
||||||
|
// If DATABASE_CONFIG_FILE environment variable is set, it returns that single file.
|
||||||
|
// Otherwise, it returns the default paths.
|
||||||
|
func getConfigFiles() []string {
|
||||||
|
if configFile := os.Getenv("DATABASE_CONFIG_FILE"); configFile != "" {
|
||||||
|
return []string{configFile}
|
||||||
|
}
|
||||||
|
return []string{"database.yaml", "/vault/secrets/database.yaml"}
|
||||||
|
}
|
||||||
|
|
||||||
// DefaultConfigOptions returns the standard configuration options used by API package
|
// DefaultConfigOptions returns the standard configuration options used by API package
|
||||||
func DefaultConfigOptions() ConfigOptions {
|
func DefaultConfigOptions() ConfigOptions {
|
||||||
return ConfigOptions{
|
return ConfigOptions{
|
||||||
ConfigFiles: []string{"database.yaml", "/vault/secrets/database.yaml"},
|
ConfigFiles: getConfigFiles(),
|
||||||
EnablePoolMonitoring: true,
|
EnablePoolMonitoring: true,
|
||||||
PrometheusRegisterer: prometheus.DefaultRegisterer,
|
PrometheusRegisterer: prometheus.DefaultRegisterer,
|
||||||
MaxOpenConns: 25,
|
MaxOpenConns: 25,
|
||||||
@ -51,7 +62,7 @@ func DefaultConfigOptions() ConfigOptions {
|
|||||||
// MonitorConfigOptions returns configuration options optimized for Monitor package
|
// MonitorConfigOptions returns configuration options optimized for Monitor package
|
||||||
func MonitorConfigOptions() ConfigOptions {
|
func MonitorConfigOptions() ConfigOptions {
|
||||||
return ConfigOptions{
|
return ConfigOptions{
|
||||||
ConfigFiles: []string{"database.yaml", "/vault/secrets/database.yaml"},
|
ConfigFiles: getConfigFiles(),
|
||||||
EnablePoolMonitoring: false, // Monitor doesn't need metrics
|
EnablePoolMonitoring: false, // Monitor doesn't need metrics
|
||||||
PrometheusRegisterer: nil, // No Prometheus dependency
|
PrometheusRegisterer: nil, // No Prometheus dependency
|
||||||
MaxOpenConns: 10,
|
MaxOpenConns: 10,
|
||||||
|
@ -23,9 +23,8 @@ type bufferingExporter struct {
|
|||||||
// Real exporter (created when tracing is configured)
|
// Real exporter (created when tracing is configured)
|
||||||
exporter otellog.Exporter
|
exporter otellog.Exporter
|
||||||
|
|
||||||
// Thread-safe initialization
|
// Thread-safe initialization state (managed only by checkReadiness)
|
||||||
initOnce sync.Once
|
initErr error
|
||||||
initErr error
|
|
||||||
|
|
||||||
// Background checker
|
// Background checker
|
||||||
stopChecker chan struct{}
|
stopChecker chan struct{}
|
||||||
@ -48,20 +47,13 @@ func newBufferingExporter() *bufferingExporter {
|
|||||||
|
|
||||||
// Export implements otellog.Exporter
|
// Export implements otellog.Exporter
|
||||||
func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error {
|
func (e *bufferingExporter) Export(ctx context.Context, records []otellog.Record) error {
|
||||||
// Try initialization once
|
// Check if exporter is ready (initialization handled by checkReadiness goroutine)
|
||||||
e.initOnce.Do(func() {
|
e.mu.RLock()
|
||||||
e.initErr = e.initialize()
|
exporter := e.exporter
|
||||||
})
|
e.mu.RUnlock()
|
||||||
|
|
||||||
// If initialization succeeded, use the exporter
|
if exporter != nil {
|
||||||
if e.initErr == nil {
|
return exporter.Export(ctx, records)
|
||||||
e.mu.RLock()
|
|
||||||
exporter := e.exporter
|
|
||||||
e.mu.RUnlock()
|
|
||||||
|
|
||||||
if exporter != nil {
|
|
||||||
return exporter.Export(ctx, records)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not ready yet, buffer the records
|
// Not ready yet, buffer the records
|
||||||
@ -117,24 +109,31 @@ func (e *bufferingExporter) bufferRecords(records []otellog.Record) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkReadiness periodically checks if tracing is configured
|
// checkReadiness periodically attempts initialization until successful
|
||||||
func (e *bufferingExporter) checkReadiness() {
|
func (e *bufferingExporter) checkReadiness() {
|
||||||
defer close(e.checkerDone)
|
defer close(e.checkerDone)
|
||||||
|
|
||||||
ticker := time.NewTicker(1 * time.Second) // Reduced frequency since OTLP handles retries
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
// If initialization failed, reset sync.Once to allow retry
|
// Check if we already have a working exporter
|
||||||
// The OTLP exporter will handle its own retry logic
|
e.mu.RLock()
|
||||||
if e.initErr != nil {
|
hasExporter := e.exporter != nil
|
||||||
e.initOnce = sync.Once{}
|
e.mu.RUnlock()
|
||||||
} else if e.exporter != nil {
|
|
||||||
|
if hasExporter {
|
||||||
return // Exporter ready, checker no longer needed
|
return // Exporter ready, checker no longer needed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Try to initialize
|
||||||
|
err := e.initialize()
|
||||||
|
e.mu.Lock()
|
||||||
|
e.initErr = err
|
||||||
|
e.mu.Unlock()
|
||||||
|
|
||||||
case <-e.stopChecker:
|
case <-e.stopChecker:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -180,14 +179,21 @@ func (e *bufferingExporter) Shutdown(ctx context.Context) error {
|
|||||||
// Stop the readiness checker from continuing
|
// Stop the readiness checker from continuing
|
||||||
close(e.stopChecker)
|
close(e.stopChecker)
|
||||||
|
|
||||||
// Give one final chance for TLS/tracing to become ready before fully shutting down
|
|
||||||
e.initOnce.Do(func() {
|
|
||||||
e.initErr = e.initialize()
|
|
||||||
})
|
|
||||||
|
|
||||||
// Wait for readiness checker goroutine to complete
|
// Wait for readiness checker goroutine to complete
|
||||||
<-e.checkerDone
|
<-e.checkerDone
|
||||||
|
|
||||||
|
// Give one final chance for TLS/tracing to become ready for buffer flushing
|
||||||
|
e.mu.RLock()
|
||||||
|
hasExporter := e.exporter != nil
|
||||||
|
e.mu.RUnlock()
|
||||||
|
|
||||||
|
if !hasExporter {
|
||||||
|
err := e.initialize()
|
||||||
|
e.mu.Lock()
|
||||||
|
e.initErr = err
|
||||||
|
e.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
defer e.mu.Unlock()
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user