From be69809fd03a613abcfda21e0696b46eeafc5d11 Mon Sep 17 00:00:00 2001 From: RouxAntoine Date: Fri, 16 Jun 2023 07:20:54 +0200 Subject: [PATCH] feature: develop kafka consumer and producer --- .gitignore | 3 +- Makefile | 18 +- cmd/main.go | 43 ++++ configuration.template.yaml | 21 ++ go.mod | 35 ++++ go.sum | 104 ++++++++++ internal/configuration/configuration.go | 253 ++++++++++++++++++++++++ internal/kafka/provider/consumer.go | 69 +++++++ internal/kafka/provider/producer.go | 79 ++++++++ internal/kafka/provider/provider.go | 7 + internal/kafka/scram.go | 33 ++++ manifest/kafka/users.tf | 20 ++ 12 files changed, 683 insertions(+), 2 deletions(-) create mode 100644 cmd/main.go create mode 100644 configuration.template.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/configuration/configuration.go create mode 100644 internal/kafka/provider/consumer.go create mode 100644 internal/kafka/provider/producer.go create mode 100644 internal/kafka/provider/provider.go create mode 100644 internal/kafka/scram.go diff --git a/.gitignore b/.gitignore index 044a765..e92969b 100644 --- a/.gitignore +++ b/.gitignore @@ -112,4 +112,5 @@ override.tf.json .terraformrc terraform.rc -configuration.yaml \ No newline at end of file +configuration.yaml +configuration-*.yaml \ No newline at end of file diff --git a/Makefile b/Makefile index b154b41..51e7e4c 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,16 @@ +.PHONY: build + ACTION=apply +build: dependencies + go build -o out/kafka ./cmd/main.go + +dependencies: + go mod tidy + +run: + ./out/kafka -c ./configuration.yaml + infrastructure-local: cd manifest && \ terraform workspace select kind-cluster-dev && \ @@ -16,4 +27,9 @@ topic-ssl: rm -rf user.p12 topic-scram: - kubectl view-secret -n streaming kafka-dev-listener-certificate ca.crt | kcat -b kafka.localdomain:9092 -L -J -X 'security.protocol=sasl_ssl' -X 'sasl.mechanism=SCRAM-SHA-512' -X 'sasl.username=kafka-user' -X "sasl.password=$(kubectl view-secret -n streaming kafka-user password)" -X "ssl.ca.location=/dev/stdin" | jq \ No newline at end of file + kubectl view-secret -n streaming kafka-dev-listener-certificate ca.crt | kcat -b kafka.localdomain:9092 -L -J -X 'security.protocol=sasl_ssl' -X 'sasl.mechanism=SCRAM-SHA-512' -X 'sasl.username=kafka-user' -X "sasl.password=$(kubectl view-secret -n streaming kafka-user password)" -X "ssl.ca.location=/dev/stdin" | jq + +consume-topic: + kubectl view-secret -n streaming admin user.p12 > user.p12 && \ + kubectl view-secret -n streaming cluster-development-cluster-ca-cert ca.crt | kcat -b kafka.127.0.0.1.nip.io:443 -X 'security.protocol=ssl' -X "ssl.ca.location=/dev/stdin" -X "ssl.keystore.location=user.p12" -X "ssl.keystore.password=$$(kubectl view-secret -n streaming admin user.password)" -v -G test-group -o beginning dev.emitter.json && \ + rm -rf user.p12 \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..323103c --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,43 @@ +package main + +import ( + . "antoine-roux.tk/kafka/internal/configuration" + "github.com/Shopify/sarama" + "log" + _ "net/http/pprof" + "os" +) + +func main() { + configuration := NewConfiguration() + if configuration.IsVerbose() { + sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) + } + application := configuration.CreateApplication() + + // producer code + provider := application.ProducerProvider + producer := provider.Borrow() + defer provider.Release(producer) + + err := producer.BeginTxn() + if err != nil { + log.Printf("unable to start txn %s\n", err) + return + } + for i := 0; i < 10; i++ { + _, _, err := producer.SendMessage(&sarama.ProducerMessage{ + Topic: string(application.EmitterTopic), + Key: nil, + Value: sarama.StringEncoder("test"), + }) + if err != nil { + log.Printf("Message delivery error %s\n", err) + } + } + err = producer.CommitTxn() + if err != nil { + log.Printf("unable to commit txn %s\n", err) + return + } +} diff --git a/configuration.template.yaml b/configuration.template.yaml new file mode 100644 index 0000000..44123e3 --- /dev/null +++ b/configuration.template.yaml @@ -0,0 +1,21 @@ +verbose: false +kafka: + version: 3.3.1 + brokers: kafka.localdomain:9092 + auth-type: mtls # mtls, scram-sha-512 + + username: + # k view-secret kafka-user password + password: + + # k view-secret -n streaming kafka-user user.crt + cert: + # k view-secret -n streaming kafka-user user.key + key: + + # k view-secret kafka-dev-listener-certificate ca.crt + ca: + group-name: some-consumer-name + topics: + in: receiver + out: emitter \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f267565 --- /dev/null +++ b/go.mod @@ -0,0 +1,35 @@ +module antoine-roux.tk/kafka + +go 1.20 + +require ( + github.com/Shopify/sarama v1.38.1 + github.com/xdg-go/scram v1.1.2 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.3.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.15.14 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect + golang.org/x/net v0.5.0 // indirect + golang.org/x/text v0.6.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..42a83ff --- /dev/null +++ b/go.sum @@ -0,0 +1,104 @@ +github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= +github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= +github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= +github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8= +github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc= +github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= +golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go new file mode 100644 index 0000000..5158ff5 --- /dev/null +++ b/internal/configuration/configuration.go @@ -0,0 +1,253 @@ +package configuration + +import ( + . "antoine-roux.tk/kafka/internal/kafka" + . "antoine-roux.tk/kafka/internal/kafka/provider" + "crypto/tls" + "crypto/x509" + "flag" + "fmt" + "github.com/Shopify/sarama" + "gopkg.in/yaml.v3" + "os" + "path/filepath" + "strings" +) + +const usage = ` +Usage of %s: + -c, --configuration path to yaml configuration file + -h, --help prints help information +` + +type AuthType string + +const ( + MTLS_AUTH AuthType = "mtls" + SRAM_AUTH AuthType = "scram-sha-512" +) + +var configurationPath string + +type Configuration interface { + IsVerbose() bool + CreateApplication() (application *Application) +} + +type kafkaConfiguration struct { + Version string `yaml:"version"` + Brokers string `yaml:"brokers"` + AuthType AuthType `yaml:"auth-type"` + UserName *string `yaml:"username"` + Password *string `yaml:"password"` + Cert *string `yaml:"cert"` + Key *string `yaml:"key"` + Ca string `yaml:"ca"` + GroupName string `yaml:"group-name"` + Topics struct { + In string `yaml:"in"` + Out string `yaml:"out"` + } +} + +type configuration struct { + Verbose bool `yaml:"verbose"` + Kafka kafkaConfiguration +} + +func NewConfiguration() Configuration { + + flag.StringVar(&configurationPath, "configuration", "./configuration.yaml", "configuration path") + flag.StringVar(&configurationPath, "c", "./configuration.yaml", "configuration path") + flag.Usage = func() { fmt.Printf(usage, os.Args[0]) } + flag.Parse() + + filename, _ := filepath.Abs(configurationPath) + + configurationFile, err := os.ReadFile(filename) + if err != nil { + panic("Invalid configuration file path, fail to read it") + } + + var configuration configuration + + err = yaml.Unmarshal(configurationFile, &configuration) + return &configuration +} + +func (c *configuration) IsVerbose() bool { + return c.Verbose +} + +func (c *configuration) CreateApplication() (application *Application) { + producerConfigurationFactory := newKafkaProducerConfigurationFactory(c.Kafka) + consumerConfigurationFactory := newKafkaConsumerConfigurationFactory(c.Kafka) + + application = &Application{} + + application.ProducerProvider = NewProducerProvider( + strings.Split(c.Kafka.Brokers, ","), + producerConfigurationFactory, + ) + application.ConsumerProvider = NewConsumerProvider( + strings.Split(c.Kafka.Brokers, ","), + c.Kafka.GroupName, + consumerConfigurationFactory, + ) + + if len(c.Kafka.Topics.In) == 0 { + panic("no input In topic given to be consumed, please set kafka.topics.in") + } + application.ReceiverTopic = Topic(c.Kafka.Topics.In) + + if len(c.Kafka.Topics.Out) == 0 { + panic("no input Out topic given to produce to, please set kafka.topics.out") + } + application.EmitterTopic = Topic(c.Kafka.Topics.Out) + + return +} + +func newKafkaConsumerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config { + version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version) + if err != nil { + panic(fmt.Sprintf("Error parsing Kafka Version: %v\n", err)) + } + if len(kafkaConfiguration.Brokers) == 0 { + panic("no Kafka bootstrap brokers defined, please set Kafka.brokers in configuration file") + } + + if kafkaConfiguration.AuthType == SRAM_AUTH { + if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 { + panic("no Kafka scram username defined, please set Kafka.username in configuration file") + } + if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 { + panic("no Kafka scram password defined, please set Kafka.password in configuration file") + } + } else if kafkaConfiguration.AuthType == MTLS_AUTH { + if kafkaConfiguration.Cert == nil { + panic("no Kafka mtls cert defined, please set Kafka.cert in configuration file") + } + if kafkaConfiguration.Key == nil { + panic("no Kafka mtls key defined, please set Kafka.key in configuration file") + } + } else { + panic("invalid kafka auth type in configuration file") + } + + configuration := sarama.NewConfig() + + return func() *sarama.Config { + configuration.Version = version + configuration.ClientID = "client_id" + configuration.Metadata.Full = true + + configuration.Net.MaxOpenRequests = 1 + configuration.Net.TLS.Enable = true + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca)) + configuration.Net.TLS.Config = &tls.Config{ + RootCAs: caCertPool, + InsecureSkipVerify: false, + } + + if kafkaConfiguration.AuthType == SRAM_AUTH { + configuration.Net.SASL.Enable = true + configuration.Net.SASL.User = *kafkaConfiguration.UserName + configuration.Net.SASL.Password = *kafkaConfiguration.Password + configuration.Net.SASL.Handshake = true + configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } + configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + } else { + cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key)) + if err != nil { + panic("Invalid configuration failed to read client certificates") + } + configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert} + } + + return configuration + } +} + +func newKafkaProducerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config { + version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version) + if err != nil { + panic(fmt.Sprintf("Error parsing Kafka Version: %v\n", err)) + } + if len(kafkaConfiguration.Brokers) == 0 { + panic("no Kafka bootstrap brokers defined, please set Kafka.brokers in configuration file") + } + + if kafkaConfiguration.AuthType == SRAM_AUTH { + if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 { + panic("no Kafka scram username defined, please set Kafka.username in configuration file") + } + if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 { + panic("no Kafka scram password defined, please set Kafka.password in configuration file") + } + } else if kafkaConfiguration.AuthType == MTLS_AUTH { + if kafkaConfiguration.Cert == nil { + panic("no Kafka mtls cert defined, please set Kafka.cert in configuration file") + } + if kafkaConfiguration.Key == nil { + panic("no Kafka mtls key defined, please set Kafka.key in configuration file") + } + } else { + panic("invalid kafka auth type in configuration file") + } + + configuration := sarama.NewConfig() + + return func() *sarama.Config { + configuration.Version = version + configuration.ClientID = "client_id" + configuration.Metadata.Full = true + + configuration.Producer.Idempotent = true + configuration.Producer.Return.Errors = true + configuration.Producer.RequiredAcks = sarama.WaitForAll + configuration.Producer.Partitioner = sarama.NewRoundRobinPartitioner + configuration.Producer.Transaction.Retry.Backoff = 10 + configuration.Producer.Transaction.ID = "txn_producer" + configuration.Producer.Retry.Max = 1 + configuration.Producer.Return.Successes = true + + configuration.Net.MaxOpenRequests = 1 + configuration.Net.TLS.Enable = true + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca)) + configuration.Net.TLS.Config = &tls.Config{ + RootCAs: caCertPool, + InsecureSkipVerify: false, + } + + if kafkaConfiguration.AuthType == SRAM_AUTH { + configuration.Net.SASL.Enable = true + configuration.Net.SASL.User = *kafkaConfiguration.UserName + configuration.Net.SASL.Password = *kafkaConfiguration.Password + configuration.Net.SASL.Handshake = true + configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } + configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + } else { + cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key)) + if err != nil { + panic("Invalid configuration failed to read client certificates") + } + configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert} + } + + return configuration + } +} + +type Topic string + +type Application struct { + ProducerProvider Provider[sarama.SyncProducer] + ConsumerProvider Provider[sarama.ConsumerGroup] + ReceiverTopic Topic + EmitterTopic Topic +} diff --git a/internal/kafka/provider/consumer.go b/internal/kafka/provider/consumer.go new file mode 100644 index 0000000..5afa086 --- /dev/null +++ b/internal/kafka/provider/consumer.go @@ -0,0 +1,69 @@ +package provider + +import ( + "github.com/Shopify/sarama" + "log" + "sync" +) + +type ConsumerProvider struct { + consumersGroupLock sync.Mutex + consumersGroup []sarama.ConsumerGroup + consumerGroupProvider func() sarama.ConsumerGroup +} + +func NewConsumerProvider(brokers []string, groupID string, consumerConfigurationProvider func() *sarama.Config) Provider[sarama.ConsumerGroup] { + return &ConsumerProvider{ + consumerGroupProvider: func() sarama.ConsumerGroup { + configuration := consumerConfigurationProvider() + + group, err := sarama.NewConsumerGroup(brokers, groupID, configuration) + if err != nil { + log.Fatalln(err) + } + return group + }, + } +} + +func (c *ConsumerProvider) Borrow() (consumerGroup sarama.ConsumerGroup) { + c.consumersGroupLock.Lock() + defer c.consumersGroupLock.Unlock() + + if len(c.consumersGroup) == 0 { + for { + consumerGroup = c.consumerGroupProvider() + if consumerGroup != nil { + return + } + } + } + + index := len(c.consumersGroup) - 1 + consumerGroup = c.consumersGroup[index] + c.consumersGroup = c.consumersGroup[:index] + return +} + +func (c *ConsumerProvider) Release(consumerGroup sarama.ConsumerGroup) { + c.consumersGroupLock.Lock() + defer c.consumersGroupLock.Unlock() + + if err := consumerGroup.Close(); err != nil { + log.Fatalln(err) + } + + c.consumersGroup = append(c.consumersGroup, consumerGroup) +} + +func (c *ConsumerProvider) Clear() { + c.consumersGroupLock.Lock() + defer c.consumersGroupLock.Unlock() + + for _, consumer := range c.consumersGroup { + if err := consumer.Close(); err != nil { + log.Fatalln(err) + } + } + c.consumersGroup = c.consumersGroup[:0] +} diff --git a/internal/kafka/provider/producer.go b/internal/kafka/provider/producer.go new file mode 100644 index 0000000..8432fca --- /dev/null +++ b/internal/kafka/provider/producer.go @@ -0,0 +1,79 @@ +package provider + +import ( + "fmt" + "github.com/Shopify/sarama" + "log" + "sync" +) + +// ProducerProvider pool of producers that ensure transactional-id is unique. +type ProducerProvider struct { + transactionIdGenerator int32 + + producersLock sync.Mutex + producers []sarama.SyncProducer + + producerProvider func() sarama.SyncProducer +} + +func NewProducerProvider(brokers []string, producerConfigurationProvider func() *sarama.Config) Provider[sarama.SyncProducer] { + provider := &ProducerProvider{} + provider.producerProvider = func() sarama.SyncProducer { + configuration := producerConfigurationProvider() + suffix := provider.transactionIdGenerator + // Append transactionIdGenerator to current configuration.Producer.Transaction.ID to ensure transaction-id uniqueness. + if configuration.Producer.Transaction.ID != "" { + provider.transactionIdGenerator++ + configuration.Producer.Transaction.ID = configuration.Producer.Transaction.ID + "-" + fmt.Sprint(suffix) + } + producer, err := sarama.NewSyncProducer(brokers, configuration) + if err != nil { + log.Fatalln(err) + } + return producer + } + return provider +} + +func (p *ProducerProvider) Borrow() (producer sarama.SyncProducer) { + p.producersLock.Lock() + defer p.producersLock.Unlock() + + if len(p.producers) == 0 { + for { + producer = p.producerProvider() + if producer != nil { + return + } + } + } + + index := len(p.producers) - 1 + producer = p.producers[index] + p.producers = p.producers[:index] + return +} + +func (p *ProducerProvider) Release(producer sarama.SyncProducer) { + p.producersLock.Lock() + defer p.producersLock.Unlock() + + // If released producer is erroneous close it and don't return it to the producer pool. + if producer.TxnStatus()&sarama.ProducerTxnFlagInError != 0 { + // Try to close it + _ = producer.Close() + return + } + p.producers = append(p.producers, producer) +} + +func (p *ProducerProvider) Clear() { + p.producersLock.Lock() + defer p.producersLock.Unlock() + + for _, producer := range p.producers { + _ = producer.Close() + } + p.producers = p.producers[:0] +} diff --git a/internal/kafka/provider/provider.go b/internal/kafka/provider/provider.go new file mode 100644 index 0000000..706aacf --- /dev/null +++ b/internal/kafka/provider/provider.go @@ -0,0 +1,7 @@ +package provider + +type Provider[T any] interface { + Borrow() T + Release(T) + Clear() +} diff --git a/internal/kafka/scram.go b/internal/kafka/scram.go new file mode 100644 index 0000000..0c7c357 --- /dev/null +++ b/internal/kafka/scram.go @@ -0,0 +1,33 @@ +package kafka + +import ( + "crypto/sha512" + + "github.com/xdg-go/scram" +) + +var SHA512 scram.HashGeneratorFcn = sha512.New + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} diff --git a/manifest/kafka/users.tf b/manifest/kafka/users.tf index 3f14604..6bd3d3f 100644 --- a/manifest/kafka/users.tf +++ b/manifest/kafka/users.tf @@ -39,6 +39,17 @@ resource "kubernetes_manifest" "kafka_user" { "patternType" = "prefix" } }, + { + "host" = "*" + "operations" = [ + "Describe" + ] + "resource" = { + "name" = "__transaction_state" + "type" = "topic" + "patternType" = "literal" + } + }, { "host" = "*" "operations" = [ @@ -50,6 +61,15 @@ resource "kubernetes_manifest" "kafka_user" { "name" = "some-consumer" "patternType" = "literal" } + }, + { + "host" = "*" + "operations" = ["Describe", "Write"], + "resource" = { + "type" = "transactionalId", + "name" = "txn_producer-" + "patternType" = "prefix" + } } ] "type" = "simple"