kafka/cmd/main.go

73 lines
1.6 KiB
Go

package main
import (
. "antoine-roux.tk/kafka/internal/configuration"
"github.com/Shopify/sarama"
"log"
_ "net/http/pprof"
"os"
)
func main() {
configuration := NewConfiguration()
if configuration.IsVerbose() {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
application := configuration.CreateApplication()
// producer code
provider := application.ProducerProvider
producer := provider.Borrow()
defer provider.Release(producer)
err := producer.BeginTxn()
if err != nil {
log.Printf("unable to start txn %s\n", err)
return
}
for i := 0; i < 10; i++ {
_, _, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: string(application.EmitterTopic),
Key: nil,
Value: sarama.StringEncoder("test"),
})
if err != nil {
log.Printf("Message delivery error %s\n", err)
}
}
err = producer.CommitTxn()
if err != nil {
log.Printf("unable to commit txn %s\n", err)
return
}
// ---- consumer ----
//for {
// log.Println("in the for")
// select {
// case msg := <-partitionConsumer.Messages():
// log.Printf("Consumed message offset %d\n", msg.Offset)
// if *logMsg {
// log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
// }
// consumed++
// case <-signals:
// break ConsumerLoop
// }
//}
// SIGUSR1 toggle the pause/resume consumption
//sigterm := make(chan os.Signal, 1)
//signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
//for keepRunning {
// <-sigterm
// log.Println("terminating: via signal")
// keepRunning = false
//}
//cancel()
//wg.Wait()
//producerProvider.Clear()
}