kafka-go
动机
在Segment,我们大量依赖Go和Kafka。不幸的是,在撰写本文时,Go的Kafka客户端库的状态并不理想。可用的选择有:
-
sarama,是目前最受欢迎的库,但使用起来相当困难。文档不足,API暴露了Kafka协议的低级概念,并且不支持Go的最新特性,如contexts。它还将所有值作为指针传递,这会导致大量动态内存分配、更频繁的垃圾收集和更高的内存使用。
-
confluent-kafka-go是基于librdkafka的cgo包装器,这意味着它为所有使用该包的Go代码引入了对C库的依赖。它的文档比sarama好得多,但仍然缺乏对Go contexts的支持。
-
goka是一个较新的Go Kafka客户端,专注于特定的使用模式。它提供了将Kafka用作服务之间的消息传递总线而不是有序事件日志的抽象,但这不是我们在Segment典型的Kafka使用场景。该包还依赖sarama进行所有与Kafka的交互。
这就是kafka-go
发挥作用的地方。它提供了与Kafka交互的低级和高级API,反映了Go标准库的概念并实现了接口,使其易于使用和与现有软件集成。
注意:
为了更好地符合我们新采用的行为准则,kafka-go项目已将默认分支重命名为main
。有关行为准则的完整详细信息,请参阅此文档。
Kafka版本
kafka-go
目前已通过Kafka 0.10.1.0至2.7.1版本的测试。虽然它应该与更高版本兼容,但Kafka API中可用的较新功能可能尚未在客户端中实现。
Go版本
kafka-go
需要Go 1.15或更高版本。
连接
Conn
类型是kafka-go
包的核心。它封装了与Kafka服务器的原始网络连接,以提供低级API。
以下是展示连接对象典型用法的一些示例:
// 生产消息
topic := "my-topic"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("连接leader失败:", err)
}
conn.SetWriteDeadline(time.Now().Add(10*time.Second))
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("一!")},
kafka.Message{Value: []byte("二!")},
kafka.Message{Value: []byte("三!")},
)
if err != nil {
log.Fatal("写入消息失败:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("关闭写入器失败:", err)
}
// 消费消息
topic := "my-topic"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("连接leader失败:", err)
}
conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // 获取最小10KB,最大1MB
b := make([]byte, 10e3) // 每条消息最大10KB
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}
if err := batch.Close(); err != nil {
log.Fatal("关闭批处理失败:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("关闭连接失败:", err)
}
创建主题
默认情况下,Kafka的auto.create.topics.enable
设置为'true'
(在bitnami/kafka的Kafka Docker镜像中为KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true'
)。如果此值设置为'true'
,则主题将作为kafka.DialLeader
的副作用被创建,如下所示:
// 当auto.create.topics.enable='true'时创建主题
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
panic(err.Error())
}
如果auto.create.topics.enable='false'
,则需要显式创建主题,如下所示:
// 当auto.create.topics.enable='false'时创建主题
topic := "my-topic"
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
通过非leader连接连接到leader
// 通过现有的非leader连接连接到Kafka leader,而不是使用DialLeader
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer connLeader.Close()
列出主题
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
panic(err.Error())
}
m := map[string]struct{}{}
for _, p := range partitions {
m[p.Topic] = struct{}{}
}
for k := range m {
fmt.Println(k)
}
由于它是低级的,Conn
类型成为构建更高级抽象的绝佳基础,例如Reader
。
Reader
Reader
是kafka-go
包暴露的另一个概念,旨在简化从单个主题-分区对消费的典型用例。Reader
还自动处理重新连接和偏移量管理,并暴露一个支持使用Go contexts进行异步取消和超时的API。
请注意,在进程退出时调用Reader
的Close()
方法很重要。Kafka服务器需要优雅断开连接,以停止继续尝试向已连接的客户端发送消息。如果进程被SIGINT(在shell中按ctrl-c)或SIGTERM(如docker stop或kubernetes重启所做的)终止,给定的示例将不会调用Close()
。这可能会导致新的reader连接到同一主题时出现延迟(例如,启动新进程或运行新容器)。使用signal.Notify
处理程序在进程关闭时关闭reader。
// 创建一个新的读取器,从主题A的分区0的偏移量42开始消费
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"},
Topic: "topic-A",
Partition: 0,
MaxBytes: 10e6, // 10MB
})
r.SetOffset(42)
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("偏移量 %d 的消息: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}
if err := r.Close(); err != nil {
log.Fatal("关闭读取器失败:", err)
}
消费者组
kafka-go
也支持Kafka消费者组,包括由代理管理的偏移量。
要启用消费者组,只需在ReaderConfig中指定GroupID。
使用消费者组时,ReadMessage会自动提交偏移量。
// 创建一个新的读取器,消费主题A
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
})
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("主题/分区/偏移量 %v/%v/%v 的消息: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
if err := r.Close(); err != nil {
log.Fatal("关闭读取器失败:", err)
}
使用消费者组时有一些限制:
- 当设置GroupID时,
(*Reader).SetOffset
将返回错误 - 当设置GroupID时,
(*Reader).Offset
将始终返回-1
- 当设置GroupID时,
(*Reader).Lag
将始终返回-1
- 当设置GroupID时,
(*Reader).ReadLag
将返回错误 - 当设置GroupID时,
(*Reader).Stats
将返回分区-1
显式提交
kafka-go
也支持显式提交。不调用 ReadMessage
,
而是调用 FetchMessage
然后调用 CommitMessages
。
ctx := context.Background()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
fmt.Printf("主题/分区/偏移量 %v/%v/%v 的消息: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("提交消息失败:", err)
}
}
在消费者组中提交消息时,给定主题/分区中偏移量最高的消息
决定了该分区提交的偏移量值。例如,如果通过调用 FetchMessage
检索到单个分区的偏移量为1、2和3的消息,那么用偏移量为3的消息
调用 CommitMessages
也会导致该分区的偏移量1和2的消息被提交。
管理提交
默认情况下,CommitMessages会同步地将偏移量提交到Kafka。为了 提高性能,你可以通过在ReaderConfig上设置CommitInterval来 定期将偏移量提交到Kafka。
// 创建一个新的读取器,消费主题A
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
CommitInterval: time.Second, // 每秒向Kafka刷新提交
})
写入器
要向Kafka生产消息,程序可以使用低级别的 Conn
API,但
该包还提供了更高级别的 Writer
类型,在大多数情况下更适合
使用,因为它提供了额外的功能:
- 自动重试和在错误时重新连接。
- 可配置的消息在可用分区间的分布。
- 同步或异步向Kafka写入消息。
- 使用上下文进行异步取消。
- 关闭时刷新待处理的消息以支持优雅关闭。
- 在发布消息之前创建缺失的主题。注意! 这是直到版本
v0.4.30
的默认行为。
// 创建一个写入器,生产到主题A,使用最少字节分布
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
}
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("写入消息失败:", err)
}
if err := w.Close(); err != nil {
log.Fatal("关闭写入器失败:", err)
}
在发布之前创建缺失的主题
// 创建一个写入器,发布消息到主题A。
// 如果主题不存在,将会被创建。
w := &Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
AllowAutoTopicCreation: true,
}
messages := []kafka.Message{
{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
}
var err error
const retries = 3
for i := 0; i < retries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 在发布消息之前尝试创建主题
err = w.WriteMessages(ctx, messages...)
if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
time.Sleep(time.Millisecond * 250)
continue
}
if err != nil {
log.Fatalf("意外错误 %v", err)
}
break
}
if err := w.Close(); err != nil {
log.Fatal("关闭写入器失败:", err)
}
写入多个主题
通常,WriterConfig.Topic
用于初始化单一主题的写入器。
通过排除该特定配置,你可以通过设置 Message.Topic
来
定义每条消息的主题。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// 注意:当这里没有定义Topic时,每条Message必须定义它。
Balancer: &kafka.LeastBytes{},
}
err := w.WriteMessages(context.Background(),
// 注意:每条Message都定义了Topic,否则将返回错误。
kafka.Message{
Topic: "topic-A",
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Topic: "topic-B",
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Topic: "topic-C",
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("写入消息失败:", err)
}
if err := w.Close(); err != nil {
log.Fatal("关闭写入器失败:", err)
}
注意: 这两种模式是互斥的,如果你设置了 Writer.Topic
,
你就不能在写入的消息上显式定义 Message.Topic
。当你没有为写入器
定义主题时,相反的情况也适用。如果 Writer
检测到这种歧义,
将返回错误。
与其他客户端的兼容性
Sarama
如果你从Sarama切换过来,并且需要/想要使用相同的消息分区算法,你可以使用
kafka.Hash
平衡器或 kafka.ReferenceHash
平衡器:
kafka.Hash
=sarama.NewHashPartitioner
kafka.ReferenceHash
=sarama.NewReferenceHashPartitioner
kafka.Hash
和kafka.ReferenceHash
均衡器会将消息路由到与前述两个Sarama分区器相同的分区。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
}
librdkafka和confluent-kafka-go
使用kafka.CRC32Balancer
均衡器可以获得与librdkafka的默认consistent_random
分区策略相同的行为。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.CRC32Balancer{},
}
Java
使用kafka.Murmur2Balancer
均衡器可以获得与标准Java客户端默认分区器相同的行为。注意:Java类允许直接指定分区,这在这里是不允许的。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.Murmur2Balancer{},
}
压缩
可以通过设置Compression
字段在Writer
上启用压缩:
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Compression: kafka.Snappy,
}
Reader
会通过检查消息属性来确定消费的消息是否被压缩。但是,必须导入所有预期编解码器的包,以确保它们正确加载。
注意:在0.4版本之前,程序必须导入压缩包以安装编解码器并支持从kafka读取压缩消息。现在不再需要这样做,压缩包的导入现在是无操作的。
TLS支持
对于基本的Conn类型或在Reader/Writer配置中,你可以指定一个dialer选项来支持TLS。如果TLS字段为nil,它将不会使用TLS连接。 *注意:*如果在Conn/Reader/Writer上未配置TLS而连接到启用了TLS的Kafka集群,可能会导致不透明的io.ErrUnexpectedEOF错误。
连接
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls配置...},
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls配置...},
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})
Writer
直接创建Writer
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: &kafka.Transport{
TLS: &tls.Config{},
},
}
使用kafka.NewWriter
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls配置...},
}
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "topic-A",
Balancer: &kafka.Hash{},
Dialer: dialer,
})
注意kafka.NewWriter
和kafka.WriterConfig
已被弃用,将在未来版本中移除。
SASL支持
你可以在Dialer
上指定一个选项来使用SASL认证。Dialer
可以直接用于打开Conn
,也可以通过各自的配置传递给Reader
或Writer
。如果SASLMechanism
字段为nil
,它将不会使用SASL进行认证。
SASL认证类型
Plain
mechanism := plain.Mechanism{
Username: "username",
Password: "password",
}
SCRAM
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
连接
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})
Writer
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
// Transport负责管理连接池和其他资源,
// 通常最好创建几个并在应用程序中共享它们。
sharedTransport := &kafka.Transport{
SASL: mechanism,
}
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: sharedTransport,
}
Client
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
// Transport负责管理连接池和其他资源,
// 通常最好创建几个并在应用程序中共享它们。
sharedTransport := &kafka.Transport{
SASL: mechanism,
}
client := &kafka.Client{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Timeout: 10 * time.Second,
Transport: sharedTransport,
}
读取指定时间范围内的所有消息
startTime := time.Now().Add(-time.Hour)
endTime := time.Now()
batchSize := int(10e6) // 10MB
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "my-topic1",
Partition: 0,
MaxBytes: batchSize,
})
r.SetOffsetAt(context.Background(), startTime)
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
if m.Time.After(endTime) {
break
}
// TODO: 处理消息
fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
日志记录
为了查看Reader/Writer类型的操作,在创建时配置一个日志记录器。
Reader
func logf(msg string, a ...interface{}) {
fmt.Printf(msg, a...)
fmt.Println()
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "my-topic1",
Partition: 0,
Logger: kafka.LoggerFunc(logf),
ErrorLogger: kafka.LoggerFunc(logf),
})
Writer
func logf(msg string, a ...interface{}) {
fmt.Printf(msg, a...)
fmt.Println()
}
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "topic",
Logger: kafka.LoggerFunc(logf),
ErrorLogger: kafka.LoggerFunc(logf),
}
测试
较新版本的Kafka中微妙的行为变化导致一些历史测试失败,如果您正在运行Kafka 2.3.1或更高版本,导出KAFKA_SKIP_NETTEST=1
环境变量将跳过这些测试。
在Docker中本地运行Kafka
docker-compose up -d
运行测试
KAFKA_VERSION=2.3.1 \
KAFKA_SKIP_NETTEST=1 \
go test -race ./...
(或者)清理缓存的测试结果并运行测试:
go clean -cache && make test