feature: consumer code

This commit is contained in:
RouxAntoine 2023-06-25 10:13:48 +02:00
parent fc28a7678c
commit d7d10ab2a4
Signed by: antoine
GPG Key ID: 098FB66FC0475E70
3 changed files with 37 additions and 5 deletions

View File

@ -1,15 +1,18 @@
.PHONY: build .PHONY: build-producer build-consumer run dependencies infrastructure-local infrastructure-prod topic-scram topic-ssl consume-topic
ACTION=apply ACTION=apply
build: dependencies build-producer: dependencies
go build -o out/kafka ./cmd/main.go go build -o out/kafka-producer ./cmd/producer.go
build-consumer: dependencies
go built -o out/kafka-consumer ./cmd/consumer.go
dependencies: dependencies:
go mod tidy go mod tidy
run: run:
./out/kafka -c ./configuration.yaml ./out/kafka-producer -c ./configuration.yaml
infrastructure-local: infrastructure-local:
cd manifest && \ cd manifest && \

View File

@ -41,4 +41,33 @@ func main() {
log.Printf("unable to commit txn %s\n", err) log.Printf("unable to commit txn %s\n", err)
return return
} }
// ---- consumer ----
//for {
// log.Println("in the for")
// select {
// case msg := <-partitionConsumer.Messages():
// log.Printf("Consumed message offset %d\n", msg.Offset)
// if *logMsg {
// log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
// }
// consumed++
// case <-signals:
// break ConsumerLoop
// }
//}
// SIGUSR1 toggle the pause/resume consumption
//sigterm := make(chan os.Signal, 1)
//signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
//for keepRunning {
// <-sigterm
// log.Println("terminating: via signal")
// keepRunning = false
//}
//cancel()
//wg.Wait()
//producerProvider.Clear()
} }

View File

@ -13,7 +13,7 @@ kafka:
# k view-secret -n streaming kafka-user user.key # k view-secret -n streaming kafka-user user.key
key: <client pem key> key: <client pem key>
# k view-secret kafka-dev-listener-certificate ca.crt # k view-secret cluster-development-cluster-ca-cert ca.crt
ca: <server root certificate> ca: <server root certificate>
group-name: some-consumer-name group-name: some-consumer-name
topics: topics: