Compare commits

...

2 Commits

2 changed files with 52 additions and 10 deletions

View File

@ -6,20 +6,39 @@ import (
"strconv" "strconv"
"time" "time"
"go.ntppool.org/common/logger"
"golang.org/x/exp/slog" "golang.org/x/exp/slog"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
// HealthCheckListener runs simple http server on the specified port for type Server struct {
// health check probes log *slog.Logger
func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error { healthFn http.HandlerFunc
log.Info("Starting health listener", "port", port) }
func NewServer(healthFn http.HandlerFunc) *Server {
if healthFn == nil {
healthFn = basicHealth
}
srv := &Server{
log: logger.Setup(),
healthFn: healthFn,
}
return srv
}
func (srv *Server) SetLogger(log *slog.Logger) {
srv.log = log
}
func (srv *Server) Listen(ctx context.Context, port int) error {
srv.log.Info("Starting health listener", "port", port)
serveMux := http.NewServeMux() serveMux := http.NewServeMux()
serveMux.HandleFunc("/__health", basicHealth) serveMux.HandleFunc("/__health", srv.healthFn)
srv := &http.Server{ hsrv := &http.Server{
Addr: ":" + strconv.Itoa(port), Addr: ":" + strconv.Itoa(port),
ReadTimeout: 10 * time.Second, ReadTimeout: 10 * time.Second,
WriteTimeout: 20 * time.Second, WriteTimeout: 20 * time.Second,
@ -30,9 +49,9 @@ func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error
g, ctx := errgroup.WithContext(ctx) g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { g.Go(func() error {
err := srv.ListenAndServe() err := hsrv.ListenAndServe()
if err != http.ErrServerClosed { if err != http.ErrServerClosed {
log.Warn("health check server done listening", "err", err) srv.log.Warn("health check server done listening", "err", err)
return err return err
} }
return nil return nil
@ -44,8 +63,8 @@ func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error
defer cancel() defer cancel()
g.Go(func() error { g.Go(func() error {
if err := srv.Shutdown(ctx); err != nil { if err := hsrv.Shutdown(ctx); err != nil {
log.Error("health check server shutdown failed", "err", err) srv.log.Error("health check server shutdown failed", "err", err)
return err return err
} }
return nil return nil
@ -54,6 +73,14 @@ func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error
return g.Wait() return g.Wait()
} }
// HealthCheckListener runs simple http server on the specified port for
// health check probes
func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error {
srv := NewServer(nil)
srv.SetLogger(log)
return srv.Listen(ctx, port)
}
func basicHealth(w http.ResponseWriter, r *http.Request) { func basicHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200) w.WriteHeader(200)
w.Write([]byte("ok")) w.Write([]byte("ok"))

View File

@ -12,6 +12,7 @@ import (
"github.com/abh/certman" "github.com/abh/certman"
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
"go.ntppool.org/common/logger"
) )
const ( const (
@ -203,3 +204,17 @@ func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
return w, nil return w, nil
} }
func (k *Kafka) CheckPartitions() error {
partitions, err := k.conn.ReadPartitions()
if err != nil {
return err
}
// should this result in an error?
if len(partitions) == 0 {
log := logger.Setup()
log.Info("kafka connection has no partitions")
}
return nil
}