// Source: common/kafka/kafka.go (221 lines, 4.0 KiB, Go).

package kafconn
import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"strconv"
	"time"

	"github.com/abh/certman"
	"github.com/segmentio/kafka-go"
	"go.ntppool.org/common/logger"
)
// Kafka connection settings used by this package.
const (
// Disabled alternative: bootstrapAddress = "logs-kafka-bootstrap:9093"
// bootstrapAddress is the host:port that NewKafka dials in order to
// discover the list of brokers.
bootstrapAddress = "logs-kafka-0.logs-kafka-brokers.ntppipeline.svc.cluster.local:9093"
// kafkaGroup is not referenced in the visible part of this file;
// presumably a consumer group ID -- TODO confirm against callers.
kafkaGroup = "logferry-api"
// kafkaMaxBatchSize = 250000
// kafkaMinBatchSize = 1000
)
// TLSSetup names the TLS material used for the Kafka connection.
// NewKafka rejects a setup whose CA or Cert is empty.
type TLSSetup struct {
// CA is the path of a PEM file; its certificates are loaded into the
// root CA pool.
CA string
// Key is passed to certman.New together with Cert; presumably the path
// of the client private key file -- TODO confirm against certman.
Key string
// Cert is passed to certman.New together with Key; presumably the path
// of the client certificate file -- TODO confirm against certman.
Cert string
}
// Kafka holds the connection state created by NewKafka and is the factory
// for readers and writers that share that state.
type Kafka struct {
// tls is the TLS material supplied to NewKafka.
tls TLSSetup
// transport is handed to writers created by NewWriter.
transport *kafka.Transport
// brokers is the broker list reported by the bootstrap connection.
brokers []kafka.Broker
// dialer is handed to readers created by NewReader and is also used for
// the bootstrap connection.
dialer *kafka.Dialer
// conn is the bootstrap connection; CheckPartitions reads from it.
conn *kafka.Conn
// l receives certman log output and writer error logs.
l *log.Logger
// wr *kafka.Writer
}
// tlsConfig builds the client-side TLS configuration for broker connections.
//
// A new certman instance is created from k.tls.Cert and k.tls.Key on every
// call, wired to the package logger and started with Watch; the resulting
// config obtains the client certificate through its GetClientCertificate
// callback. The CA bundle named by k.tls.CA is installed as RootCAs.
//
// Any error from certman or from loading the CA bundle is returned as-is.
//
// NOTE(review): InsecureSkipVerify is true, which makes crypto/tls skip
// verification of the server's certificate chain and host name, so RootCAs
// is not actually enforced. Confirm this is intentional before relying on
// the connection for server authentication.
func (k *Kafka) tlsConfig() (*tls.Config, error) {
	certs, err := certman.New(k.tls.Cert, k.tls.Key)
	if err != nil {
		return nil, err
	}
	certs.Logger(k.l)

	if err := certs.Watch(); err != nil {
		return nil, err
	}

	roots, err := k.caPool()
	if err != nil {
		return nil, err
	}

	return &tls.Config{
		InsecureSkipVerify: true,
		// MinVersion:           tls.VersionTLS12,
		RootCAs:              roots,
		GetClientCertificate: certs.GetClientCertificate,
	}, nil
}
// caPool reads the PEM file named by k.tls.CA and returns a certificate pool
// containing the certificates found in it.
//
// It returns the read error if the file cannot be read, and a
// "failed to append certificates" error if no certificate could be parsed
// from the file's contents.
func (k *Kafka) caPool() (*x509.CertPool, error) {
	data, err := os.ReadFile(k.tls.CA)
	if err != nil {
		return nil, err
	}

	pool := x509.NewCertPool()
	if ok := pool.AppendCertsFromPEM(data); !ok {
		return nil, errors.New("credentials: failed to append certificates")
	}
	return pool, nil
}
// kafkaDialer returns a kafka.Dialer that connects over TLS using a freshly
// built configuration from tlsConfig, with a 10 second timeout and
// dual-stack dialing disabled.
//
// Errors from tlsConfig are returned unchanged.
func (k *Kafka) kafkaDialer() (*kafka.Dialer, error) {
	cfg, err := k.tlsConfig()
	if err != nil {
		return nil, err
	}

	return &kafka.Dialer{
		TLS:       cfg,
		Timeout:   10 * time.Second,
		DualStack: false,
	}, nil
}
// kafkaTransport returns a kafka.Transport that uses a freshly built TLS
// configuration from tlsConfig, a 10 second dial timeout and a 60 second
// idle timeout. ctx is stored in the transport's Context field.
//
// Errors from tlsConfig are returned unchanged.
func (k *Kafka) kafkaTransport(ctx context.Context) (*kafka.Transport, error) {
	cfg, err := k.tlsConfig()
	if err != nil {
		return nil, err
	}

	return &kafka.Transport{
		Context:     ctx,
		TLS:         cfg,
		DialTimeout: 10 * time.Second,
		IdleTimeout: 60 * time.Second,
	}, nil
}
// NewKafka validates the TLS setup, builds the dialer and transport, dials
// the bootstrap address and records the brokers the cluster reports.
//
// ctx bounds the bootstrap dial and is also passed to the transport. tls
// must name a CA, a certificate and a key; an incomplete setup is rejected
// with "tls setup missing" before any file is touched.
//
// The returned Kafka keeps the bootstrap connection open for later use by
// CheckPartitions. On any failure the returned *Kafka is nil, and underlying
// dial and broker lookup errors are wrapped so callers can inspect them with
// errors.Is / errors.As.
func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) {
	l := log.New(os.Stdout, "kafka: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds)

	k := &Kafka{
		l:   l,
		tls: tls,
	}

	// Key is checked together with CA and Cert: the client key pair is
	// built from both Cert and Key, so an empty Key cannot produce a usable
	// TLS configuration and is better reported here than further down.
	if len(k.tls.CA) == 0 || len(k.tls.Cert) == 0 || len(k.tls.Key) == 0 {
		return nil, fmt.Errorf("tls setup missing")
	}

	dialer, err := k.kafkaDialer()
	if err != nil {
		return nil, err
	}
	k.dialer = dialer

	transport, err := k.kafkaTransport(ctx)
	if err != nil {
		return nil, err
	}
	k.transport = transport

	conn, err := dialer.DialContext(ctx, "tcp", bootstrapAddress)
	if err != nil {
		return nil, fmt.Errorf("kafka conn error: %w", err)
	}

	brokers, err := conn.Brokers()
	if err != nil {
		// The connection is not handed to the caller on this path, so close
		// it here instead of leaking it. The close error is deliberately
		// dropped: the broker lookup failure is the error worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("could not get brokers: %w", err)
	}
	k.conn = conn
	k.brokers = brokers

	for _, b := range brokers {
		l.Printf("broker host: %s", b.Host)
	}

	// wr := kafka.NewWriter(
	// 	kafka.WriterConfig{
	// 		Brokers: []string{brokerAddress},
	// 		Topic:   kafkaTopic,
	// 		Dialer:  dialer,
	// 		Logger:  l,
	// 	},
	// )
	// k.wr = wr

	return k, nil
}
// NewReader creates a kafka.Reader from config after overriding two of its
// fields: Brokers is replaced by the broker addresses discovered by
// NewKafka, and Dialer by the TLS dialer held in k. All other fields of
// config are passed through untouched.
func (k *Kafka) NewReader(config kafka.ReaderConfig) *kafka.Reader {
	// config is a by-value copy, so the caller's struct is not modified.
	config.Dialer, config.Brokers = k.dialer, k.brokerAddrs()
	return kafka.NewReader(config)
}
// brokerAddrs returns the brokers recorded in k as "host:port" strings, in
// the order they are stored.
//
// Addresses are assembled with net.JoinHostPort so that a host that is an
// IPv6 literal is bracketed ("[::1]:9093") and the result stays parseable
// as a network address. The returned slice is never nil; it is empty when
// no brokers are known.
func (k *Kafka) brokerAddrs() []string {
	addrs := make([]string, 0, len(k.brokers))
	for _, b := range k.brokers {
		addrs = append(addrs, net.JoinHostPort(b.Host, strconv.Itoa(b.Port)))
	}
	return addrs
}
// NewWriter returns a kafka.Writer for topic that targets the brokers
// discovered by NewKafka and sends through the shared TLS transport.
//
// The writer uses the LeastBytes balancer, a batch size of 2000 messages and
// LZ4 compression, and reports errors to the package logger. The returned
// error is always nil.
//
// See https://pkg.go.dev/github.com/segmentio/kafka-go#Writer
func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {
	addr := kafka.TCP(k.brokerAddrs()...)

	return &kafka.Writer{
		Addr:        addr,
		Topic:       topic,
		Transport:   k.transport,
		Balancer:    &kafka.LeastBytes{},
		Compression: kafka.Lz4,
		BatchSize:   2000,
		// Logger:   k.l,
		ErrorLogger: k.l,
	}, nil
}
// CheckPartitions reads the partition list over the bootstrap connection.
//
// It returns the error from ReadPartitions if the read fails. An empty
// partition list is only logged at info level and is not treated as an
// error.
func (k *Kafka) CheckPartitions() error {
	parts, err := k.conn.ReadPartitions()
	switch {
	case err != nil:
		return err
	case len(parts) == 0:
		// should this result in an error?
		logger.Setup().Info("kafka connection has no partitions")
	}
	return nil
}