diff --git a/Makefile b/Makefile index 51e7e4c..8818bb5 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,18 @@ -.PHONY: build +.PHONY: build-producer build-consumer run dependencies infrastructure-local infrastructure-prod topic-scram topic-ssl consume-topic ACTION=apply -build: dependencies - go build -o out/kafka ./cmd/main.go +build-producer: dependencies + go build -o out/kafka-producer ./cmd/producer.go + +build-consumer: dependencies + go built -o out/kafka-consumer ./cmd/consumer.go dependencies: go mod tidy run: - ./out/kafka -c ./configuration.yaml + ./out/kafka-producer -c ./configuration.yaml infrastructure-local: cd manifest && \ diff --git a/cmd/main.go b/cmd/producer.go similarity index 60% rename from cmd/main.go rename to cmd/producer.go index 77380a6..c161aea 100644 --- a/cmd/main.go +++ b/cmd/producer.go @@ -41,4 +41,33 @@ func main() { log.Printf("unable to commit txn %s\n", err) return } + + // ---- consumer ---- + //for { + // log.Println("in the for") + // select { + // case msg := <-partitionConsumer.Messages(): + // log.Printf("Consumed message offset %d\n", msg.Offset) + // if *logMsg { + // log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value) + // } + // consumed++ + // case <-signals: + // break ConsumerLoop + // } + //} + + // SIGUSR1 toggle the pause/resume consumption + //sigterm := make(chan os.Signal, 1) + //signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + + //for keepRunning { + // <-sigterm + // log.Println("terminating: via signal") + // keepRunning = false + //} + //cancel() + //wg.Wait() + + //producerProvider.Clear() } diff --git a/configuration.template.yaml b/configuration.template.yaml index 44123e3..224c0dc 100644 --- a/configuration.template.yaml +++ b/configuration.template.yaml @@ -13,7 +13,7 @@ kafka: # k view-secret -n streaming kafka-user user.key key: - # k view-secret kafka-dev-listener-certificate ca.crt + # k view-secret cluster-development-cluster-ca-cert ca.crt ca: group-name: some-consumer-name topics: