rueidis
一个快速的 Golang Redis 客户端,具有自动流水线功能并支持服务器辅助的客户端缓存。
特性
- 非阻塞 Redis 命令的自动流水线
- 服务器辅助的客户端缓存
- 带客户端缓存的通用对象映射
- 带客户端缓存的缓存侧边模式
- 带客户端缓存的分布式锁
- 用于编写 rueidis 模拟测试的辅助工具
- OpenTelemetry 集成
- 钩子和其他集成
- 类似 Go-redis 的 API 适配器,由 @418Coffee 提供
- 发布/订阅、分片发布/订阅、流
- Redis 集群、哨兵、RedisJSON、RedisBloom、RediSearch、RedisTimeseries 等
- 无需 Redis Stack 的概率数据结构
入门
package main
import (
"context"
"github.com/redis/rueidis"
)
func main() {
client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
// SET key val NX
err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error()
// HGETALL hm
hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap()
}
查看更多示例:命令响应速查表
开发者友好的命令构建器
client.B()
是构建 Redis 命令的入口点:
由 @FZambia 录制 使用 Rueidis Go 库提高 Centrifugo Redis 引擎的吞吐量和分配效率
构建命令后,使用 client.Do()
或 client.DoMulti()
将其发送到 Redis。
你❗️不应该❗️在另一个 client.Do()
或 client.DoMulti()
调用中重复使用命令,因为默认情况下它已被回收到底层的 sync.Pool
中。
要重复使用命令,请在 Build()
后使用 Pin()
,这将防止命令被回收。
流水线
自动流水线
所有并发的非阻塞 Redis 命令(如 GET
、SET
)都会自动进行流水线处理,这减少了整体的往返次数和系统调用,从而获得更高的吞吐量。你可以通过从多个 goroutine 并发调用 client.Do()
来轻松获得流水线技术的好处。例如:
func BenchmarkPipelining(b *testing.B, client rueidis.Client) {
// 以下 client.Do() 操作将从多个 goroutine 发出,
// 因此会自动进行流水线处理。
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString()
}
})
}
与 go-redis v9 的基准对比
与 go-redis 相比,Rueidis 在 1、8 和 64 并行度设置下都有更高的吞吐量。
在 Macbook Pro 16" M1 Pro 2021 的本地基准测试中,它甚至能够达到比 go-redis 高出 ~14 倍 的吞吐量。(参见 parallelism(64)-key(16)-value(64)-10
)
基准测试源代码:https://github.com/rueian/rueidis-benchmark
在两台 GCP n2-highcpu-2 机器上进行的基准测试结果也显示,rueidis 可以在更低延迟的情况下实现更高的吞吐量:https://github.com/redis/rueidis/pull/93
手动流水线
除了自动流水线外,你还可以使用 DoMulti()
手动进行流水线处理:
cmds := make(rueidis.Commands, 0, 10)
for i := 0; i < 10; i++ {
cmds = append(cmds, client.B().Set().Key("key").Value("value").Build())
}
for _, resp := range client.DoMulti(ctx, cmds...) {
if err := resp.Error(); err != nil {
panic(err)
}
}
服务器辅助的客户端缓存
默认启用的服务器辅助客户端缓存的选择加入模式可以通过调用 DoCache()
或 DoMultiCache()
并指定客户端 TTL 来使用。
client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray()
client.DoMultiCache(ctx,
rueidis.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute),
rueidis.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))
缓存的响应,包括 Redis Nil,将在收到 Redis 服务器的通知或达到客户端 TTL 时被失效。有关更多详细信息,请参阅 https://github.com/redis/rueidis/issues/534。
基准测试
服务器辅助的客户端缓存可以大幅提高延迟和吞吐量,就像在应用程序内部有一个 Redis 副本一样。例如:
基准测试源代码:https://github.com/rueian/rueidis-benchmark
客户端缓存辅助函数
使用 CacheTTL()
检查剩余的客户端缓存 TTL(以秒为单位):
client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).CacheTTL() == 60
使用 IsCacheHit()
验证响应是否来自客户端内存:
client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).IsCacheHit() == true
如果通过 rueidisotel.NewClient(option)
启用了 OpenTelemetry,则还会有两个指标被测量:
- rueidis_do_cache_miss
- rueidis_do_cache_hits
MGET/JSON.MGET 客户端缓存辅助函数
rueidis.MGetCache
和 rueidis.JsonMGetCache
是通过客户端缓存获取跨不同槽位的多个键的便捷辅助函数。它们首先按槽位分组键以构建 MGET
或 JSON.MGET
命令,然后仅将缓存未命中的键发送到 Redis 节点。
广播模式客户端缓存
尽管默认是选择加入模式,但你可以通过在 ClientOption.ClientTrackingOptions
中指定前缀来使用广播模式:
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
ClientTrackingOptions: []string{"PREFIX", "prefix1:", "PREFIX", "prefix2:", "BCAST"},
})
if err != nil {
panic(err)
}
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == false
client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == true
请确保传递给 DoCache()
和 DoMultiCache()
的命令被你的前缀覆盖。否则,它们的客户端缓存将不会被 Redis 失效。
使用缓存旁路模式的客户端缓存
缓存旁路是一种广泛使用的缓存策略。 rueidisaside可以帮助你将数据缓存到由Redis支持的客户端缓存中。例如:
client, err := rueidisaside.NewClient(rueidisaside.ClientOption{
ClientOption: rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}},
})
if err != nil {
panic(err)
}
val, err := client.Get(context.Background(), time.Minute, "mykey", func(ctx context.Context, key string) (val string, err error) {
if err = db.QueryRowContext(ctx, "SELECT val FROM mytab WHERE id = ?", key).Scan(&val); err == sql.ErrNoRows {
val = "_nil_" // 缓存nil以避免穿透。
err = nil // 在sql.ErrNoRows的情况下清除err。
}
return
})
// ...
请参考rueidisaside中的完整示例。
禁用客户端缓存
某些Redis提供商不支持客户端缓存,例如Google Cloud Memorystore。
你可以通过将ClientOption.DisableCache
设置为true
来禁用客户端缓存。
这也会将client.DoCache()
和client.DoMultiCache()
回退到client.Do()
和client.DoMulti()
。
上下文取消
如果上下文被取消或达到截止时间,client.Do()
、client.DoMulti()
、client.DoCache()
和client.DoMultiCache()
可以提前返回。
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded
请注意,虽然操作可以提前返回,但命令可能已经发送。
发布/订阅
要接收来自频道的消息,应使用client.Receive()
。它支持SUBSCRIBE
、PSUBSCRIBE
和Redis 7.0的SSUBSCRIBE
:
err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg rueidis.PubSubMessage) {
// 处理消息
})
提供的处理程序将使用接收到的消息进行调用。
重要的是要注意,client.Receive()
将在以下情况下一直阻塞直到返回一个值:
- 当收到与提供的
subscribe
命令相关的任何取消订阅/取消模式订阅消息时,返回nil
。 - 当客户端手动关闭时,返回
rueidis.ErrClosing
。 - 当
ctx
完成时,返回ctx.Err()
。 - 当提供的
subscribe
命令失败时,返回非nil的err
。
虽然client.Receive()
调用是阻塞的,但Client
仍然能够接受其他并发请求,
它们共享同一个TCP连接。如果你的消息处理程序可能需要一些时间才能完成,建议
在client.Dedicated()
内使用client.Receive()
,以免阻塞其他并发请求。
替代发布/订阅钩子
client.Receive()
要求用户提前提供订阅命令。
还有一个替代方案Dedicatedclient.SetPubSubHooks()
,允许用户稍后订阅/取消订阅频道。
c, cancel := client.Dedicate()
defer cancel()
wait := c.SetPubSubHooks(rueidis.PubSubHooks{
OnMessage: func(m rueidis.PubSubMessage) {
// 处理消息。这个回调将在另一个goroutine中按顺序调用。
}
})
c.Do(ctx, c.B().Subscribe().Channel("ch").Build())
err := <-wait // 断开连接并返回err
如果钩子不为nil,上面的wait
通道保证在钩子不再被调用时关闭,
并最多产生一个描述原因的错误。用户可以使用此通道检测断开连接。
CAS事务
要执行CAS事务(WATCH
+ MULTI
+ EXEC
),应该使用专用连接,因为在WATCH
和EXEC
之间不应有无意的写命令。否则,EXEC
可能不会按预期失败。
client.Dedicated(func(c rueidis.DedicatedClient) error {
// 首先监视键
c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
// 在这里执行读取
c.Do(ctx, c.B().Mget().Key("k1", "k2").Build())
// 使用MULTI EXEC执行写入
c.DoMulti(
ctx,
c.B().Multi().Build(),
c.B().Set().Key("k1").Value("1").Build(),
c.B().Set().Key("k2").Value("2").Build(),
c.B().Exec().Build(),
)
return nil
})
或者使用Dedicate()
并在完成时调用cancel()
将连接放回池中。
c, cancel := client.Dedicate()
defer cancel()
c.Do(ctx, c.B().Watch().Key("k1", "k2").Build())
// 使用占用连接的`client`执行剩余的CAS操作
然而,占用连接对吞吐量不利。最好使用Lua脚本来执行乐观锁定。
Lua脚本
NewLuaScript
或NewLuaScriptReadOnly
将创建一个可安全并发使用的脚本。
调用script.Exec
时,它会首先尝试发送EVALSHA
,如果服务器返回NOSCRIPT
,则回退到EVAL
。
script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}")
// script.Exec可以安全地并发调用
list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()
流式读取
client.DoStream()
和client.DoMultiStream()
可用于将大型Redis响应直接发送到io.Writer
,
而无需将它们分配到内存中。它们的工作原理是首先将命令发送到从池中获取的专用连接,
然后直接将响应值复制到给定的io.Writer
,最后回收连接。
s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build())
for s.HasNext() {
n, err := s.WriteTo(io.Discard)
if rueidis.IsRedisNil(err) {
// ...
}
}
注意,这两种方法在所有响应写入给定的io.Writer
之前会占用连接。
这可能需要很长时间并影响性能。除非你想避免为大型Redis响应分配内存,否则请使用普通的Do()
和DoMulti()
。
还要注意,这两种方法只适用于string
、integer
和float
类型的Redis响应。目前,DoMultiStream
在连接到Redis集群时不支持跨多个槽位进行键的流水线处理。
内存消耗考虑
rueidis中的每个底层连接都为流水线分配一个环形缓冲区。
其大小由ClientOption.RingScaleEachConn
控制,默认值为10,结果是每个环的大小为2^10。
如果你有许多rueidis连接,你可能会发现它们占用了相当多的内存。
在这种情况下,你可以考虑将ClientOption.RingScaleEachConn
减少到8或9,但可能会导致潜在的吞吐量下降。
你也可以考虑将ClientOption.PipelineMultiplex
的值设置为-1
,这将让rueidis对每个Redis节点只使用1个连接进行流水线处理。
实例化新的Redis客户端
你可以使用NewClient
创建一个新的Redis客户端,并提供多个选项。
// 连接到单个Redis节点:
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
})
// 连接到Redis集群
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
ShuffleInit: true,
})
// 连接到Redis集群并使用副本进行读操作
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"},
SendToReplicas: func(cmd rueidis.Completed) bool {
return cmd.IsReadOnly()
},
})
// 连接到哨兵
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"},
Sentinel: rueidis.SentinelOption{
MasterSet: "my_master",
},
})
Redis URL
你可以使用 ParseURL
或 MustParseURL
来构造一个 ClientOption
。
提供的 URL 必须以 redis://
、rediss://
或 unix://
开头。
当前支持的 URL 参数有 db
、dial_timeout
、write_timeout
、addr
、protocol
、client_cache
、client_name
、max_retries
和 master_set
。
// 连接到 Redis 集群
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003"))
// 连接到 Redis 节点
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:6379/0"))
// 连接到 Redis 哨兵
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:26379/0?master_set=my_master"))
任意命令
如果你想构造命令构建器中没有的命令,可以使用 client.B().Arbitrary()
:
// 这将生成 [ANY CMD k1 k2 a1 a2]
client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()
处理 JSON、原始 []byte
和向量相似度搜索
命令构建器将所有参数视为 Redis 字符串,这些字符串是二进制安全的。这意味着用户可以直接将 []byte
存储到 Redis 中而无需转换。rueidis.BinaryString
辅助函数可以将 []byte
转换为 string
而无需复制。例如:
client.B().Set().Key("b").Value(rueidis.BinaryString([]byte{...})).Build()
将所有参数视为 Redis 字符串也意味着命令构建器不会自动为用户进行任何引用或转换。
在使用 RedisJSON 时,用户经常需要在 Redis 字符串中准备 JSON 字符串。rueidis.JSON
可以帮助实现这一点:
client.B().JsonSet().Key("j").Path("$.myStrField").Value(rueidis.JSON("str")).Build()
// 等价于
client.B().JsonSet().Key("j").Path("$.myStrField").Value(`"str"`).Build()
在进行向量相似度搜索时,用户可以使用 rueidis.VectorString32
和 rueidis.VectorString64
来构建查询:
cmd := client.B().FtSearch().Index("idx").Query("*=>[KNN 5 @vec $V]").
Params().Nargs(2).NameValue().NameValue("V", rueidis.VectorString64([]float64{...})).
Dialect(2).Build()
n, resp, err := client.Do(ctx, cmd).AsFtSearch()
命令响应速查表
虽然命令构建器对开发者友好,但响应解析器稍显不友好。开发者必须预先知道服务器将返回什么类型的 Redis 响应,以及应该使用哪个解析器。
错误处理: 如果选择了不正确的解析器函数,将返回 errParse。以下是使用 ToArray 演示这种情况的示例:
// 尝试解析响应。如果发生解析错误,检查错误是否为解析错误并处理它。
// 通常,你应该通过选择正确的解析器函数来修复代码。
// 例如,如果预期响应是字符串,则使用 ToString(),如果预期响应是数组,则使用 ToArray(),如下所示:
if err := client.Do(ctx, client.B().Get().Key("k").Build()).ToArray(); IsParseErr(err) {
fmt.Println("解析错误:", err)
}
要记住将返回什么类型的消息以及使用哪种解析方式是很困难的。因此,这里列出了一些常见的示例:
// GET
client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64()
// MGET
client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray()
// SET
client.Do(ctx, client.B().Set().Key("k").Value("v").Build()).Error()
// INCR
client.Do(ctx, client.B().Incr().Key("k").Build()).AsInt64()
// HGET
client.Do(ctx, client.B().Hget().Key("k").Field("f").Build()).ToString()
// HMGET
client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray()
// HGETALL
client.Do(ctx, client.B().Hgetall().Key("h").Build()).AsStrMap()
// EXPIRE
client.Do(ctx, client.B().Expire().Key("k").Seconds(1).Build()).AsInt64()
// HEXPIRE
client.Do(ctx, client.B().Hexpire().Key("h").Seconds(1).Fields().Numfields(2).Field("f1", "f2").Build()).AsIntSlice()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("1").Max("2").Build()).AsStrSlice()
// ZRANK
client.Do(ctx, client.B().Zrank().Key("k").Member("m").Build()).AsInt64()
// ZSCORE
client.Do(ctx, client.B().Zscore().Key("k").Member("m").Build()).AsFloat64()
// ZRANGE
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Build()).AsStrSlice()
client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Withscores().Build()).AsZScores()
// ZPOPMIN
client.Do(ctx, client.B().Zpopmin().Key("k").Build()).AsZScore()
client.Do(ctx, client.B().Zpopmin().Key("myzset").Count(2).Build()).AsZScores()
// SCARD
client.Do(ctx, client.B().Scard().Key("k").Build()).AsInt64()
// SMEMBERS
client.Do(ctx, client.B().Smembers().Key("k").Build()).AsStrSlice()
// LINDEX
client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString()
// LPOP
client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString()
client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice()
// SCAN
client.Do(ctx, client.B().Scan().Cursor(0).Build()).AsScanEntry()
// FT.SEARCH
client.Do(ctx, client.B().FtSearch().Index("idx").Query("@f:v").Build()).AsFtSearch()
// GEOSEARCH
client.Do(ctx, client.B().Geosearch().Key("k").Fromlonlat(1, 1).Bybox(1).Height(1).Km().Build()).AsGeosearch()
使用 DecodeSliceOfJSON 扫描数组结果
当你想将数组结果扫描到特定结构体的切片中时,DecodeSliceOfJSON 非常有用。
type User struct {
Name string `json:"name"`
}
// 设置一些值
if err = client.Do(ctx, client.B().Set().Key("user1").Value(`{"name": "name1"}`).Build()).Error(); err != nil {
return err
}
if err = client.Do(ctx, client.B().Set().Key("user2").Value(`{"name": "name2"}`).Build()).Error(); err != nil {
return err
}
// 将 MGET 结果扫描到 []*User 中
var users []*User // 或者 []User 也可以扫描
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1", "user2").Build()), &users); err != nil {
return err
}
for _, user := range users {
fmt.Printf("%+v\n", user)
}
/*
&{name:name1}
&{name:name2}
*/
!!!!!! 不要这样做 !!!!!!
请确保结果中的所有值都具有相同的 JSON 结构。
// 设置一个纯字符串值
if err = client.Do(ctx, client.B().Set().Key("user1").Value("userName1").Build()).Error(); err != nil {
return err
}
// 错误做法
users := make([]*User, 0)
if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1").Build()), &users); err != nil {
return err
}
// -> 错误:在寻找值的开头时遇到无效字符 'u'
// 在这种情况下,使用 client.Do(ctx, client.B().Mget().Key("user1").Build()).AsStrSlice()
贡献
欢迎贡献,包括问题、拉取请求和讨论。 贡献对我们意义重大,有助于改进这个库和社区!
生成命令构建器
命令构建器是基于 ./hack/cmds 中的定义通过运行以下命令生成的:
go generate
测试
请使用./dockertest.sh脚本在本地运行测试用例。 并请尽最大努力确保代码更改的测试覆盖率达到100%。