Compare commits
	
		
			2 Commits
		
	
	
		
			2a021b453d
			...
			be9b63f382
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| be9b63f382 | |||
| e9d0f7419a | 
@@ -6,20 +6,39 @@ import (
 | 
				
			|||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"go.ntppool.org/common/logger"
 | 
				
			||||||
	"golang.org/x/exp/slog"
 | 
						"golang.org/x/exp/slog"
 | 
				
			||||||
	"golang.org/x/sync/errgroup"
 | 
						"golang.org/x/sync/errgroup"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// HealthCheckListener runs simple http server on the specified port for
 | 
					type Server struct {
 | 
				
			||||||
// health check probes
 | 
						log      *slog.Logger
 | 
				
			||||||
func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error {
 | 
						healthFn http.HandlerFunc
 | 
				
			||||||
	log.Info("Starting health listener", "port", port)
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewServer(healthFn http.HandlerFunc) *Server {
 | 
				
			||||||
 | 
						if healthFn == nil {
 | 
				
			||||||
 | 
							healthFn = basicHealth
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						srv := &Server{
 | 
				
			||||||
 | 
							log:      logger.Setup(),
 | 
				
			||||||
 | 
							healthFn: healthFn,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return srv
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (srv *Server) SetLogger(log *slog.Logger) {
 | 
				
			||||||
 | 
						srv.log = log
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (srv *Server) Listen(ctx context.Context, port int) error {
 | 
				
			||||||
 | 
						srv.log.Info("Starting health listener", "port", port)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	serveMux := http.NewServeMux()
 | 
						serveMux := http.NewServeMux()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	serveMux.HandleFunc("/__health", basicHealth)
 | 
						serveMux.HandleFunc("/__health", srv.healthFn)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	srv := &http.Server{
 | 
						hsrv := &http.Server{
 | 
				
			||||||
		Addr:         ":" + strconv.Itoa(port),
 | 
							Addr:         ":" + strconv.Itoa(port),
 | 
				
			||||||
		ReadTimeout:  10 * time.Second,
 | 
							ReadTimeout:  10 * time.Second,
 | 
				
			||||||
		WriteTimeout: 20 * time.Second,
 | 
							WriteTimeout: 20 * time.Second,
 | 
				
			||||||
@@ -30,9 +49,9 @@ func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error
 | 
				
			|||||||
	g, ctx := errgroup.WithContext(ctx)
 | 
						g, ctx := errgroup.WithContext(ctx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	g.Go(func() error {
 | 
						g.Go(func() error {
 | 
				
			||||||
		err := srv.ListenAndServe()
 | 
							err := hsrv.ListenAndServe()
 | 
				
			||||||
		if err != http.ErrServerClosed {
 | 
							if err != http.ErrServerClosed {
 | 
				
			||||||
			log.Warn("health check server done listening", "err", err)
 | 
								srv.log.Warn("health check server done listening", "err", err)
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
@@ -44,8 +63,8 @@ func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error
 | 
				
			|||||||
	defer cancel()
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	g.Go(func() error {
 | 
						g.Go(func() error {
 | 
				
			||||||
		if err := srv.Shutdown(ctx); err != nil {
 | 
							if err := hsrv.Shutdown(ctx); err != nil {
 | 
				
			||||||
			log.Error("health check server shutdown failed", "err", err)
 | 
								srv.log.Error("health check server shutdown failed", "err", err)
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
@@ -54,6 +73,14 @@ func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error
 | 
				
			|||||||
	return g.Wait()
 | 
						return g.Wait()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// HealthCheckListener runs simple http server on the specified port for
 | 
				
			||||||
 | 
					// health check probes
 | 
				
			||||||
 | 
					func HealthCheckListener(ctx context.Context, port int, log *slog.Logger) error {
 | 
				
			||||||
 | 
						srv := NewServer(nil)
 | 
				
			||||||
 | 
						srv.SetLogger(log)
 | 
				
			||||||
 | 
						return srv.Listen(ctx, port)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func basicHealth(w http.ResponseWriter, r *http.Request) {
 | 
					func basicHealth(w http.ResponseWriter, r *http.Request) {
 | 
				
			||||||
	w.WriteHeader(200)
 | 
						w.WriteHeader(200)
 | 
				
			||||||
	w.Write([]byte("ok"))
 | 
						w.Write([]byte("ok"))
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -12,6 +12,7 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	"github.com/abh/certman"
 | 
						"github.com/abh/certman"
 | 
				
			||||||
	"github.com/segmentio/kafka-go"
 | 
						"github.com/segmentio/kafka-go"
 | 
				
			||||||
 | 
						"go.ntppool.org/common/logger"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
@@ -203,3 +204,17 @@ func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	return w, nil
 | 
						return w, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (k *Kafka) CheckPartitions() error {
 | 
				
			||||||
 | 
						partitions, err := k.conn.ReadPartitions()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// should this result in an error?
 | 
				
			||||||
 | 
						if len(partitions) == 0 {
 | 
				
			||||||
 | 
							log := logger.Setup()
 | 
				
			||||||
 | 
							log.Info("kafka connection has no partitions")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user