本文代码github: https://github.com/zboyco/go-test/blob/master/kafka/consumer/main.go
1.介绍
因为sarama
本身没有实现消费者分组订阅的功能,所以我们采用sarama-cluster
第三方库,sarama-cluster
是在sarama
基础上再封装了一些方法,实现了分组等一些其他功能.
2.引用项目地址
sarama : github.com/Shopify/sarama
sarama-cluster : github.com/bsm/sarama-cluster
3.使用
3.1 配置
1 | config := cluster.NewConfig() |
3.2 新建消费者实例
1 | consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, "consumer-group1", []string{"test"}, config) |
- NewConsumer(代理地址,组ID,话题,配置)
3.3 消费消息
1 | for { |
- 其中
signals
通道用来接收系统消息1
2signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
4.结语
这里只介绍了sarama-cluster
实现消费者的方法,生产者 参考另一篇文章Go使用Kafka消息队列(生产者)