From e9d0f7419ae39407a00cd1f648923b2776900204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ask=20Bj=C3=B8rn=20Hansen?= Date: Sat, 22 Jul 2023 23:47:37 -0700 Subject: [PATCH] kafka: CheckPartitions() method for health checks --- kafka/kafka.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/kafka/kafka.go b/kafka/kafka.go index e78f768..aa263cf 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -12,6 +12,7 @@ import ( "github.com/abh/certman" "github.com/segmentio/kafka-go" + "go.ntppool.org/common/logger" ) const ( @@ -203,3 +204,17 @@ func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) { return w, nil } + +func (k *Kafka) CheckPartitions() error { + partitions, err := k.conn.ReadPartitions() + if err != nil { + return err + } + + // should this result in an error? + if len(partitions) == 0 { + log := logger.Setup() + log.Info("kafka connection has no partitions") + } + return nil +}