package configuration import ( . "antoine-roux.tk/kafka/internal/kafka" . "antoine-roux.tk/kafka/internal/kafka/provider" "crypto/tls" "crypto/x509" "flag" "fmt" "github.com/Shopify/sarama" "gopkg.in/yaml.v3" "os" "path/filepath" "strings" ) const usage = ` Usage of %s: -c, --configuration path to yaml configuration file -h, --help prints help information ` type AuthType string const ( MTLS_AUTH AuthType = "mtls" SRAM_AUTH AuthType = "scram-sha-512" ) var configurationPath string type Configuration interface { IsVerbose() bool CreateApplication() (application *Application) } type kafkaConfiguration struct { Version string `yaml:"version"` Brokers string `yaml:"brokers"` AuthType AuthType `yaml:"auth-type"` UserName *string `yaml:"username"` Password *string `yaml:"password"` Cert *string `yaml:"cert"` Key *string `yaml:"key"` Ca string `yaml:"ca"` GroupName string `yaml:"group-name"` Topics struct { In string `yaml:"in"` Out string `yaml:"out"` } } type configuration struct { Verbose bool `yaml:"verbose"` Kafka kafkaConfiguration } func NewConfiguration() Configuration { flag.StringVar(&configurationPath, "configuration", "./configuration.yaml", "configuration path") flag.StringVar(&configurationPath, "c", "./configuration.yaml", "configuration path") flag.Usage = func() { fmt.Printf(usage, os.Args[0]) } flag.Parse() filename, _ := filepath.Abs(configurationPath) configurationFile, err := os.ReadFile(filename) if err != nil { panic("Invalid configuration file path, fail to read it") } var configuration configuration err = yaml.Unmarshal(configurationFile, &configuration) return &configuration } func (c *configuration) IsVerbose() bool { return c.Verbose } func (c *configuration) CreateApplication() (application *Application) { producerConfigurationFactory := newKafkaProducerConfigurationFactory(c.Kafka) consumerConfigurationFactory := newKafkaConsumerConfigurationFactory(c.Kafka) application = &Application{} application.ProducerProvider = NewProducerProvider( strings.Split(c.Kafka.Brokers, ","), producerConfigurationFactory, ) application.ConsumerProvider = NewConsumerProvider( strings.Split(c.Kafka.Brokers, ","), c.Kafka.GroupName, consumerConfigurationFactory, ) if len(c.Kafka.Topics.In) == 0 { panic("no input In topic given to be consumed, please set kafka.topics.in") } application.ReceiverTopic = Topic(c.Kafka.Topics.In) if len(c.Kafka.Topics.Out) == 0 { panic("no input Out topic given to produce to, please set kafka.topics.out") } application.EmitterTopic = Topic(c.Kafka.Topics.Out) return } func newKafkaConsumerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config { version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version) if err != nil { panic(fmt.Sprintf("Error parsing Kafka Version: %v\n", err)) } if len(kafkaConfiguration.Brokers) == 0 { panic("no Kafka bootstrap brokers defined, please set Kafka.brokers in configuration file") } if kafkaConfiguration.AuthType == SRAM_AUTH { if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 { panic("no Kafka scram username defined, please set Kafka.username in configuration file") } if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 { panic("no Kafka scram password defined, please set Kafka.password in configuration file") } } else if kafkaConfiguration.AuthType == MTLS_AUTH { if kafkaConfiguration.Cert == nil { panic("no Kafka mtls cert defined, please set Kafka.cert in configuration file") } if kafkaConfiguration.Key == nil { panic("no Kafka mtls key defined, please set Kafka.key in configuration file") } } else { panic("invalid kafka auth type in configuration file") } configuration := sarama.NewConfig() return func() *sarama.Config { configuration.Version = version configuration.ClientID = "client_id" configuration.Metadata.Full = true configuration.Net.MaxOpenRequests = 1 configuration.Net.TLS.Enable = true caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca)) configuration.Net.TLS.Config = &tls.Config{ RootCAs: caCertPool, InsecureSkipVerify: false, } if kafkaConfiguration.AuthType == SRAM_AUTH { configuration.Net.SASL.Enable = true configuration.Net.SASL.User = *kafkaConfiguration.UserName configuration.Net.SASL.Password = *kafkaConfiguration.Password configuration.Net.SASL.Handshake = true configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 } else { cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key)) if err != nil { panic("Invalid configuration failed to read client certificates") } configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert} } return configuration } } func newKafkaProducerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config { version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version) if err != nil { panic(fmt.Sprintf("Error parsing Kafka Version: %v\n", err)) } if len(kafkaConfiguration.Brokers) == 0 { panic("no Kafka bootstrap brokers defined, please set Kafka.brokers in configuration file") } if kafkaConfiguration.AuthType == SRAM_AUTH { if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 { panic("no Kafka scram username defined, please set Kafka.username in configuration file") } if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 { panic("no Kafka scram password defined, please set Kafka.password in configuration file") } } else if kafkaConfiguration.AuthType == MTLS_AUTH { if kafkaConfiguration.Cert == nil { panic("no Kafka mtls cert defined, please set Kafka.cert in configuration file") } if kafkaConfiguration.Key == nil { panic("no Kafka mtls key defined, please set Kafka.key in configuration file") } } else { panic("invalid kafka auth type in configuration file") } configuration := sarama.NewConfig() return func() *sarama.Config { configuration.Version = version configuration.ClientID = "client_id" configuration.Metadata.Full = true configuration.Producer.Idempotent = true configuration.Producer.Return.Errors = true configuration.Producer.RequiredAcks = sarama.WaitForAll configuration.Producer.Partitioner = sarama.NewRoundRobinPartitioner configuration.Producer.Transaction.Retry.Backoff = 10 configuration.Producer.Transaction.ID = "txn_producer" configuration.Producer.Retry.Max = 1 configuration.Producer.Return.Successes = true configuration.Net.MaxOpenRequests = 1 configuration.Net.TLS.Enable = true caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca)) configuration.Net.TLS.Config = &tls.Config{ RootCAs: caCertPool, InsecureSkipVerify: false, } if kafkaConfiguration.AuthType == SRAM_AUTH { configuration.Net.SASL.Enable = true configuration.Net.SASL.User = *kafkaConfiguration.UserName configuration.Net.SASL.Password = *kafkaConfiguration.Password configuration.Net.SASL.Handshake = true configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 } else { cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key)) if err != nil { panic("Invalid configuration failed to read client certificates") } configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert} } return configuration } } type Topic string type Application struct { ProducerProvider Provider[sarama.SyncProducer] ConsumerProvider Provider[sarama.ConsumerGroup] ReceiverTopic Topic EmitterTopic Topic }