// Package kafconn provides a Kafka client wrapper with TLS support for secure log streaming.
//
// This package handles Kafka connections with mutual TLS authentication for the NTP Pool
// project's log streaming infrastructure. It provides factories for creating Kafka readers
// and writers with automatic broker discovery, TLS configuration, and connection management.
//
// The package is designed specifically for the NTP Pool pipeline infrastructure and includes
// hardcoded bootstrap servers and group configurations. It uses certman for automatic
// certificate renewal and provides compression and batching optimizations.
//
// Key features:
//   - Mutual TLS authentication with automatic certificate renewal
//   - Broker discovery and connection pooling
//   - Reader and writer factory methods with optimized configurations
//   - LZ4 compression for efficient data transfer
//   - Batched writes (fixed at 2000 messages per batch) with least-bytes load balancing
//
// Example usage:
//
//	tlsSetup := kafconn.TLSSetup{
//		CA:   "/path/to/ca.pem",
//		Cert: "/path/to/client.pem",
//		Key:  "/path/to/client.key",
//	}
//	kafka, err := kafconn.NewKafka(ctx, tlsSetup)
//	if err != nil {
//		log.Fatal(err)
//	}
//	writer, err := kafka.NewWriter("logs")
package kafconn

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/abh/certman"
	"github.com/segmentio/kafka-go"
	"go.ntppool.org/common/logger"
)

const (
	// bootstrapAddress is the single, hard-coded broker that NewKafka dials first;
	// the remaining brokers are learned from that connection.
	//
	// bootstrapAddress = "logs-kafka-bootstrap:9093"
	bootstrapAddress = "logs-kafka-0.logs-kafka-brokers.ntppipeline.svc.cluster.local:9093"

	// kafkaGroup is a consumer group name. NOTE(review): it is not referenced
	// anywhere in the visible part of this file — confirm it is still needed.
	kafkaGroup = "logferry-api"
	// kafkaMaxBatchSize = 250000
	// kafkaMinBatchSize = 1000
)

// TLSSetup contains file paths for TLS certificate configuration.
// All fields are required for establishing secure Kafka connections.
type TLSSetup struct { CA string // Path to CA certificate file for server verification Key string // Path to client private key file Cert string // Path to client certificate file } // Kafka represents a configured Kafka client with TLS support. // It manages connections, brokers, and provides factory methods for readers and writers. // The client handles broker discovery, connection pooling, and TLS configuration automatically. type Kafka struct { tls TLSSetup transport *kafka.Transport brokers []kafka.Broker dialer *kafka.Dialer conn *kafka.Conn l *log.Logger // wr *kafka.Writer } func (k *Kafka) tlsConfig() (*tls.Config, error) { cm, err := certman.New(k.tls.Cert, k.tls.Key) if err != nil { return nil, err } cm.Logger(k.l) err = cm.Watch() if err != nil { return nil, err } capool, err := k.caPool() if err != nil { return nil, err } tlsConfig := &tls.Config{ InsecureSkipVerify: true, // MinVersion: tls.VersionTLS12, RootCAs: capool, GetClientCertificate: cm.GetClientCertificate, } return tlsConfig, nil } func (k *Kafka) caPool() (*x509.CertPool, error) { capool := x509.NewCertPool() pem, err := os.ReadFile(k.tls.CA) if err != nil { return nil, err } if !capool.AppendCertsFromPEM(pem) { return nil, errors.New("credentials: failed to append certificates") } return capool, nil } func (k *Kafka) kafkaDialer() (*kafka.Dialer, error) { tlsConfig, err := k.tlsConfig() if err != nil { return nil, err } dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: false, TLS: tlsConfig, } return dialer, nil } func (k *Kafka) kafkaTransport(ctx context.Context) (*kafka.Transport, error) { tlsConfig, err := k.tlsConfig() if err != nil { return nil, err } transport := &kafka.Transport{ DialTimeout: 10 * time.Second, IdleTimeout: 60 * time.Second, TLS: tlsConfig, Context: ctx, } return transport, nil } // NewKafka creates a new Kafka client with TLS configuration and establishes initial connections. 
// It performs broker discovery, validates TLS certificates, and prepares the client for creating // readers and writers. // // The function validates TLS configuration, establishes a connection to the bootstrap server, // discovers all available brokers, and configures transport layers for optimal performance. // // Parameters: // - ctx: Context for connection establishment and timeouts // - tls: TLS configuration with paths to CA, certificate, and key files // // Returns a configured Kafka client ready for creating readers and writers, or an error // if TLS setup fails, connection cannot be established, or broker discovery fails. func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) { l := log.New(os.Stdout, "kafka: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds) k := &Kafka{ l: l, tls: tls, } if len(k.tls.CA) == 0 || len(k.tls.Cert) == 0 { return nil, fmt.Errorf("tls setup missing") } dialer, err := k.kafkaDialer() if err != nil { return nil, err } k.dialer = dialer transport, err := k.kafkaTransport(ctx) if err != nil { return nil, err } k.transport = transport conn, err := dialer.DialContext(ctx, "tcp", bootstrapAddress) if err != nil { return nil, fmt.Errorf("kafka conn error: %s", err) } k.conn = conn brokers, err := conn.Brokers() if err != nil { return nil, fmt.Errorf("could not get brokers: %s", err) } k.brokers = brokers for _, b := range brokers { l.Printf("broker host: %s", b.Host) } // wr := kafka.NewWriter( // kafka.WriterConfig{ // Brokers: []string{brokerAddress}, // Topic: kafkaTopic, // Dialer: dialer, // Logger: l, // }, // ) // k.wr = wr return k, nil } // NewReader creates a new Kafka reader with the client's broker list and TLS configuration. // The provided config is enhanced with the discovered brokers and configured dialer. // The reader supports automatic offset management, consumer group coordination, and reconnection. 
// // The caller should configure the reader's Topic, GroupID, and other consumer-specific settings // in the provided config. The client automatically sets Brokers and Dialer fields. func (k *Kafka) NewReader(config kafka.ReaderConfig) *kafka.Reader { config.Brokers = k.brokerAddrs() config.Dialer = k.dialer return kafka.NewReader(config) } func (k *Kafka) brokerAddrs() []string { addrs := []string{} for _, b := range k.brokers { addrs = append(addrs, fmt.Sprintf("%s:%d", b.Host, b.Port)) } return addrs } // NewWriter creates a new Kafka writer for the specified topic with optimized configuration. // The writer uses LZ4 compression, least-bytes load balancing, and batching for performance. // // Configuration includes: // - Batch size: 2000 messages for efficient throughput // - Compression: LZ4 for fast compression with good ratios // - Balancer: LeastBytes for optimal partition distribution // - Transport: TLS-configured transport with connection pooling // // The writer is ready for immediate use and handles connection management automatically. func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) { // https://pkg.go.dev/github.com/segmentio/kafka-go#Writer w := &kafka.Writer{ Addr: kafka.TCP(k.brokerAddrs()...), Transport: k.transport, Topic: topic, Balancer: &kafka.LeastBytes{}, BatchSize: 2000, Compression: kafka.Lz4, // Logger: k.l, ErrorLogger: k.l, } return w, nil } // CheckPartitions verifies that the Kafka connection can read partition metadata. // This method is useful for health checks and connection validation. // // Returns an error if partition metadata cannot be retrieved, which typically // indicates connection problems, authentication failures, or broker unavailability. // Logs a warning if no partitions are available but does not return an error. func (k *Kafka) CheckPartitions() error { partitions, err := k.conn.ReadPartitions() if err != nil { return err } // should this result in an error? 
if len(partitions) == 0 { log := logger.Setup() log.Info("kafka connection has no partitions") } return nil }