Compare commits
No commits in common. "be9b63f3820815c536a8b54d76bfd68005e0df50" and "2a021b453d472372aae8589e34c9197dcc6f7709" have entirely different histories.
be9b63f382
...
2a021b453d
@ -6,39 +6,20 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.ntppool.org/common/logger"
|
|
||||||
"golang.org/x/exp/slog"
|
"golang.org/x/exp/slog"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Server struct {
|
// HealthCheckListener runs simple http server on the specified port for
|
||||||
log *slog.Logger
|
// health check probes
|
||||||
healthFn http.HandlerFunc
|
func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error {
|
||||||
}
|
log.Info("Starting health listener", "port", port)
|
||||||
|
|
||||||
func NewServer(healthFn http.HandlerFunc) *Server {
|
|
||||||
if healthFn == nil {
|
|
||||||
healthFn = basicHealth
|
|
||||||
}
|
|
||||||
srv := &Server{
|
|
||||||
log: logger.Setup(),
|
|
||||||
healthFn: healthFn,
|
|
||||||
}
|
|
||||||
return srv
|
|
||||||
}
|
|
||||||
|
|
||||||
func (srv *Server) SetLogger(log *slog.Logger) {
|
|
||||||
srv.log = log
|
|
||||||
}
|
|
||||||
|
|
||||||
func (srv *Server) Listen(ctx context.Context, port int) error {
|
|
||||||
srv.log.Info("Starting health listener", "port", port)
|
|
||||||
|
|
||||||
serveMux := http.NewServeMux()
|
serveMux := http.NewServeMux()
|
||||||
|
|
||||||
serveMux.HandleFunc("/__health", srv.healthFn)
|
serveMux.HandleFunc("/__health", basicHealth)
|
||||||
|
|
||||||
hsrv := &http.Server{
|
srv := &http.Server{
|
||||||
Addr: ":" + strconv.Itoa(port),
|
Addr: ":" + strconv.Itoa(port),
|
||||||
ReadTimeout: 10 * time.Second,
|
ReadTimeout: 10 * time.Second,
|
||||||
WriteTimeout: 20 * time.Second,
|
WriteTimeout: 20 * time.Second,
|
||||||
@ -49,9 +30,9 @@ func (srv *Server) Listen(ctx context.Context, port int) error {
|
|||||||
g, ctx := errgroup.WithContext(ctx)
|
g, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
err := hsrv.ListenAndServe()
|
err := srv.ListenAndServe()
|
||||||
if err != http.ErrServerClosed {
|
if err != http.ErrServerClosed {
|
||||||
srv.log.Warn("health check server done listening", "err", err)
|
log.Warn("health check server done listening", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -63,8 +44,8 @@ func (srv *Server) Listen(ctx context.Context, port int) error {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
if err := hsrv.Shutdown(ctx); err != nil {
|
if err := srv.Shutdown(ctx); err != nil {
|
||||||
srv.log.Error("health check server shutdown failed", "err", err)
|
log.Error("health check server shutdown failed", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -73,14 +54,6 @@ func (srv *Server) Listen(ctx context.Context, port int) error {
|
|||||||
return g.Wait()
|
return g.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HealthCheckListener runs simple http server on the specified port for
|
|
||||||
// health check probes
|
|
||||||
func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error {
|
|
||||||
srv := NewServer(nil)
|
|
||||||
srv.SetLogger(log)
|
|
||||||
return srv.Listen(ctx, port)
|
|
||||||
}
|
|
||||||
|
|
||||||
func basicHealth(w http.ResponseWriter, r *http.Request) {
|
func basicHealth(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
w.Write([]byte("ok"))
|
w.Write([]byte("ok"))
|
||||||
|
@ -12,7 +12,6 @@ import (
|
|||||||
|
|
||||||
"github.com/abh/certman"
|
"github.com/abh/certman"
|
||||||
"github.com/segmentio/kafka-go"
|
"github.com/segmentio/kafka-go"
|
||||||
"go.ntppool.org/common/logger"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -204,17 +203,3 @@ func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
|
|||||||
|
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *Kafka) CheckPartitions() error {
|
|
||||||
partitions, err := k.conn.ReadPartitions()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// should this result in an error?
|
|
||||||
if len(partitions) == 0 {
|
|
||||||
log := logger.Setup()
|
|
||||||
log.Info("kafka connection has no partitions")
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user