package kafconn

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/abh/certman"
	"github.com/segmentio/kafka-go"
	"go.ntppool.org/common/logger"
)

const (
	// bootstrapAddress = "logs-kafka-bootstrap:9093"
	bootstrapAddress = "logs-kafka-0.logs-kafka-brokers.ntppipeline.svc.cluster.local:9093"
	kafkaGroup       = "logferry-api"

	// kafkaMaxBatchSize = 250000
	// kafkaMinBatchSize = 1000
)

type TLSSetup struct {
	CA   string
	Key  string
	Cert string
}

type Kafka struct {
	tls TLSSetup

	transport *kafka.Transport
	brokers   []kafka.Broker

	dialer *kafka.Dialer
	conn   *kafka.Conn

	l *log.Logger

	// wr *kafka.Writer

}

func (k *Kafka) tlsConfig() (*tls.Config, error) {

	cm, err := certman.New(k.tls.Cert, k.tls.Key)
	if err != nil {
		return nil, err
	}
	cm.Logger(k.l)

	err = cm.Watch()
	if err != nil {
		return nil, err
	}

	capool, err := k.caPool()
	if err != nil {
		return nil, err
	}

	tlsConfig := &tls.Config{
		InsecureSkipVerify: true,
		// MinVersion:           tls.VersionTLS12,
		RootCAs:              capool,
		GetClientCertificate: cm.GetClientCertificate,
	}
	return tlsConfig, nil
}

func (k *Kafka) caPool() (*x509.CertPool, error) {
	capool := x509.NewCertPool()

	pem, err := os.ReadFile(k.tls.CA)
	if err != nil {
		return nil, err
	}

	if !capool.AppendCertsFromPEM(pem) {
		return nil, errors.New("credentials: failed to append certificates")
	}

	return capool, nil
}

func (k *Kafka) kafkaDialer() (*kafka.Dialer, error) {
	tlsConfig, err := k.tlsConfig()
	if err != nil {
		return nil, err
	}

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: false,
		TLS:       tlsConfig,
	}

	return dialer, nil
}

func (k *Kafka) kafkaTransport(ctx context.Context) (*kafka.Transport, error) {
	tlsConfig, err := k.tlsConfig()
	if err != nil {
		return nil, err
	}

	transport := &kafka.Transport{
		DialTimeout: 10 * time.Second,
		IdleTimeout: 60 * time.Second,
		TLS:         tlsConfig,
		Context:     ctx,
	}

	return transport, nil
}

func NewKafka(ctx context.Context, tls TLSSetup) (*Kafka, error) {
	l := log.New(os.Stdout, "kafka: ", log.Ldate|log.Ltime|log.LUTC|log.Lmsgprefix|log.Lmicroseconds)

	k := &Kafka{
		l:   l,
		tls: tls,
	}

	if len(k.tls.CA) == 0 || len(k.tls.Cert) == 0 {
		return nil, fmt.Errorf("tls setup missing")
	}

	dialer, err := k.kafkaDialer()
	if err != nil {
		return nil, err
	}
	k.dialer = dialer

	transport, err := k.kafkaTransport(ctx)
	if err != nil {
		return nil, err
	}
	k.transport = transport

	conn, err := dialer.DialContext(ctx, "tcp", bootstrapAddress)
	if err != nil {
		return nil, fmt.Errorf("kafka conn error: %s", err)
	}

	k.conn = conn

	brokers, err := conn.Brokers()
	if err != nil {
		return nil, fmt.Errorf("could not get brokers: %s", err)
	}
	k.brokers = brokers

	for _, b := range brokers {
		l.Printf("broker host: %s", b.Host)
	}

	// wr := kafka.NewWriter(
	// 	kafka.WriterConfig{
	// 		Brokers: []string{brokerAddress},
	// 		Topic:   kafkaTopic,
	// 		Dialer:  dialer,
	// 		Logger:  l,
	// 	},
	// )

	// k.wr = wr

	return k, nil
}

func (k *Kafka) NewReader(config kafka.ReaderConfig) *kafka.Reader {
	config.Brokers = k.brokerAddrs()
	config.Dialer = k.dialer

	return kafka.NewReader(config)
}

func (k *Kafka) brokerAddrs() []string {
	addrs := []string{}
	for _, b := range k.brokers {
		addrs = append(addrs, fmt.Sprintf("%s:%d", b.Host, b.Port))
	}
	return addrs
}

func (k *Kafka) NewWriter(topic string) (*kafka.Writer, error) {

	// https://pkg.go.dev/github.com/segmentio/kafka-go#Writer
	w := &kafka.Writer{
		Addr:        kafka.TCP(k.brokerAddrs()...),
		Transport:   k.transport,
		Topic:       topic,
		Balancer:    &kafka.LeastBytes{},
		BatchSize:   2000,
		Compression: kafka.Lz4,
		// Logger:      k.l,
		ErrorLogger: k.l,
	}

	return w, nil
}

func (k *Kafka) CheckPartitions() error {
	partitions, err := k.conn.ReadPartitions()
	if err != nil {
		return err
	}

	// should this result in an error?
	if len(partitions) == 0 {
		log := logger.Setup()
		log.Info("kafka connection has no partitions")
	}
	return nil
}