diff --git a/kafka/kafka.go b/kafka/kafka.go index b5015cf..701c1dd 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -118,7 +118,7 @@ func (k *Kafka) kafkaTransport(ctx context.Context) (*kafka.Transport, error) { } func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) { - l := log.New(os.Stdout, "kafka reader: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds) + l := log.New(os.Stdout, "kafka: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds) k := &Kafka{ l: l, @@ -172,17 +172,26 @@ func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) { return k, nil } -func (k *Kafka) Writer(topic string) (*kafka.Writer, error) { +func (k *Kafka) NewReader(config kafka.ReaderConfig) *kafka.Reader { + config.Brokers = k.brokerAddrs() + config.Dialer = k.dialer + return kafka.NewReader(config) +} + +func (k *Kafka) brokerAddrs() []string { addrs := []string{} - for _, b := range k.brokers { addrs = append(addrs, fmt.Sprintf("%s:%d", b.Host, b.Port)) } + return addrs +} + +func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) { // https://pkg.go.dev/github.com/segmentio/kafka-go#Writer w := &kafka.Writer{ - Addr: kafka.TCP(addrs...), + Addr: kafka.TCP(k.brokerAddrs()...), Transport: k.transport, Topic: topic, Balancer: &kafka.LeastBytes{},