kafka/internal/configuration/configuration.go

254 lines
7.9 KiB
Go

package configuration
import (
. "antoine-roux.tk/kafka/internal/kafka"
. "antoine-roux.tk/kafka/internal/kafka/provider"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"github.com/Shopify/sarama"
"gopkg.in/yaml.v3"
"os"
"path/filepath"
"strings"
)
const usage = `
Usage of %s:
-c, --configuration path to yaml configuration file
-h, --help prints help information
`
type AuthType string
const (
MTLS_AUTH AuthType = "mtls"
SRAM_AUTH AuthType = "scram-sha-512"
)
var configurationPath string
type Configuration interface {
IsVerbose() bool
CreateApplication() (application *Application)
}
type kafkaConfiguration struct {
Version string `yaml:"version"`
Brokers string `yaml:"brokers"`
AuthType AuthType `yaml:"auth-type"`
UserName *string `yaml:"username"`
Password *string `yaml:"password"`
Cert *string `yaml:"cert"`
Key *string `yaml:"key"`
Ca string `yaml:"ca"`
GroupName string `yaml:"group-name"`
Topics struct {
In string `yaml:"in"`
Out string `yaml:"out"`
}
}
type configuration struct {
Verbose bool `yaml:"verbose"`
Kafka kafkaConfiguration
}
func NewConfiguration() Configuration {
flag.StringVar(&configurationPath, "configuration", "./configuration.yaml", "configuration path")
flag.StringVar(&configurationPath, "c", "./configuration.yaml", "configuration path")
flag.Usage = func() { fmt.Printf(usage, os.Args[0]) }
flag.Parse()
filename, _ := filepath.Abs(configurationPath)
configurationFile, err := os.ReadFile(filename)
if err != nil {
panic("Invalid configuration file path, fail to read it")
}
var configuration configuration
err = yaml.Unmarshal(configurationFile, &configuration)
return &configuration
}
func (c *configuration) IsVerbose() bool {
return c.Verbose
}
func (c *configuration) CreateApplication() (application *Application) {
producerConfigurationFactory := newKafkaProducerConfigurationFactory(c.Kafka)
consumerConfigurationFactory := newKafkaConsumerConfigurationFactory(c.Kafka)
application = &Application{}
application.ProducerProvider = NewProducerProvider(
strings.Split(c.Kafka.Brokers, ","),
producerConfigurationFactory,
)
application.ConsumerProvider = NewConsumerProvider(
strings.Split(c.Kafka.Brokers, ","),
c.Kafka.GroupName,
consumerConfigurationFactory,
)
if len(c.Kafka.Topics.In) == 0 {
panic("no input In topic given to be consumed, please set kafka.topics.in")
}
application.ReceiverTopic = Topic(c.Kafka.Topics.In)
if len(c.Kafka.Topics.Out) == 0 {
panic("no input Out topic given to produce to, please set kafka.topics.out")
}
application.EmitterTopic = Topic(c.Kafka.Topics.Out)
return
}
func newKafkaConsumerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config {
version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version)
if err != nil {
panic(fmt.Sprintf("Error parsing Kafka Version: %v\n", err))
}
if len(kafkaConfiguration.Brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set Kafka.brokers in configuration file")
}
if kafkaConfiguration.AuthType == SRAM_AUTH {
if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 {
panic("no Kafka scram username defined, please set Kafka.username in configuration file")
}
if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 {
panic("no Kafka scram password defined, please set Kafka.password in configuration file")
}
} else if kafkaConfiguration.AuthType == MTLS_AUTH {
if kafkaConfiguration.Cert == nil {
panic("no Kafka mtls cert defined, please set Kafka.cert in configuration file")
}
if kafkaConfiguration.Key == nil {
panic("no Kafka mtls key defined, please set Kafka.key in configuration file")
}
} else {
panic("invalid kafka auth type in configuration file")
}
configuration := sarama.NewConfig()
return func() *sarama.Config {
configuration.Version = version
configuration.ClientID = "client_id"
configuration.Metadata.Full = true
configuration.Net.MaxOpenRequests = 1
configuration.Net.TLS.Enable = true
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca))
configuration.Net.TLS.Config = &tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false,
}
if kafkaConfiguration.AuthType == SRAM_AUTH {
configuration.Net.SASL.Enable = true
configuration.Net.SASL.User = *kafkaConfiguration.UserName
configuration.Net.SASL.Password = *kafkaConfiguration.Password
configuration.Net.SASL.Handshake = true
configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
} else {
cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key))
if err != nil {
panic("Invalid configuration failed to read client certificates")
}
configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert}
}
return configuration
}
}
func newKafkaProducerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config {
version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version)
if err != nil {
panic(fmt.Sprintf("Error parsing Kafka Version: %v\n", err))
}
if len(kafkaConfiguration.Brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set Kafka.brokers in configuration file")
}
if kafkaConfiguration.AuthType == SRAM_AUTH {
if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 {
panic("no Kafka scram username defined, please set Kafka.username in configuration file")
}
if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 {
panic("no Kafka scram password defined, please set Kafka.password in configuration file")
}
} else if kafkaConfiguration.AuthType == MTLS_AUTH {
if kafkaConfiguration.Cert == nil {
panic("no Kafka mtls cert defined, please set Kafka.cert in configuration file")
}
if kafkaConfiguration.Key == nil {
panic("no Kafka mtls key defined, please set Kafka.key in configuration file")
}
} else {
panic("invalid kafka auth type in configuration file")
}
configuration := sarama.NewConfig()
return func() *sarama.Config {
configuration.Version = version
configuration.ClientID = "client_id"
configuration.Metadata.Full = true
configuration.Producer.Idempotent = true
configuration.Producer.Return.Errors = true
configuration.Producer.RequiredAcks = sarama.WaitForAll
configuration.Producer.Partitioner = sarama.NewRoundRobinPartitioner
configuration.Producer.Transaction.Retry.Backoff = 10
configuration.Producer.Transaction.ID = "txn_producer"
configuration.Producer.Retry.Max = 1
configuration.Producer.Return.Successes = true
configuration.Net.MaxOpenRequests = 1
configuration.Net.TLS.Enable = true
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca))
configuration.Net.TLS.Config = &tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false,
}
if kafkaConfiguration.AuthType == SRAM_AUTH {
configuration.Net.SASL.Enable = true
configuration.Net.SASL.User = *kafkaConfiguration.UserName
configuration.Net.SASL.Password = *kafkaConfiguration.Password
configuration.Net.SASL.Handshake = true
configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
} else {
cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key))
if err != nil {
panic("Invalid configuration failed to read client certificates")
}
configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert}
}
return configuration
}
}
type Topic string
type Application struct {
ProducerProvider Provider[sarama.SyncProducer]
ConsumerProvider Provider[sarama.ConsumerGroup]
ReceiverTopic Topic
EmitterTopic Topic
}