Project Icon

rueidis

支持自动流水线和客户端缓存的高性能Go Redis库

rueidis是一个高性能的Go语言Redis客户端库,实现了自动流水线和服务器辅助的客户端缓存机制。该库支持非阻塞Redis命令的自动流水线处理,提供通用对象映射和分布式锁等功能。rueidis集成了OpenTelemetry,支持Pub/Sub、Redis集群和哨兵模式,为开发者提供全面的Redis客户端解决方案。其优化设计显著提升了吞吐量和降低了延迟,适用于各种高性能Redis应用场景。

rueidis

Go 参考 CircleCI Go 报告卡 codecov

一个快速的 Golang Redis 客户端,具有自动流水线功能并支持服务器辅助的客户端缓存。

特性


入门

package main

import (
	"context"
	"github.com/redis/rueidis"
)

func main() {
	client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	ctx := context.Background()
	// SET key val NX
	err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error()
	// HGETALL hm
	hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap()
}

查看更多示例:命令响应速查表

开发者友好的命令构建器

client.B() 是构建 Redis 命令的入口点:

开发者友好的命令构建器
由 @FZambia 录制 使用 Rueidis Go 库提高 Centrifugo Redis 引擎的吞吐量和分配效率

构建命令后,使用 client.Do()client.DoMulti() 将其发送到 Redis。

你❗️不应该❗️在另一个 client.Do()client.DoMulti() 调用中重复使用命令,因为默认情况下它已被回收到底层的 sync.Pool 中。

要重复使用命令,请在 Build() 后使用 Pin(),这将防止命令被回收。

流水线

自动流水线

所有并发的非阻塞 Redis 命令(如 GETSET)都会自动进行流水线处理,这减少了整体的往返次数和系统调用,从而获得更高的吞吐量。你可以通过从多个 goroutine 并发调用 client.Do() 来轻松获得流水线技术的好处。例如:

func BenchmarkPipelining(b *testing.B, client rueidis.Client) {
	// 以下 client.Do() 操作将从多个 goroutine 发出,
	// 因此会自动进行流水线处理。
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
		}
	})
}

与 go-redis v9 的基准对比

与 go-redis 相比,Rueidis 在 1、8 和 64 并行度设置下都有更高的吞吐量。

在 Macbook Pro 16" M1 Pro 2021 的本地基准测试中,它甚至能够达到比 go-redis 高出 ~14 倍 的吞吐量。(参见 parallelism(64)-key(16)-value(64)-10

client_test_set

基准测试源代码:https://github.com/rueian/rueidis-benchmark

在两台 GCP n2-highcpu-2 机器上进行的基准测试结果也显示,rueidis 可以在更低延迟的情况下实现更高的吞吐量:https://github.com/redis/rueidis/pull/93

手动流水线

除了自动流水线外,你还可以使用 DoMulti() 手动进行流水线处理:

cmds := make(rueidis.Commands, 0, 10)
for i := 0; i < 10; i++ {
    cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
}
for _, resp := range client.DoMulti(ctx, cmds...) {
    if err := resp.Error(); err != nil {
        panic(err)
    }
}

服务器辅助的客户端缓存

默认启用的服务器辅助客户端缓存的选择加入模式可以通过调用 DoCache()DoMultiCache() 并指定客户端 TTL 来使用。

client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray()
client.DoMultiCache(ctx,
    rueidis.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
    rueidis.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))

缓存的响应,包括 Redis Nil,将在收到 Redis 服务器的通知或达到客户端 TTL 时被失效。有关更多详细信息,请参阅 https://github.com/redis/rueidis/issues/534。

基准测试

服务器辅助的客户端缓存可以大幅提高延迟和吞吐量,就像在应用程序内部有一个 Redis 副本一样。例如:

client_test_get

基准测试源代码:https://github.com/rueian/rueidis-benchmark

客户端缓存辅助函数

使用 CacheTTL() 检查剩余的客户端缓存 TTL(以秒为单位):

client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).CacheTTL() == 60

使用 IsCacheHit() 验证响应是否来自客户端内存:

client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).IsCacheHit() == true

如果通过 rueidisotel.NewClient(option) 启用了 OpenTelemetry,则还会有两个指标被测量:

  • rueidis_do_cache_miss
  • rueidis_do_cache_hits

MGET/JSON.MGET 客户端缓存辅助函数

rueidis.MGetCacherueidis.JsonMGetCache 是通过客户端缓存获取跨不同槽位的多个键的便捷辅助函数。它们首先按槽位分组键以构建 MGETJSON.MGET 命令,然后仅将缓存未命中的键发送到 Redis 节点。

广播模式客户端缓存

尽管默认是选择加入模式,但你可以通过在 ClientOption.ClientTrackingOptions 中指定前缀来使用广播模式:

client, err := rueidis.NewClient(rueidis.ClientOption{
	InitAddress:           []string{"127.0.0.1:6379"},
	ClientTrackingOptions: []string{"PREFIX", "prefix1:", "PREFIX", "prefix2:", "BCAST"},
})
if err != nil {
	panic(err)
}
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == false
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == true

请确保传递给 DoCache()DoMultiCache() 的命令被你的前缀覆盖。否则,它们的客户端缓存将不会被 Redis 失效。

使用缓存旁路模式的客户端缓存

缓存旁路是一种广泛使用的缓存策略。 rueidisaside可以帮助你将数据缓存到由Redis支持的客户端缓存中。例如:

client, err := rueidisaside.NewClient(rueidisaside.ClientOption{
    ClientOption: rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}},
})
if err != nil {
    panic(err)
}
val, err := client.Get(context.Background(), time.Minute, "mykey", func(ctx context.Context, key string) (val string, err error) {
    if err = db.QueryRowContext(ctx, "SELECT val FROM mytab WHERE id = ?", key).Scan(&val); err == sql.ErrNoRows {
        val = "_nil_" // 缓存nil以避免穿透。
        err = nil     // 在sql.ErrNoRows的情况下清除err。
    }
    return
})
// ...

请参考rueidisaside中的完整示例。

禁用客户端缓存

某些Redis提供商不支持客户端缓存,例如Google Cloud Memorystore。 你可以通过将ClientOption.DisableCache设置为true来禁用客户端缓存。 这也会将client.DoCache()client.DoMultiCache()回退到client.Do()client.DoMulti()

上下文取消

如果上下文被取消或达到截止时间,client.Do()client.DoMulti()client.DoCache()client.DoMultiCache()可以提前返回。

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded

请注意,虽然操作可以提前返回,但命令可能已经发送。

发布/订阅

要接收来自频道的消息,应使用client.Receive()。它支持SUBSCRIBEPSUBSCRIBE和Redis 7.0的SSUBSCRIBE

err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg rueidis.PubSubMessage) {
    // 处理消息
})

提供的处理程序将使用接收到的消息进行调用。

重要的是要注意,client.Receive()将在以下情况下一直阻塞直到返回一个值:

  1. 当收到与提供的subscribe命令相关的任何取消订阅/取消模式订阅消息时,返回nil
  2. 当客户端手动关闭时,返回rueidis.ErrClosing
  3. ctx完成时,返回ctx.Err()
  4. 当提供的subscribe命令失败时,返回非nil的err

虽然client.Receive()调用是阻塞的,但Client仍然能够接受其他并发请求, 它们共享同一个TCP连接。如果你的消息处理程序可能需要一些时间才能完成,建议 在client.Dedicated()内使用client.Receive(),以免阻塞其他并发请求。

替代发布/订阅钩子

client.Receive()要求用户提前提供订阅命令。 还有一个替代方案Dedicatedclient.SetPubSubHooks(),允许用户稍后订阅/取消订阅频道。

c, cancel := client.Dedicate()
defer cancel()

wait := c.SetPubSubHooks(rueidis.PubSubHooks{
	OnMessage: func(m rueidis.PubSubMessage) {
		// 处理消息。这个回调将在另一个goroutine中按顺序调用。
	}
})
c.Do(ctx, c.B().Subscribe().Channel("ch").Build())
err := <-wait // 断开连接并返回err

如果钩子不为nil,上面的wait通道保证在钩子不再被调用时关闭, 并最多产生一个描述原因的错误。用户可以使用此通道检测断开连接。

CAS事务

要执行CAS事务WATCH + MULTI + EXEC),应该使用专用连接,因为在WATCHEXEC之间不应有无意的写命令。否则,EXEC可能不会按预期失败。

client.Dedicated(func(c rueidis.DedicatedClient) error {
    // 首先监视键
    c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
    // 在这里执行读取
    c.Do(ctx, c.B().Mget().Key("k1", "k2").Build())
    // 使用MULTI EXEC执行写入
    c.DoMulti(
        ctx,
        c.B().Multi().Build(),
        c.B().Set().Key("k1").Value("1").Build(),
        c.B().Set().Key("k2").Value("2").Build(),
        c.B().Exec().Build(),
    )
    return nil
})

或者使用Dedicate()并在完成时调用cancel()将连接放回池中。

c, cancel := client.Dedicate()
defer cancel()

c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
// 使用占用连接的`client`执行剩余的CAS操作

然而,占用连接对吞吐量不利。最好使用Lua脚本来执行乐观锁定。

Lua脚本

NewLuaScriptNewLuaScriptReadOnly将创建一个可安全并发使用的脚本。

调用script.Exec时,它会首先尝试发送EVALSHA,如果服务器返回NOSCRIPT,则回退到EVAL

script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
// script.Exec可以安全地并发调用
list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()

流式读取

client.DoStream()client.DoMultiStream()可用于将大型Redis响应直接发送到io.Writer, 而无需将它们分配到内存中。它们的工作原理是首先将命令发送到从池中获取的专用连接, 然后直接将响应值复制到给定的io.Writer,最后回收连接。

s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build())
for s.HasNext() {
    n, err := s.WriteTo(io.Discard)
    if rueidis.IsRedisNil(err) {
        // ...
    }
}

注意,这两种方法在所有响应写入给定的io.Writer之前会占用连接。 这可能需要很长时间并影响性能。除非你想避免为大型Redis响应分配内存,否则请使用普通的Do()DoMulti()

还要注意,这两种方法只适用于stringintegerfloat类型的Redis响应。目前,DoMultiStream在连接到Redis集群时不支持跨多个槽位进行键的流水线处理。

内存消耗考虑

rueidis中的每个底层连接都为流水线分配一个环形缓冲区。 其大小由ClientOption.RingScaleEachConn控制,默认值为10,结果是每个环的大小为2^10。

如果你有许多rueidis连接,你可能会发现它们占用了相当多的内存。 在这种情况下,你可以考虑将ClientOption.RingScaleEachConn减少到8或9,但可能会导致潜在的吞吐量下降。

你也可以考虑将ClientOption.PipelineMultiplex的值设置为-1,这将让rueidis对每个Redis节点只使用1个连接进行流水线处理。

实例化新的Redis客户端

你可以使用NewClient创建一个新的Redis客户端,并提供多个选项。

// 连接到单个Redis节点:
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:6379"},
})

// 连接到Redis集群
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
    ShuffleInit: true,
})

// 连接到Redis集群并使用副本进行读操作
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
    SendToReplicas: func(cmd rueidis.Completed) bool {
        return cmd.IsReadOnly()
    },
})

// 连接到哨兵
client, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress: []string{"127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"},
    Sentinel: rueidis.SentinelOption{
        MasterSet: "my_master",
    },
})

Redis URL

你可以使用 ParseURLMustParseURL 来构造一个 ClientOption

提供的 URL 必须以 redis://rediss://unix:// 开头。

当前支持的 URL 参数有 dbdial_timeoutwrite_timeoutaddrprotocolclient_cacheclient_namemax_retriesmaster_set

// 连接到 Redis 集群
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003"))
// 连接到 Redis 节点
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:6379/0"))
// 连接到 Redis 哨兵
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:26379/0?master_set=my_master"))

任意命令

如果你想构造命令构建器中没有的命令,可以使用 client.B().Arbitrary()

// 这将生成 [ANY CMD k1 k2 a1 a2]
client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()

处理 JSON、原始 []byte 和向量相似度搜索

命令构建器将所有参数视为 Redis 字符串,这些字符串是二进制安全的。这意味着用户可以直接将 []byte 存储到 Redis 中而无需转换。rueidis.BinaryString 辅助函数可以将 []byte 转换为 string 而无需复制。例如:

client.B().Set().Key("b").Value(rueidis.BinaryString([]byte{...})).Build()

将所有参数视为 Redis 字符串也意味着命令构建器不会自动为用户进行任何引用或转换。

在使用 RedisJSON 时,用户经常需要在 Redis 字符串中准备 JSON 字符串。rueidis.JSON 可以帮助实现这一点:

client.B().JsonSet().Key("j").Path("$.myStrField").Value(rueidis.JSON("str")).Build()
// 等价于
client.B().JsonSet().Key("j").Path("$.myStrField").Value(`"str"`).Build()

在进行向量相似度搜索时,用户可以使用 rueidis.VectorString32rueidis.VectorString64 来构建查询:

cmd := client.B().FtSearch().Index("idx").Query("*=>[KNN 5 @vec $V]").
    Params().Nargs(2).NameValue().NameValue("V", rueidis.VectorString64([]float64{...})).
    Dialect(2).Build()
n, resp, err := client.Do(ctx, cmd).AsFtSearch()

命令响应速查表

虽然命令构建器对开发者友好,但响应解析器稍显不友好。开发者必须预先知道服务器将返回什么类型的 Redis 响应,以及应该使用哪个解析器。

错误处理: 如果选择了不正确的解析器函数,将返回 errParse。以下是使用 ToArray 演示这种情况的示例:

// 尝试解析响应。如果发生解析错误,检查错误是否为解析错误并处理它。
// 通常,你应该通过选择正确的解析器函数来修复代码。
// 例如,如果预期响应是字符串,则使用 ToString(),如果预期响应是数组,则使用 ToArray(),如下所示:
if err := client.Do(ctx, client.B().Get().Key("k").Build()).ToArray(); IsParseErr(err) {
    fmt.Println("解析错误:", err)
}

要记住将返回什么类型的消息以及使用哪种解析方式是很困难的。因此,这里列出了一些常见的示例:

// GET
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
// MGET
client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray()
// SET
client.Do(ctx, client.B().Set().Key("k").Value("v").Build()).Error()
// INCR
client.Do(ctx, client.B().Incr().Key("k").Build()).AsInt64()
// HGET
client.Do(ctx, client.B().Hget().Key("k").Field("f").Build()).ToString()
// HMGET
client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray()
// HGETALL
client.Do(ctx, client.B().Hgetall().Key("h").Build()).AsStrMap()
// EXPIRE
client.Do(ctx, client.B().Expire().Key("k").Seconds(1).Build()).AsInt64()
// HEXPIRE
client.Do(ctx, client.B().Hexpire().Key("h").Seconds(1).Fields().Numfields(2).Field("f1", "f2").Build()).AsIntSlice()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("1").Max("2").Build()).AsStrSlice()
// ZRANK
client.Do(ctx, client.B().Zrank().Key("k").Member("m").Build()).AsInt64()
// ZSCORE
client.Do(ctx, client.B().Zscore().Key("k").Member("m").Build()).AsFloat64()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Build()).AsStrSlice()
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Withscores().Build()).AsZScores()
// ZPOPMIN
client.Do(ctx, client.B().Zpopmin().Key("k").Build()).AsZScore()
client.Do(ctx, client.B().Zpopmin().Key("myzset").Count(2).Build()).AsZScores()
// SCARD
client.Do(ctx, client.B().Scard().Key("k").Build()).AsInt64()
// SMEMBERS
client.Do(ctx, client.B().Smembers().Key("k").Build()).AsStrSlice()
// LINDEX
client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString()
// LPOP
client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString()
client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice()
// SCAN
client.Do(ctx, client.B().Scan().Cursor(0).Build()).AsScanEntry()
// FT.SEARCH
client.Do(ctx, client.B().FtSearch().Index("idx").Query("@f:v").Build()).AsFtSearch()
// GEOSEARCH
client.Do(ctx, client.B().Geosearch().Key("k").Fromlonlat(1, 1).Bybox(1).Height(1).Km().Build()).AsGeosearch()

使用 DecodeSliceOfJSON 扫描数组结果

当你想将数组结果扫描到特定结构体的切片中时,DecodeSliceOfJSON 非常有用。

type User struct {
	Name string `json:"name"`
}

// 设置一些值
if err = client.Do(ctx, client.B().Set().Key("user1").Value(`{"name": "name1"}`).Build()).Error(); err != nil {
	return err
}
if err = client.Do(ctx, client.B().Set().Key("user2").Value(`{"name": "name2"}`).Build()).Error(); err != nil {
	return err
}

// 将 MGET 结果扫描到 []*User 中
var users []*User // 或者 []User 也可以扫描
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1", "user2").Build()), &users); err != nil {
	return err
}

for _, user := range users {
	fmt.Printf("%+v\n", user)
}
/*
&{name:name1}
&{name:name2}
*/

!!!!!! 不要这样做 !!!!!!

请确保结果中的所有值都具有相同的 JSON 结构。

// 设置一个纯字符串值
if err = client.Do(ctx, client.B().Set().Key("user1").Value("userName1").Build()).Error(); err != nil {
	return err
}

// 错误做法
users := make([]*User, 0)
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1").Build()), &users); err != nil {
	return err
}
// -> 错误:在寻找值的开头时遇到无效字符 'u'
// 在这种情况下,使用 client.Do(ctx, client.B().Mget().Key("user1").Build()).AsStrSlice()

贡献

欢迎贡献,包括问题拉取请求讨论。 贡献对我们意义重大,有助于改进这个库和社区!

生成命令构建器

命令构建器是基于 ./hack/cmds 中的定义通过运行以下命令生成的:

go generate

测试

请使用./dockertest.sh脚本在本地运行测试用例。 并请尽最大努力确保代码更改的测试覆盖率达到100%。

项目侧边栏1项目侧边栏2
推荐项目
Project Cover

豆包MarsCode

豆包 MarsCode 是一款革命性的编程助手,通过AI技术提供代码补全、单测生成、代码解释和智能问答等功能,支持100+编程语言,与主流编辑器无缝集成,显著提升开发效率和代码质量。

Project Cover

AI写歌

Suno AI是一个革命性的AI音乐创作平台,能在短短30秒内帮助用户创作出一首完整的歌曲。无论是寻找创作灵感还是需要快速制作音乐,Suno AI都是音乐爱好者和专业人士的理想选择。

Project Cover

白日梦AI

白日梦AI提供专注于AI视频生成的多样化功能,包括文生视频、动态画面和形象生成等,帮助用户快速上手,创造专业级内容。

Project Cover

有言AI

有言平台提供一站式AIGC视频创作解决方案,通过智能技术简化视频制作流程。无论是企业宣传还是个人分享,有言都能帮助用户快速、轻松地制作出专业级别的视频内容。

Project Cover

Kimi

Kimi AI助手提供多语言对话支持,能够阅读和理解用户上传的文件内容,解析网页信息,并结合搜索结果为用户提供详尽的答案。无论是日常咨询还是专业问题,Kimi都能以友好、专业的方式提供帮助。

Project Cover

讯飞绘镜

讯飞绘镜是一个支持从创意到完整视频创作的智能平台,用户可以快速生成视频素材并创作独特的音乐视频和故事。平台提供多样化的主题和精选作品,帮助用户探索创意灵感。

Project Cover

讯飞文书

讯飞文书依托讯飞星火大模型,为文书写作者提供从素材筹备到稿件撰写及审稿的全程支持。通过录音智记和以稿写稿等功能,满足事务性工作的高频需求,帮助撰稿人节省精力,提高效率,优化工作与生活。

Project Cover

阿里绘蛙

绘蛙是阿里巴巴集团推出的革命性AI电商营销平台。利用尖端人工智能技术,为商家提供一键生成商品图和营销文案的服务,显著提升内容创作效率和营销效果。适用于淘宝、天猫等电商平台,让商品第一时间被种草。

Project Cover

AIWritePaper论文写作

AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文,配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。

投诉举报邮箱: service@vectorlightyear.com
@2024 懂AI·鲁ICP备2024100362号-6·鲁公网安备37021002001498号