73 lines
1.6 KiB
Go
73 lines
1.6 KiB
Go
package main
|
|
|
|
import (
|
|
. "antoine-roux.tk/kafka/internal/configuration"
|
|
"github.com/Shopify/sarama"
|
|
"log"
|
|
_ "net/http/pprof"
|
|
"os"
|
|
)
|
|
|
|
func main() {
|
|
configuration := NewConfiguration()
|
|
if configuration.IsVerbose() {
|
|
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
|
|
}
|
|
application := configuration.CreateApplication()
|
|
|
|
// producer code
|
|
provider := application.ProducerProvider
|
|
producer := provider.Borrow()
|
|
defer provider.Release(producer)
|
|
|
|
err := producer.BeginTxn()
|
|
if err != nil {
|
|
log.Printf("unable to start txn %s\n", err)
|
|
return
|
|
}
|
|
for i := 0; i < 10; i++ {
|
|
_, _, err := producer.SendMessage(&sarama.ProducerMessage{
|
|
Topic: string(application.EmitterTopic),
|
|
Key: nil,
|
|
Value: sarama.StringEncoder("test"),
|
|
})
|
|
if err != nil {
|
|
log.Printf("Message delivery error %s\n", err)
|
|
}
|
|
}
|
|
err = producer.CommitTxn()
|
|
if err != nil {
|
|
log.Printf("unable to commit txn %s\n", err)
|
|
return
|
|
}
|
|
|
|
// ---- consumer ----
|
|
//for {
|
|
// log.Println("in the for")
|
|
// select {
|
|
// case msg := <-partitionConsumer.Messages():
|
|
// log.Printf("Consumed message offset %d\n", msg.Offset)
|
|
// if *logMsg {
|
|
// log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
|
|
// }
|
|
// consumed++
|
|
// case <-signals:
|
|
// break ConsumerLoop
|
|
// }
|
|
//}
|
|
|
|
// SIGUSR1 toggle the pause/resume consumption
|
|
//sigterm := make(chan os.Signal, 1)
|
|
//signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
//for keepRunning {
|
|
// <-sigterm
|
|
// log.Println("terminating: via signal")
|
|
// keepRunning = false
|
|
//}
|
|
//cancel()
|
|
//wg.Wait()
|
|
|
|
//producerProvider.Clear()
|
|
}
|