Go使用Kafka消息队列(生产者)

本文代码github: https://github.com/zboyco/go-test/blob/master/kafka/producer/main.go


1.介绍

kafka是什么就不多说了,这里主要记录下第三方库的使用,第三方库还是挺多的,我们这里采用sarama来实现生产者.

2.引用项目地址

sarama: github.com/Shopify/sarama

3.使用

3.1 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
config := sarama.NewConfig()

//等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll

//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true

// 设置订阅者分配方式
// sarama.NewManualPartitioner //返回一个手动选择分区的分割器,也就是获取msg中指定的`partition`
// sarama.NewRandomPartitioner //通过随机函数随机获取一个分区号
// sarama.NewRoundRobinPartitioner //环形选择,也就是在所有分区中循环选择一个
// sarama.NewHashPartitioner //通过msg中的key生成hash值,选择分区
config.Producer.Partitioner = sarama.NewHashPartitioner

//设置kafka版本
config.Version = sarama.V2_1_0_0
  • 注意Partitioner根据需求选择分割器

3.2 新建生产者实例

1
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
  • 其中localhost:9092是kafka服务地址

3.3 生产消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
currentTime := time.Now()

value = "message " + currentTime.Format("2006/01/02 15:04:05")

//新建消息体
msg := &sarama.ProducerMessage{
Topic: "test",
Key: sarama.StringEncoder(strconv.Itoa(currentTime.Minute())), // KEY用来分区指定消费者
}

//赋值
msg.Value = sarama.ByteEncoder(value)

//使用通道发送
producer.Input() <- msg

###结语
这里只介绍了sarama实现生产者的方法,消费者因为sarama没实现分组功能,我们采用另一个库实现.
消费者参考另一篇文章Go使用Kafka消息队列(消费者)