From bb99ca584394a6515be2c9cd3a7f2aec83978029 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ask=20Bj=C3=B8rn=20Hansen?= Date: Sun, 2 Jul 2023 23:22:58 -0700 Subject: [PATCH] kafka: Add reader helper --- kafka/kafka.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/kafka/kafka.go b/kafka/kafka.go index b5015cf..701c1dd 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -118,7 +118,7 @@ func (k *Kafka) kafkaTransport(ctx context.Context) (*kafka.Transport, error) { } func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) { - l := log.New(os.Stdout, "kafka reader: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds) + l := log.New(os.Stdout, "kafka: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds) k := &Kafka{ l: l, @@ -172,17 +172,26 @@ func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) { return k, nil } -func (k *Kafka) Writer(topic string) (*kafka.Writer, error) { +func (k *Kafka) NewReader(config kafka.ReaderConfig) *kafka.Reader { + config.Brokers = k.brokerAddrs() + config.Dialer = k.dialer + return kafka.NewReader(config) +} + +func (k *Kafka) brokerAddrs() []string { addrs := []string{} - for _, b := range k.brokers { addrs = append(addrs, fmt.Sprintf("%s:%d", b.Host, b.Port)) } + return addrs +} + +func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) { // https://pkg.go.dev/github.com/segmentio/kafka-go#Writer w := &kafka.Writer{ - Addr: kafka.TCP(addrs...), + Addr: kafka.TCP(k.brokerAddrs()...), Transport: k.transport, Topic: topic, Balancer: &kafka.LeastBytes{},