package kafconn import ( "context" "crypto/tls" "crypto/x509" "errors" "fmt" "log" "os" "time" "github.com/abh/certman" "github.com/segmentio/kafka-go" ) const ( // bootstrapAddress = "logs-kafka-bootstrap:9093" bootstrapAddress = "logs-kafka-0.logs-kafka-brokers.ntppipeline.svc.cluster.local:9093" kafkaGroup = "logferry-api" // kafkaMaxBatchSize = 250000 // kafkaMinBatchSize = 1000 ) type TLSSetup struct { CA string Key string Cert string } type Kafka struct { tls TLSSetup transport *kafka.Transport brokers []kafka.Broker dialer *kafka.Dialer conn *kafka.Conn l *log.Logger // wr *kafka.Writer } func (k *Kafka) tlsConfig() (*tls.Config, error) { cm, err := certman.New(k.tls.Cert, k.tls.Key) if err != nil { return nil, err } cm.Logger(k.l) err = cm.Watch() if err != nil { return nil, err } capool, err := k.caPool() if err != nil { return nil, err } tlsConfig := &tls.Config{ InsecureSkipVerify: true, // MinVersion: tls.VersionTLS12, RootCAs: capool, GetClientCertificate: cm.GetClientCertificate, } return tlsConfig, nil } func (k *Kafka) caPool() (*x509.CertPool, error) { capool := x509.NewCertPool() pem, err := os.ReadFile(k.tls.CA) if err != nil { return nil, err } if !capool.AppendCertsFromPEM(pem) { return nil, errors.New("credentials: failed to append certificates") } return capool, nil } func (k *Kafka) kafkaDialer() (*kafka.Dialer, error) { tlsConfig, err := k.tlsConfig() if err != nil { return nil, err } dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: false, TLS: tlsConfig, } return dialer, nil } func (k *Kafka) kafkaTransport(ctx context.Context) (*kafka.Transport, error) { tlsConfig, err := k.tlsConfig() if err != nil { return nil, err } transport := &kafka.Transport{ DialTimeout: 10 * time.Second, IdleTimeout: 60 * time.Second, TLS: tlsConfig, Context: ctx, } return transport, nil } func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) { l := log.New(os.Stdout, "kafka reader: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds) k := &Kafka{ l: l, tls: tls, } if len(k.tls.CA) == 0 || len(k.tls.Cert) == 0 { return nil, fmt.Errorf("tls setup missing") } dialer, err := k.kafkaDialer() if err != nil { return nil, err } k.dialer = dialer transport, err := k.kafkaTransport(ctx) if err != nil { return nil, err } k.transport = transport conn, err := dialer.DialContext(ctx, "tcp", bootstrapAddress) if err != nil { return nil, fmt.Errorf("kafka conn error: %s", err) } k.conn = conn brokers, err := conn.Brokers() if err != nil { return nil, fmt.Errorf("could not get brokers: %s", err) } k.brokers = brokers for _, b := range brokers { l.Printf("broker host: %s", b.Host) } // wr := kafka.NewWriter( // kafka.WriterConfig{ // Brokers: []string{brokerAddress}, // Topic: kafkaTopic, // Dialer: dialer, // Logger: l, // }, // ) // k.wr = wr return k, nil } func (k *Kafka) Writer(topic string) (*kafka.Writer, error) { addrs := []string{} for _, b := range k.brokers { addrs = append(addrs, fmt.Sprintf("%s:%d", b.Host, b.Port)) } // https://pkg.go.dev/github.com/segmentio/kafka-go#Writer w := &kafka.Writer{ Addr: kafka.TCP(addrs...), Transport: k.transport, Topic: topic, Balancer: &kafka.LeastBytes{}, BatchSize: 2000, Compression: kafka.Snappy, Logger: k.l, ErrorLogger: k.l, } return w, nil }