Project Icon

kafka-go

Go语言开发的高性能Kafka客户端库

kafka-go是一款Go语言开发的高性能Kafka客户端库,提供低级和高级API与Kafka交互。该库实现Go标准库接口,易于使用和集成,支持生产者、消费者和消费者组功能。它具备自动重试、重连和偏移量管理等特性,适用于Kafka 0.10.1.0至2.7.1版本,需要Go 1.15或更高版本。

kafka-go CircleCI Go Report Card GoDoc

动机

在Segment,我们大量依赖Go和Kafka。不幸的是,在撰写本文时,Go的Kafka客户端库的状态并不理想。可用的选择有:

  • sarama,是目前最受欢迎的库,但使用起来相当困难。文档不足,API暴露了Kafka协议的低级概念,并且不支持Go的最新特性,如contexts。它还将所有值作为指针传递,这会导致大量动态内存分配、更频繁的垃圾收集和更高的内存使用。

  • confluent-kafka-go是基于librdkafka的cgo包装器,这意味着它为所有使用该包的Go代码引入了对C库的依赖。它的文档比sarama好得多,但仍然缺乏对Go contexts的支持。

  • goka是一个较新的Go Kafka客户端,专注于特定的使用模式。它提供了将Kafka用作服务之间的消息传递总线而不是有序事件日志的抽象,但这不是我们在Segment典型的Kafka使用场景。该包还依赖sarama进行所有与Kafka的交互。

这就是kafka-go发挥作用的地方。它提供了与Kafka交互的低级和高级API,反映了Go标准库的概念并实现了接口,使其易于使用和与现有软件集成。

注意:

为了更好地符合我们新采用的行为准则,kafka-go项目已将默认分支重命名为main。有关行为准则的完整详细信息,请参阅此文档

Kafka版本

kafka-go目前已通过Kafka 0.10.1.0至2.7.1版本的测试。虽然它应该与更高版本兼容,但Kafka API中可用的较新功能可能尚未在客户端中实现。

Go版本

kafka-go需要Go 1.15或更高版本。

连接 GoDoc

Conn类型是kafka-go包的核心。它封装了与Kafka服务器的原始网络连接,以提供低级API。

以下是展示连接对象典型用法的一些示例:

// 生产消息
topic := "my-topic"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
    log.Fatal("连接leader失败:", err)
}

conn.SetWriteDeadline(time.Now().Add(10*time.Second))
_, err = conn.WriteMessages(
    kafka.Message{Value: []byte("一!")},
    kafka.Message{Value: []byte("二!")},
    kafka.Message{Value: []byte("三!")},
)
if err != nil {
    log.Fatal("写入消息失败:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("关闭写入器失败:", err)
}
// 消费消息
topic := "my-topic"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
    log.Fatal("连接leader失败:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // 获取最小10KB,最大1MB

b := make([]byte, 10e3) // 每条消息最大10KB
for {
    n, err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(string(b[:n]))
}

if err := batch.Close(); err != nil {
    log.Fatal("关闭批处理失败:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("关闭连接失败:", err)
}

创建主题

默认情况下,Kafka的auto.create.topics.enable设置为'true'(在bitnami/kafka的Kafka Docker镜像中为KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true')。如果此值设置为'true',则主题将作为kafka.DialLeader的副作用被创建,如下所示:

// 当auto.create.topics.enable='true'时创建主题
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
if err != nil {
    panic(err.Error())
}

如果auto.create.topics.enable='false',则需要显式创建主题,如下所示:

// 当auto.create.topics.enable='false'时创建主题
topic := "my-topic"

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer controllerConn.Close()


topicConfigs := []kafka.TopicConfig{
    {
        Topic:             topic,
        NumPartitions:     1,
        ReplicationFactor: 1,
    },
}

err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
    panic(err.Error())
}

通过非leader连接连接到leader

// 通过现有的非leader连接连接到Kafka leader,而不是使用DialLeader
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer connLeader.Close()

列出主题

conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

partitions, err := conn.ReadPartitions()
if err != nil {
    panic(err.Error())
}

m := map[string]struct{}{}

for _, p := range partitions {
    m[p.Topic] = struct{}{}
}
for k := range m {
    fmt.Println(k)
}

由于它是低级的,Conn类型成为构建更高级抽象的绝佳基础,例如Reader

Reader GoDoc

Readerkafka-go包暴露的另一个概念,旨在简化从单个主题-分区对消费的典型用例。Reader还自动处理重新连接和偏移量管理,并暴露一个支持使用Go contexts进行异步取消和超时的API。

请注意,在进程退出时调用ReaderClose()方法很重要。Kafka服务器需要优雅断开连接,以停止继续尝试向已连接的客户端发送消息。如果进程被SIGINT(在shell中按ctrl-c)或SIGTERM(如docker stop或kubernetes重启所做的)终止,给定的示例将不会调用Close()。这可能会导致新的reader连接到同一主题时出现延迟(例如,启动新进程或运行新容器)。使用signal.Notify处理程序在进程关闭时关闭reader。

// 创建一个新的读取器,从主题A的分区0的偏移量42开始消费
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092","localhost:9093", "localhost:9094"},
    Topic:     "topic-A",
    Partition: 0,
    MaxBytes:  10e6, // 10MB
})
r.SetOffset(42)

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("偏移量 %d 的消息: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("关闭读取器失败:", err)
}

消费者组

kafka-go 也支持Kafka消费者组,包括由代理管理的偏移量。 要启用消费者组,只需在ReaderConfig中指定GroupID。

使用消费者组时,ReadMessage会自动提交偏移量。

// 创建一个新的读取器,消费主题A
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:   "consumer-group-id",
    Topic:     "topic-A",
    MaxBytes:  10e6, // 10MB
})

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("主题/分区/偏移量 %v/%v/%v 的消息: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("关闭读取器失败:", err)
}

使用消费者组时有一些限制:

  • 当设置GroupID时,(*Reader).SetOffset 将返回错误
  • 当设置GroupID时,(*Reader).Offset 将始终返回 -1
  • 当设置GroupID时,(*Reader).Lag 将始终返回 -1
  • 当设置GroupID时,(*Reader).ReadLag 将返回错误
  • 当设置GroupID时,(*Reader).Stats 将返回分区 -1

显式提交

kafka-go 也支持显式提交。不调用 ReadMessage, 而是调用 FetchMessage 然后调用 CommitMessages

ctx := context.Background()
for {
    m, err := r.FetchMessage(ctx)
    if err != nil {
        break
    }
    fmt.Printf("主题/分区/偏移量 %v/%v/%v 的消息: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
    if err := r.CommitMessages(ctx, m); err != nil {
        log.Fatal("提交消息失败:", err)
    }
}

在消费者组中提交消息时,给定主题/分区中偏移量最高的消息 决定了该分区提交的偏移量值。例如,如果通过调用 FetchMessage 检索到单个分区的偏移量为1、2和3的消息,那么用偏移量为3的消息 调用 CommitMessages 也会导致该分区的偏移量1和2的消息被提交。

管理提交

默认情况下,CommitMessages会同步地将偏移量提交到Kafka。为了 提高性能,你可以通过在ReaderConfig上设置CommitInterval来 定期将偏移量提交到Kafka。

// 创建一个新的读取器,消费主题A
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    MaxBytes:       10e6, // 10MB
    CommitInterval: time.Second, // 每秒向Kafka刷新提交
})

写入器 GoDoc

要向Kafka生产消息,程序可以使用低级别的 Conn API,但 该包还提供了更高级别的 Writer 类型,在大多数情况下更适合 使用,因为它提供了额外的功能:

  • 自动重试和在错误时重新连接。
  • 可配置的消息在可用分区间的分布。
  • 同步或异步向Kafka写入消息。
  • 使用上下文进行异步取消。
  • 关闭时刷新待处理的消息以支持优雅关闭。
  • 在发布消息之前创建缺失的主题。注意! 这是直到版本 v0.4.30 的默认行为。
// 创建一个写入器,生产到主题A,使用最少字节分布
w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:   "topic-A",
	Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("写入消息失败:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("关闭写入器失败:", err)
}

在发布之前创建缺失的主题

// 创建一个写入器,发布消息到主题A。
// 如果主题不存在,将会被创建。
w := &Writer{
    Addr:                   kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Topic:                  "topic-A",
    AllowAutoTopicCreation: true,
}

messages := []kafka.Message{
    {
        Key:   []byte("Key-A"),
        Value: []byte("Hello World!"),
    },
    {
        Key:   []byte("Key-B"),
        Value: []byte("One!"),
    },
    {
        Key:   []byte("Key-C"),
        Value: []byte("Two!"),
    },
}

var err error
const retries = 3
for i := 0; i < retries; i++ {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // 在发布消息之前尝试创建主题
    err = w.WriteMessages(ctx, messages...)
    if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
        time.Sleep(time.Millisecond * 250)
        continue
    }

    if err != nil {
        log.Fatalf("意外错误 %v", err)
    }
    break
}

if err := w.Close(); err != nil {
    log.Fatal("关闭写入器失败:", err)
}

写入多个主题

通常,WriterConfig.Topic 用于初始化单一主题的写入器。 通过排除该特定配置,你可以通过设置 Message.Topic 来 定义每条消息的主题。

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    // 注意:当这里没有定义Topic时,每条Message必须定义它。
	Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
    // 注意:每条Message都定义了Topic,否则将返回错误。
	kafka.Message{
        Topic: "topic-A",
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
        Topic: "topic-B",
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
        Topic: "topic-C",
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)
if err != nil {
    log.Fatal("写入消息失败:", err)
}

if err := w.Close(); err != nil {
    log.Fatal("关闭写入器失败:", err)
}

注意: 这两种模式是互斥的,如果你设置了 Writer.Topic, 你就不能在写入的消息上显式定义 Message.Topic。当你没有为写入器 定义主题时,相反的情况也适用。如果 Writer 检测到这种歧义, 将返回错误。

与其他客户端的兼容性

Sarama

如果你从Sarama切换过来,并且需要/想要使用相同的消息分区算法,你可以使用 kafka.Hash 平衡器或 kafka.ReferenceHash 平衡器:

  • kafka.Hash = sarama.NewHashPartitioner
  • kafka.ReferenceHash = sarama.NewReferenceHashPartitioner kafka.Hashkafka.ReferenceHash均衡器会将消息路由到与前述两个Sarama分区器相同的分区。
w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:    "topic-A",
	Balancer: &kafka.Hash{},
}

librdkafka和confluent-kafka-go

使用kafka.CRC32Balancer均衡器可以获得与librdkafka的默认consistent_random分区策略相同的行为。

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:    "topic-A",
	Balancer: kafka.CRC32Balancer{},
}

Java

使用kafka.Murmur2Balancer均衡器可以获得与标准Java客户端默认分区器相同的行为。注意:Java类允许直接指定分区,这在这里是不允许的。

w := &kafka.Writer{
	Addr:     kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:    "topic-A",
	Balancer: kafka.Murmur2Balancer{},
}

压缩

可以通过设置Compression字段在Writer上启用压缩:

w := &kafka.Writer{
	Addr:        kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:       "topic-A",
	Compression: kafka.Snappy,
}

Reader会通过检查消息属性来确定消费的消息是否被压缩。但是,必须导入所有预期编解码器的包,以确保它们正确加载。

注意:在0.4版本之前,程序必须导入压缩包以安装编解码器并支持从kafka读取压缩消息。现在不再需要这样做,压缩包的导入现在是无操作的。

TLS支持

对于基本的Conn类型或在Reader/Writer配置中,你可以指定一个dialer选项来支持TLS。如果TLS字段为nil,它将不会使用TLS连接。 *注意:*如果在Conn/Reader/Writer上未配置TLS而连接到启用了TLS的Kafka集群,可能会导致不透明的io.ErrUnexpectedEOF错误。

连接

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls配置...},
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls配置...},
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})

Writer

直接创建Writer

w := kafka.Writer{
    Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), 
    Topic:   "topic-A",
    Balancer: &kafka.Hash{},
    Transport: &kafka.Transport{
        TLS: &tls.Config{},
      },
    }

使用kafka.NewWriter

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls配置...},
}

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
	Topic:   "topic-A",
	Balancer: &kafka.Hash{},
	Dialer:   dialer,
})

注意kafka.NewWriterkafka.WriterConfig已被弃用,将在未来版本中移除。

SASL支持

你可以在Dialer上指定一个选项来使用SASL认证。Dialer可以直接用于打开Conn,也可以通过各自的配置传递给ReaderWriter。如果SASLMechanism字段为nil,它将不会使用SASL进行认证。

SASL认证类型

Plain

mechanism := plain.Mechanism{
    Username: "username",
    Password: "password",
}

SCRAM

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

连接

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092","localhost:9093", "localhost:9094"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})

Writer

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

// Transport负责管理连接池和其他资源,
// 通常最好创建几个并在应用程序中共享它们。
sharedTransport := &kafka.Transport{
    SASL: mechanism,
}

w := kafka.Writer{
	Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
	Topic:     "topic-A",
	Balancer:  &kafka.Hash{},
	Transport: sharedTransport,
}

Client

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
    panic(err)
}

// Transport负责管理连接池和其他资源,
// 通常最好创建几个并在应用程序中共享它们。
sharedTransport := &kafka.Transport{
    SASL: mechanism,
}

client := &kafka.Client{
    Addr:      kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
    Timeout:   10 * time.Second,
    Transport: sharedTransport,
}

读取指定时间范围内的所有消息

startTime := time.Now().Add(-time.Hour)
endTime := time.Now()
batchSize := int(10e6) // 10MB

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
    Topic:     "my-topic1",
    Partition: 0,
    MaxBytes:  batchSize,
})

r.SetOffsetAt(context.Background(), startTime)

for {
    m, err := r.ReadMessage(context.Background())

    if err != nil {
        break
    }
    if m.Time.After(endTime) {
        break
    }
    // TODO: 处理消息
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
    log.Fatal("failed to close reader:", err)
}

日志记录

为了查看Reader/Writer类型的操作,在创建时配置一个日志记录器。

Reader

func logf(msg string, a ...interface{}) {
	fmt.Printf(msg, a...)
	fmt.Println()
}

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:     []string{"localhost:9092", "localhost:9093", "localhost:9094"},
	Topic:       "my-topic1",
	Partition:   0,
	Logger:      kafka.LoggerFunc(logf),
	ErrorLogger: kafka.LoggerFunc(logf),
})

Writer

func logf(msg string, a ...interface{}) {
	fmt.Printf(msg, a...)
	fmt.Println()
}
w := &kafka.Writer{
	Addr:        kafka.TCP("localhost:9092"),
	Topic:       "topic",
	Logger:      kafka.LoggerFunc(logf),
	ErrorLogger: kafka.LoggerFunc(logf),
}

测试

较新版本的Kafka中微妙的行为变化导致一些历史测试失败,如果您正在运行Kafka 2.3.1或更高版本,导出KAFKA_SKIP_NETTEST=1环境变量将跳过这些测试。

在Docker中本地运行Kafka

docker-compose up -d

运行测试

KAFKA_VERSION=2.3.1 \
  KAFKA_SKIP_NETTEST=1 \
  go test -race ./...

(或者)清理缓存的测试结果并运行测试:

go clean -cache && make test
项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

吐司

探索Tensor.Art平台的独特AI模型,免费访问各种图像生成与AI训练工具,从Stable Diffusion等基础模型开始,轻松实现创新图像生成。体验前沿的AI技术,推动个人和企业的创新发展。

Project Cover

SubCat字幕猫

SubCat字幕猫APP是一款创新的视频播放器,它将改变您观看视频的方式!SubCat结合了先进的人工智能技术,为您提供即时视频字幕翻译,无论是本地视频还是网络流媒体,让您轻松享受各种语言的内容。

Project Cover

美间AI

美间AI创意设计平台,利用前沿AI技术,为设计师和营销人员提供一站式设计解决方案。从智能海报到3D效果图,再到文案生成,美间让创意设计更简单、更高效。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号