5 Commits

8 changed files with 72 additions and 21 deletions

8
go.mod
View File

@@ -1,6 +1,6 @@
module go.ntppool.org/common
go 1.20
go 1.21
require (
github.com/abh/certman v0.4.0
@@ -8,7 +8,7 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/segmentio/kafka-go v0.4.42
github.com/spf13/cobra v1.7.0
golang.org/x/exp v0.0.0-20230711023510-fffb14384f22
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/mod v0.12.0
golang.org/x/sync v0.3.0
)
@@ -25,10 +25,10 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/sys v0.12.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

12
go.sum
View File

@@ -48,6 +48,8 @@ github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdO
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/segmentio/kafka-go v0.4.42 h1:qffhBZCz4WcWyNuHEclHjIMLs2slp6mZO8px+5W5tfU=
github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg=
@@ -70,10 +72,12 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/exp v0.0.0-20230711023510-fffb14384f22 h1:FqrVOBQxQ8r/UwwXibI0KMolVhvFiGobSfdE33deHJM=
golang.org/x/exp v0.0.0-20230711023510-fffb14384f22/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI=
golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
@@ -97,6 +101,10 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=

View File

@@ -2,24 +2,43 @@ package health
import (
"context"
"log/slog"
"net/http"
"strconv"
"time"
"golang.org/x/exp/slog"
"go.ntppool.org/common/logger"
"golang.org/x/sync/errgroup"
)
// HealthCheckListener runs simple http server on the specified port for
// health check probes
func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error {
log.Info("Starting health listener", "port", port)
type Server struct {
log *slog.Logger
healthFn http.HandlerFunc
}
func NewServer(healthFn http.HandlerFunc) *Server {
if healthFn == nil {
healthFn = basicHealth
}
srv := &Server{
log: logger.Setup(),
healthFn: healthFn,
}
return srv
}
func (srv *Server) SetLogger(log *slog.Logger) {
srv.log = log
}
func (srv *Server) Listen(ctx context.Context, port int) error {
srv.log.Info("Starting health listener", "port", port)
serveMux := http.NewServeMux()
serveMux.HandleFunc("/__health", basicHealth)
serveMux.HandleFunc("/__health", srv.healthFn)
srv := &http.Server{
hsrv := &http.Server{
Addr: ":" + strconv.Itoa(port),
ReadTimeout: 10 * time.Second,
WriteTimeout: 20 * time.Second,
@@ -30,9 +49,9 @@ func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
err := srv.ListenAndServe()
err := hsrv.ListenAndServe()
if err != http.ErrServerClosed {
log.Warn("health check server done listening", "err", err)
srv.log.Warn("health check server done listening", "err", err)
return err
}
return nil
@@ -44,8 +63,8 @@ func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error
defer cancel()
g.Go(func() error {
if err := srv.Shutdown(ctx); err != nil {
log.Error("health check server shutdown failed", "err", err)
if err := hsrv.Shutdown(ctx); err != nil {
srv.log.Error("health check server shutdown failed", "err", err)
return err
}
return nil
@@ -54,6 +73,14 @@ func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error
return g.Wait()
}
// HealthCheckListener runs simple http server on the specified port for
// health check probes
func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error {
srv := NewServer(nil)
srv.SetLogger(log)
return srv.Listen(ctx, port)
}
func basicHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte("ok"))

View File

@@ -12,6 +12,7 @@ import (
"github.com/abh/certman"
"github.com/segmentio/kafka-go"
"go.ntppool.org/common/logger"
)
const (
@@ -203,3 +204,17 @@ func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
return w, nil
}
func (k *Kafka) CheckPartitions() error {
partitions, err := k.conn.ReadPartitions()
if err != nil {
return err
}
// should this result in an error?
if len(partitions) == 0 {
log := logger.Setup()
log.Info("kafka connection has no partitions")
}
return nil
}

View File

@@ -7,7 +7,7 @@ import (
"strconv"
"sync"
"golang.org/x/exp/slog"
"log/slog"
)
var ConfigPrefix = ""

View File

@@ -2,8 +2,7 @@ package logger
import (
"fmt"
"golang.org/x/exp/slog"
"log/slog"
)
type stdLoggerish struct {

View File

@@ -27,7 +27,7 @@ func New() *Metrics {
return m
}
func (m *Metrics) Registry() prometheus.Registerer {
func (m *Metrics) Registry() *prometheus.Registry {
return m.r
}

View File

@@ -2,6 +2,8 @@
set -euo pipefail
go install github.com/goreleaser/goreleaser@v1.20.0
DRONE_TAG=${DRONE_TAG-""}
is_snapshot=""