Go使用Kafka消息队列(消费者)

本文代码github: https://github.com/zboyco/go-test/blob/master/kafka/consumer/main.go


1.介绍

因为sarama本身没有实现消费者分组订阅的功能,所以我们采用sarama-cluster第三方库,sarama-cluster是在sarama基础上再封装了一些方法,实现了分组等一些其他功能.

2.引用项目地址

sarama : github.com/Shopify/sarama
sarama-cluster : github.com/bsm/sarama-cluster

3.使用

3.1 配置

1
2
3
4
5
6
7
config := cluster.NewConfig()
// 返回错误
config.Consumer.Return.Errors = true
// offsets提交间隔
config.Consumer.Offsets.CommitInterval = 1 * time.Second
// 默认从最新的offsets开始获取
config.Consumer.Offsets.Initial = sarama.OffsetNewest

3.2 新建消费者实例

1
consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, "consumer-group1", []string{"test"}, config)
  • NewConsumer(代理地址,组ID,话题,配置)

3.3 消费消息

1
2
3
4
5
6
7
8
9
10
11
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
log.Printf("%s : %s/%d/%d\t%s\t%s\n", consumerId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // 提交offset
}
case <-signals:
return
}
}
  • 其中signals通道用来接收系统消息
    1
    2
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

4.结语

这里只介绍了sarama-cluster实现消费者的方法,生产者 参考另一篇文章Go使用Kafka消息队列(生产者)