feature: develop kafka consumer and producer

This commit is contained in:
RouxAntoine 2023-06-16 07:20:54 +02:00
parent 6349ce5108
commit be69809fd0
Signed by: antoine
GPG Key ID: 098FB66FC0475E70
12 changed files with 683 additions and 2 deletions

1
.gitignore vendored
View File

@ -113,3 +113,4 @@ override.tf.json
terraform.rc terraform.rc
configuration.yaml configuration.yaml
configuration-*.yaml

View File

@ -1,5 +1,16 @@
.PHONY: build
ACTION=apply ACTION=apply
build: dependencies
go build -o out/kafka ./cmd/main.go
dependencies:
go mod tidy
run:
./out/kafka -c ./configuration.yaml
infrastructure-local: infrastructure-local:
cd manifest && \ cd manifest && \
terraform workspace select kind-cluster-dev && \ terraform workspace select kind-cluster-dev && \
@ -17,3 +28,8 @@ topic-ssl:
topic-scram: topic-scram:
kubectl view-secret -n streaming kafka-dev-listener-certificate ca.crt | kcat -b kafka.localdomain:9092 -L -J -X 'security.protocol=sasl_ssl' -X 'sasl.mechanism=SCRAM-SHA-512' -X 'sasl.username=kafka-user' -X "sasl.password=$(kubectl view-secret -n streaming kafka-user password)" -X "ssl.ca.location=/dev/stdin" | jq kubectl view-secret -n streaming kafka-dev-listener-certificate ca.crt | kcat -b kafka.localdomain:9092 -L -J -X 'security.protocol=sasl_ssl' -X 'sasl.mechanism=SCRAM-SHA-512' -X 'sasl.username=kafka-user' -X "sasl.password=$(kubectl view-secret -n streaming kafka-user password)" -X "ssl.ca.location=/dev/stdin" | jq
consume-topic:
kubectl view-secret -n streaming admin user.p12 > user.p12 && \
kubectl view-secret -n streaming cluster-development-cluster-ca-cert ca.crt | kcat -b kafka.127.0.0.1.nip.io:443 -X 'security.protocol=ssl' -X "ssl.ca.location=/dev/stdin" -X "ssl.keystore.location=user.p12" -X "ssl.keystore.password=$$(kubectl view-secret -n streaming admin user.password)" -v -G test-group -o beginning dev.emitter.json && \
rm -rf user.p12

43
cmd/main.go Normal file
View File

@ -0,0 +1,43 @@
package main
import (
. "antoine-roux.tk/kafka/internal/configuration"
"github.com/Shopify/sarama"
"log"
_ "net/http/pprof"
"os"
)
func main() {
configuration := NewConfiguration()
if configuration.IsVerbose() {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
application := configuration.CreateApplication()
// producer code
provider := application.ProducerProvider
producer := provider.Borrow()
defer provider.Release(producer)
err := producer.BeginTxn()
if err != nil {
log.Printf("unable to start txn %s\n", err)
return
}
for i := 0; i < 10; i++ {
_, _, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: string(application.EmitterTopic),
Key: nil,
Value: sarama.StringEncoder("test"),
})
if err != nil {
log.Printf("Message delivery error %s\n", err)
}
}
err = producer.CommitTxn()
if err != nil {
log.Printf("unable to commit txn %s\n", err)
return
}
}

View File

@ -0,0 +1,21 @@
verbose: false
kafka:
version: 3.3.1
brokers: kafka.localdomain:9092
auth-type: mtls # mtls, scram-sha-512
username: <username>
# k view-secret kafka-user password
password: <password>
# k view-secret -n streaming kafka-user user.crt
cert: <client pem cert>
# k view-secret -n streaming kafka-user user.key
key: <client pem key>
# k view-secret kafka-dev-listener-certificate ca.crt
ca: <server root certificate>
group-name: some-consumer-name
topics:
in: receiver
out: emitter

35
go.mod Normal file
View File

@ -0,0 +1,35 @@
module antoine-roux.tk/kafka
go 1.20
require (
github.com/Shopify/sarama v1.38.1
github.com/xdg-go/scram v1.1.2
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.15.14 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/text v0.6.0 // indirect
)

104
go.sum Normal file
View File

@ -0,0 +1,104 @@
github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A=
github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g=
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0=
github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM=
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8=
github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc=
github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,253 @@
package configuration
import (
. "antoine-roux.tk/kafka/internal/kafka"
. "antoine-roux.tk/kafka/internal/kafka/provider"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"github.com/Shopify/sarama"
"gopkg.in/yaml.v3"
"os"
"path/filepath"
"strings"
)
const usage = `
Usage of %s:
-c, --configuration path to yaml configuration file
-h, --help prints help information
`
type AuthType string
const (
MTLS_AUTH AuthType = "mtls"
SRAM_AUTH AuthType = "scram-sha-512"
)
var configurationPath string
type Configuration interface {
IsVerbose() bool
CreateApplication() (application *Application)
}
type kafkaConfiguration struct {
Version string `yaml:"version"`
Brokers string `yaml:"brokers"`
AuthType AuthType `yaml:"auth-type"`
UserName *string `yaml:"username"`
Password *string `yaml:"password"`
Cert *string `yaml:"cert"`
Key *string `yaml:"key"`
Ca string `yaml:"ca"`
GroupName string `yaml:"group-name"`
Topics struct {
In string `yaml:"in"`
Out string `yaml:"out"`
}
}
type configuration struct {
Verbose bool `yaml:"verbose"`
Kafka kafkaConfiguration
}
func NewConfiguration() Configuration {
flag.StringVar(&configurationPath, "configuration", "./configuration.yaml", "configuration path")
flag.StringVar(&configurationPath, "c", "./configuration.yaml", "configuration path")
flag.Usage = func() { fmt.Printf(usage, os.Args[0]) }
flag.Parse()
filename, _ := filepath.Abs(configurationPath)
configurationFile, err := os.ReadFile(filename)
if err != nil {
panic("Invalid configuration file path, fail to read it")
}
var configuration configuration
err = yaml.Unmarshal(configurationFile, &configuration)
return &configuration
}
func (c *configuration) IsVerbose() bool {
return c.Verbose
}
func (c *configuration) CreateApplication() (application *Application) {
producerConfigurationFactory := newKafkaProducerConfigurationFactory(c.Kafka)
consumerConfigurationFactory := newKafkaConsumerConfigurationFactory(c.Kafka)
application = &Application{}
application.ProducerProvider = NewProducerProvider(
strings.Split(c.Kafka.Brokers, ","),
producerConfigurationFactory,
)
application.ConsumerProvider = NewConsumerProvider(
strings.Split(c.Kafka.Brokers, ","),
c.Kafka.GroupName,
consumerConfigurationFactory,
)
if len(c.Kafka.Topics.In) == 0 {
panic("no input In topic given to be consumed, please set kafka.topics.in")
}
application.ReceiverTopic = Topic(c.Kafka.Topics.In)
if len(c.Kafka.Topics.Out) == 0 {
panic("no input Out topic given to produce to, please set kafka.topics.out")
}
application.EmitterTopic = Topic(c.Kafka.Topics.Out)
return
}
func newKafkaConsumerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config {
version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version)
if err != nil {
panic(fmt.Sprintf("Error parsing Kafka Version: %v\n", err))
}
if len(kafkaConfiguration.Brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set Kafka.brokers in configuration file")
}
if kafkaConfiguration.AuthType == SRAM_AUTH {
if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 {
panic("no Kafka scram username defined, please set Kafka.username in configuration file")
}
if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 {
panic("no Kafka scram password defined, please set Kafka.password in configuration file")
}
} else if kafkaConfiguration.AuthType == MTLS_AUTH {
if kafkaConfiguration.Cert == nil {
panic("no Kafka mtls cert defined, please set Kafka.cert in configuration file")
}
if kafkaConfiguration.Key == nil {
panic("no Kafka mtls key defined, please set Kafka.key in configuration file")
}
} else {
panic("invalid kafka auth type in configuration file")
}
configuration := sarama.NewConfig()
return func() *sarama.Config {
configuration.Version = version
configuration.ClientID = "client_id"
configuration.Metadata.Full = true
configuration.Net.MaxOpenRequests = 1
configuration.Net.TLS.Enable = true
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca))
configuration.Net.TLS.Config = &tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false,
}
if kafkaConfiguration.AuthType == SRAM_AUTH {
configuration.Net.SASL.Enable = true
configuration.Net.SASL.User = *kafkaConfiguration.UserName
configuration.Net.SASL.Password = *kafkaConfiguration.Password
configuration.Net.SASL.Handshake = true
configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
} else {
cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key))
if err != nil {
panic("Invalid configuration failed to read client certificates")
}
configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert}
}
return configuration
}
}
func newKafkaProducerConfigurationFactory(kafkaConfiguration kafkaConfiguration) func() *sarama.Config {
version, err := sarama.ParseKafkaVersion(kafkaConfiguration.Version)
if err != nil {
panic(fmt.Sprintf("Error parsing Kafka Version: %v\n", err))
}
if len(kafkaConfiguration.Brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set Kafka.brokers in configuration file")
}
if kafkaConfiguration.AuthType == SRAM_AUTH {
if kafkaConfiguration.UserName == nil || len(*kafkaConfiguration.UserName) == 0 {
panic("no Kafka scram username defined, please set Kafka.username in configuration file")
}
if kafkaConfiguration.Password == nil || len(*kafkaConfiguration.Password) == 0 {
panic("no Kafka scram password defined, please set Kafka.password in configuration file")
}
} else if kafkaConfiguration.AuthType == MTLS_AUTH {
if kafkaConfiguration.Cert == nil {
panic("no Kafka mtls cert defined, please set Kafka.cert in configuration file")
}
if kafkaConfiguration.Key == nil {
panic("no Kafka mtls key defined, please set Kafka.key in configuration file")
}
} else {
panic("invalid kafka auth type in configuration file")
}
configuration := sarama.NewConfig()
return func() *sarama.Config {
configuration.Version = version
configuration.ClientID = "client_id"
configuration.Metadata.Full = true
configuration.Producer.Idempotent = true
configuration.Producer.Return.Errors = true
configuration.Producer.RequiredAcks = sarama.WaitForAll
configuration.Producer.Partitioner = sarama.NewRoundRobinPartitioner
configuration.Producer.Transaction.Retry.Backoff = 10
configuration.Producer.Transaction.ID = "txn_producer"
configuration.Producer.Retry.Max = 1
configuration.Producer.Return.Successes = true
configuration.Net.MaxOpenRequests = 1
configuration.Net.TLS.Enable = true
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(kafkaConfiguration.Ca))
configuration.Net.TLS.Config = &tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false,
}
if kafkaConfiguration.AuthType == SRAM_AUTH {
configuration.Net.SASL.Enable = true
configuration.Net.SASL.User = *kafkaConfiguration.UserName
configuration.Net.SASL.Password = *kafkaConfiguration.Password
configuration.Net.SASL.Handshake = true
configuration.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
configuration.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
} else {
cert, err := tls.X509KeyPair([]byte(*kafkaConfiguration.Cert), []byte(*kafkaConfiguration.Key))
if err != nil {
panic("Invalid configuration failed to read client certificates")
}
configuration.Net.TLS.Config.Certificates = []tls.Certificate{cert}
}
return configuration
}
}
type Topic string
type Application struct {
ProducerProvider Provider[sarama.SyncProducer]
ConsumerProvider Provider[sarama.ConsumerGroup]
ReceiverTopic Topic
EmitterTopic Topic
}

View File

@ -0,0 +1,69 @@
package provider
import (
"github.com/Shopify/sarama"
"log"
"sync"
)
type ConsumerProvider struct {
consumersGroupLock sync.Mutex
consumersGroup []sarama.ConsumerGroup
consumerGroupProvider func() sarama.ConsumerGroup
}
func NewConsumerProvider(brokers []string, groupID string, consumerConfigurationProvider func() *sarama.Config) Provider[sarama.ConsumerGroup] {
return &ConsumerProvider{
consumerGroupProvider: func() sarama.ConsumerGroup {
configuration := consumerConfigurationProvider()
group, err := sarama.NewConsumerGroup(brokers, groupID, configuration)
if err != nil {
log.Fatalln(err)
}
return group
},
}
}
func (c *ConsumerProvider) Borrow() (consumerGroup sarama.ConsumerGroup) {
c.consumersGroupLock.Lock()
defer c.consumersGroupLock.Unlock()
if len(c.consumersGroup) == 0 {
for {
consumerGroup = c.consumerGroupProvider()
if consumerGroup != nil {
return
}
}
}
index := len(c.consumersGroup) - 1
consumerGroup = c.consumersGroup[index]
c.consumersGroup = c.consumersGroup[:index]
return
}
func (c *ConsumerProvider) Release(consumerGroup sarama.ConsumerGroup) {
c.consumersGroupLock.Lock()
defer c.consumersGroupLock.Unlock()
if err := consumerGroup.Close(); err != nil {
log.Fatalln(err)
}
c.consumersGroup = append(c.consumersGroup, consumerGroup)
}
func (c *ConsumerProvider) Clear() {
c.consumersGroupLock.Lock()
defer c.consumersGroupLock.Unlock()
for _, consumer := range c.consumersGroup {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}
c.consumersGroup = c.consumersGroup[:0]
}

View File

@ -0,0 +1,79 @@
package provider
import (
"fmt"
"github.com/Shopify/sarama"
"log"
"sync"
)
// ProducerProvider pool of producers that ensure transactional-id is unique.
type ProducerProvider struct {
transactionIdGenerator int32
producersLock sync.Mutex
producers []sarama.SyncProducer
producerProvider func() sarama.SyncProducer
}
func NewProducerProvider(brokers []string, producerConfigurationProvider func() *sarama.Config) Provider[sarama.SyncProducer] {
provider := &ProducerProvider{}
provider.producerProvider = func() sarama.SyncProducer {
configuration := producerConfigurationProvider()
suffix := provider.transactionIdGenerator
// Append transactionIdGenerator to current configuration.Producer.Transaction.ID to ensure transaction-id uniqueness.
if configuration.Producer.Transaction.ID != "" {
provider.transactionIdGenerator++
configuration.Producer.Transaction.ID = configuration.Producer.Transaction.ID + "-" + fmt.Sprint(suffix)
}
producer, err := sarama.NewSyncProducer(brokers, configuration)
if err != nil {
log.Fatalln(err)
}
return producer
}
return provider
}
func (p *ProducerProvider) Borrow() (producer sarama.SyncProducer) {
p.producersLock.Lock()
defer p.producersLock.Unlock()
if len(p.producers) == 0 {
for {
producer = p.producerProvider()
if producer != nil {
return
}
}
}
index := len(p.producers) - 1
producer = p.producers[index]
p.producers = p.producers[:index]
return
}
func (p *ProducerProvider) Release(producer sarama.SyncProducer) {
p.producersLock.Lock()
defer p.producersLock.Unlock()
// If released producer is erroneous close it and don't return it to the producer pool.
if producer.TxnStatus()&sarama.ProducerTxnFlagInError != 0 {
// Try to close it
_ = producer.Close()
return
}
p.producers = append(p.producers, producer)
}
func (p *ProducerProvider) Clear() {
p.producersLock.Lock()
defer p.producersLock.Unlock()
for _, producer := range p.producers {
_ = producer.Close()
}
p.producers = p.producers[:0]
}

View File

@ -0,0 +1,7 @@
package provider
type Provider[T any] interface {
Borrow() T
Release(T)
Clear()
}

33
internal/kafka/scram.go Normal file
View File

@ -0,0 +1,33 @@
package kafka
import (
"crypto/sha512"
"github.com/xdg-go/scram"
)
var SHA512 scram.HashGeneratorFcn = sha512.New
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

View File

@ -39,6 +39,17 @@ resource "kubernetes_manifest" "kafka_user" {
"patternType" = "prefix" "patternType" = "prefix"
} }
}, },
{
"host" = "*"
"operations" = [
"Describe"
]
"resource" = {
"name" = "__transaction_state"
"type" = "topic"
"patternType" = "literal"
}
},
{ {
"host" = "*" "host" = "*"
"operations" = [ "operations" = [
@ -50,6 +61,15 @@ resource "kubernetes_manifest" "kafka_user" {
"name" = "some-consumer" "name" = "some-consumer"
"patternType" = "literal" "patternType" = "literal"
} }
},
{
"host" = "*"
"operations" = ["Describe", "Write"],
"resource" = {
"type" = "transactionalId",
"name" = "txn_producer-"
"patternType" = "prefix"
}
} }
] ]
"type" = "simple" "type" = "simple"