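// Package configuration loads the YAML configuration file and builds the
// Kafka application described by it.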
package configuration

import (
	"crypto/tls"
	"crypto/x509"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"strings"

	"github.com/Shopify/sarama"
	"gopkg.in/yaml.v3"

	. "antoine-roux.tk/kafka/internal/kafka"
	. "antoine-roux.tk/kafka/internal/kafka/provider"
)

const usage = `
Usage of %s:
  -c, --configuration  path to yaml configuration file
  -h, --help           prints help information
`

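// AuthType selects how the client authenticates against the Kafka brokers.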
type AuthType string

const (
	// MTLS_AUTH authenticates with a client certificate and key over TLS.
	MTLS_AUTH AuthType = "mtls"
	// SRAM_AUTH authenticates with SASL SCRAM-SHA-512 over TLS.
	SRAM_AUTH AuthType = "scram-sha-512"
)

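// configurationPath holds the value of the -c/--configuration flag.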
var configurationPath string

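// Configuration exposes the parsed configuration file and builds the
// application wired to Kafka.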
type Configuration interface {
	IsVerbose() bool
	CreateApplication() (application *Application)
}

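// kafkaConfiguration mirrors the kafka section of the YAML configuration file.
// UserName, Password, Cert and Key are pointers so that a missing key can be
// distinguished from an empty value during validation.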
type kafkaConfiguration struct {
	Version   string   `yaml:"version"`
	Brokers   string   `yaml:"brokers"`
	AuthType  AuthType `yaml:"auth-type"`
	UserName  *string  `yaml:"username"`
	Password  *string  `yaml:"password"`
	Cert      *string  `yaml:"cert"`
	Key       *string  `yaml:"key"`
	Ca        string   `yaml:"ca"`
	GroupName string   `yaml:"group-name"`
	Topics    struct {
		In  string `yaml:"in"`
		Out string `yaml:"out"`
	}
}

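// configuration is the root of the YAML configuration file.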
type configuration struct {
	Verbose bool `yaml:"verbose"`
	Kafka   kafkaConfiguration
}

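// NewConfiguration parses the command line flags, reads the YAML file pointed
// to by -c/--configuration and returns the resulting Configuration. It panics
// if the file cannot be read or parsed.
//
// An illustrative configuration file (all values are placeholders; the keys
// come from the yaml tags above) could look like:
//
//	verbose: true
//	kafka:
//	  version: 2.8.0
//	  brokers: broker-1:9093,broker-2:9093
//	  auth-type: scram-sha-512
//	  username: user
//	  password: secret
//	  ca: |
//	    -----BEGIN CERTIFICATE-----
//	    ...
//	    -----END CERTIFICATE-----
//	  group-name: example-group
//	  topics:
//	    in: example-in
//	    out: example-out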
func NewConfiguration() Configuration {
	flag.StringVar(&configurationPath, "configuration", "./configuration.yaml", "configuration path")
	flag.StringVar(&configurationPath, "c", "./configuration.yaml", "configuration path")
	flag.Usage = func() { fmt.Printf(usage, os.Args[0]) }
	flag.Parse()

	filename, _ := filepath.Abs(configurationPath)

	configurationFile, err := os.ReadFile(filename)
	if err != nil {
		panic("Invalid configuration file path, fail to read it")
	}

	var configuration configuration

	err = yaml.Unmarshal(configurationFile, &configuration)
	if err != nil {
		panic("Invalid configuration file content, fail to parse it as YAML")
	}

	return &configuration
}

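// IsVerbose reports whether verbose logging is enabled in the configuration file.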
func (c *configuration) IsVerbose() bool {
	return c.Verbose
}

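// CreateApplication wires producer and consumer providers to the configured
// brokers and topics. It panics when the input or output topic is missing.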
func (c *configuration) CreateApplication() (application *Application) {
	producerConfigurationFactory := newKafkaProducerConfigurationFactory(c.Kafka)
	consumerConfigurationFactory := newKafkaConsumerConfigurationFactory(c.Kafka)

	application = &Application{}

	application.ProducerProvider = NewProducerProvider(
		strings.Split(c.Kafka.Brokers, ","),
		producerConfigurationFactory,
	)
	application.ConsumerProvider = NewConsumerProvider(
		strings.Split(c.Kafka.Brokers, ","),
		c.Kafka.GroupName,
		consumerConfigurationFactory,
	)

	if len(c.Kafka.Topics.In) == 0 {
		panic("no In topic given to be consumed, please set kafka.topics.in")
	}
	application.ReceiverTopic = Topic(c.Kafka.Topics.In)

	if len(c.Kafka.Topics.Out) == 0 {
		panic("no Out topic given to produce to, please set kafka.topics.out")
	}
	application.EmitterTopic = Topic(c.Kafka.Topics.Out)

	return
}

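// newKafkaConsumerConfigurationFactory validates the Kafka settings once and
// returns a factory producing the sarama configuration used by consumer groups.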
func newKafkaConsumerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config {
	version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version)
	if err != nil {
		panic(fmt.Sprintf("Error parsing Kafka version: %v", err))
	}
	if len(kafkaConfiguration.Brokers) == 0 {
		panic("no Kafka bootstrap brokers defined, please set kafka.brokers in configuration file")
	}

	if kafkaConfiguration.AuthType == SRAM_AUTH {
		if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 {
			panic("no Kafka scram username defined, please set kafka.username in configuration file")
		}
		if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 {
			panic("no Kafka scram password defined, please set kafka.password in configuration file")
		}
	} else if kafkaConfiguration.AuthType == MTLS_AUTH {
		if kafkaConfiguration.Cert == nil {
			panic("no Kafka mtls cert defined, please set kafka.cert in configuration file")
		}
		if kafkaConfiguration.Key == nil {
			panic("no Kafka mtls key defined, please set kafka.key in configuration file")
		}
	} else {
		panic("invalid kafka auth type in configuration file")
	}

	configuration := sarama.NewConfig()

	return func() *sarama.Config {
		configuration.Version = version
		configuration.ClientID = "client_id"
		configuration.Metadata.Full = true

		configuration.Net.MaxOpenRequests = 1
		configuration.Net.TLS.Enable = true

		// Trust the configured CA for the broker TLS connection.
		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca))
		configuration.Net.TLS.Config = &tls.Config{
			RootCAs:            caCertPool,
			InsecureSkipVerify: false,
		}

		if kafkaConfiguration.AuthType == SRAM_AUTH {
			// SASL SCRAM-SHA-512 authentication over TLS.
			configuration.Net.SASL.Enable = true
			configuration.Net.SASL.User = *kafkaConfiguration.UserName
			configuration.Net.SASL.Password = *kafkaConfiguration.Password
			configuration.Net.SASL.Handshake = true
			configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
			configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
		} else {
			// Mutual TLS authentication with the configured client certificate.
			cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key))
			if err != nil {
				panic("Invalid configuration, failed to parse client certificate and key")
			}
			configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert}
		}

		return configuration
	}
}

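// newKafkaProducerConfigurationFactory validates the Kafka settings once and
// returns a factory producing the sarama configuration used by idempotent,
// transactional producers.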
func newKafkaProducerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config {
	version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version)
	if err != nil {
		panic(fmt.Sprintf("Error parsing Kafka version: %v", err))
	}
	if len(kafkaConfiguration.Brokers) == 0 {
		panic("no Kafka bootstrap brokers defined, please set kafka.brokers in configuration file")
	}

	if kafkaConfiguration.AuthType == SRAM_AUTH {
		if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 {
			panic("no Kafka scram username defined, please set kafka.username in configuration file")
		}
		if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 {
			panic("no Kafka scram password defined, please set kafka.password in configuration file")
		}
	} else if kafkaConfiguration.AuthType == MTLS_AUTH {
		if kafkaConfiguration.Cert == nil {
			panic("no Kafka mtls cert defined, please set kafka.cert in configuration file")
		}
		if kafkaConfiguration.Key == nil {
			panic("no Kafka mtls key defined, please set kafka.key in configuration file")
		}
	} else {
		panic("invalid kafka auth type in configuration file")
	}

	configuration := sarama.NewConfig()

	return func() *sarama.Config {
		configuration.Version = version
		configuration.ClientID = "client_id"
		configuration.Metadata.Full = true

		// Idempotent, transactional producer: every message must be
		// acknowledged by all in-sync replicas.
		configuration.Producer.Idempotent = true
		configuration.Producer.Return.Errors = true
		configuration.Producer.RequiredAcks = sarama.WaitForAll
		configuration.Producer.Partitioner = sarama.NewRoundRobinPartitioner
		configuration.Producer.Transaction.Retry.Backoff = 10
		configuration.Producer.Transaction.ID = "txn_producer"
		configuration.Producer.Retry.Max = 1
		configuration.Producer.Return.Successes = true

		configuration.Net.MaxOpenRequests = 1
		configuration.Net.TLS.Enable = true

		// Trust the configured CA for the broker TLS connection.
		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca))
		configuration.Net.TLS.Config = &tls.Config{
			RootCAs:            caCertPool,
			InsecureSkipVerify: false,
		}

		if kafkaConfiguration.AuthType == SRAM_AUTH {
			// SASL SCRAM-SHA-512 authentication over TLS.
			configuration.Net.SASL.Enable = true
			configuration.Net.SASL.User = *kafkaConfiguration.UserName
			configuration.Net.SASL.Password = *kafkaConfiguration.Password
			configuration.Net.SASL.Handshake = true
			configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
			configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
		} else {
			// Mutual TLS authentication with the configured client certificate.
			cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key))
			if err != nil {
				panic("Invalid configuration, failed to parse client certificate and key")
			}
			configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert}
		}

		return configuration
	}
}

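// Topic is the name of a Kafka topic.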
type Topic string

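// Application groups the Kafka providers and topics used by the service: it
// consumes from ReceiverTopic and produces to EmitterTopic.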
type Application struct {
	ProducerProvider Provider[sarama.SyncProducer]
	ConsumerProvider Provider[sarama.ConsumerGroup]
	ReceiverTopic    Topic
	EmitterTopic     Topic
}