// Command main publishes a small batch of test messages to the configured
// emitter topic inside a single Kafka transaction.
package main

import (
	"log"
	_ "net/http/pprof"
	"os"

	. "antoine-roux.tk/kafka/internal/configuration"
	"github.com/Shopify/sarama"
)

// main builds the application from its configuration, borrows a producer from
// the application's producer provider and sends a fixed number of "test"
// messages to the emitter topic within one transaction.
//
// The transaction is committed only when every message was accepted. If any
// send fails, or if the commit itself fails, the transaction is aborted so
// that the producer is not handed back to the provider with a transaction
// still open and no partial batch is committed.
func main() {
	// messageCount is the number of test messages sent in the transaction.
	const messageCount = 10

	configuration := NewConfiguration()
	if configuration.IsVerbose() {
		// Route sarama's internal logging to stdout when verbose mode is on.
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}
	application := configuration.CreateApplication()

	// producer code
	provider := application.ProducerProvider
	producer := provider.Borrow()
	defer provider.Release(producer)

	if err := producer.BeginTxn(); err != nil {
		log.Printf("unable to start txn %s\n", err)
		return
	}

	sendFailed := false
	for i := 0; i < messageCount; i++ {
		_, _, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: string(application.EmitterTopic),
			Key:   nil,
			Value: sarama.StringEncoder("test"),
		})
		if err != nil {
			log.Printf("Message delivery error %s\n", err)
			// The batch can no longer be committed atomically; stop sending
			// and abort below instead of committing a partial batch.
			sendFailed = true
			break
		}
	}

	if sendFailed {
		if err := producer.AbortTxn(); err != nil {
			log.Printf("unable to abort txn %s\n", err)
		}
		return
	}

	if err := producer.CommitTxn(); err != nil {
		log.Printf("unable to commit txn %s\n", err)
		// Best effort: do not leave the transaction open on a producer that
		// is about to be released back to the provider.
		if abortErr := producer.AbortTxn(); abortErr != nil {
			log.Printf("unable to abort txn %s\n", abortErr)
		}
		return
	}

	// ---- consumer ----
	// NOTE: the consumer side below is disabled; it is kept as a sketch only.

	//for {
	//	log.Println("in the for")
	//	select {
	//	case msg := <-partitionConsumer.Messages():
	//		log.Printf("Consumed message offset %d\n", msg.Offset)
	//		if *logMsg {
	//			log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
	//		}
	//		consumed++
	//	case <-signals:
	//		break ConsumerLoop
	//	}
	//}

	// SIGUSR1 toggle the pause/resume consumption
	//sigterm := make(chan os.Signal, 1)
	//signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	//for keepRunning {
	//	<-sigterm
	//	log.Println("terminating: via signal")
	//	keepRunning = false
	//}
	//cancel()
	//wg.Wait()
	//producerProvider.Clear()
}