kafka: CheckPartitions() method for health checks

This commit is contained in:
Ask Bjørn Hansen 2023-07-22 23:47:37 -07:00
parent 2a021b453d
commit e9d0f7419a
1 changed files with 15 additions and 0 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/abh/certman"
"github.com/segmentio/kafka-go"
"go.ntppool.org/common/logger"
)
const (
@ -203,3 +204,17 @@ func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
return w, nil
}
func (k *Kafka) CheckPartitions() error {
partitions, err := k.conn.ReadPartitions()
if err != nil {
return err
}
// should this result in an error?
if len(partitions) == 0 {
log := logger.Setup()
log.Info("kafka connection has no partitions")
}
return nil
}