kafka: Add reader helper

This commit is contained in:
Ask Bjørn Hansen 2023-07-02 23:22:58 -07:00
parent 57cfb90777
commit bb99ca5843

View File

@ -118,7 +118,7 @@ func (k *Kafka) kafkaTransport(ctx context.Context) (*kafka.Transport, error) {
}
func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) {
l := log.New(os.Stdout, "kafka reader: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds)
l := log.New(os.Stdout, "kafka: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds)
k := &Kafka{
l: l,
@ -172,17 +172,26 @@ func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) {
return k, nil
}
func (k *Kafka) Writer(topic string) (*kafka.Writer, error) {
func (k *Kafka) NewReader(config kafka.ReaderConfig) *kafka.Reader {
config.Brokers = k.brokerAddrs()
config.Dialer = k.dialer
return kafka.NewReader(config)
}
func (k *Kafka) brokerAddrs() []string {
addrs := []string{}
for _, b := range k.brokers {
addrs = append(addrs, fmt.Sprintf("%s:%d", b.Host, b.Port))
}
return addrs
}
func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
// https://pkg.go.dev/github.com/segmentio/kafka-go#Writer
w := &kafka.Writer{
Addr: kafka.TCP(addrs...),
Addr: kafka.TCP(k.brokerAddrs()...),
Transport: k.transport,
Topic: topic,
Balancer: &kafka.LeastBytes{},