Skip to content

Commit 9fb1405

Browse files
committed
feat: load the policy table in cluster client (#4)
* feat: load the policy table in cluster client * Remove comments
1 parent 1028dd8 commit 9fb1405

File tree

3 files changed

+96
-7
lines changed

3 files changed

+96
-7
lines changed

command.go

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/redis/go-redis/v9/internal"
1515
"github.com/redis/go-redis/v9/internal/hscan"
1616
"github.com/redis/go-redis/v9/internal/proto"
17+
"github.com/redis/go-redis/v9/internal/routing"
1718
"github.com/redis/go-redis/v9/internal/util"
1819
)
1920

@@ -3478,6 +3479,7 @@ type CommandInfo struct {
34783479
LastKeyPos int8
34793480
StepCount int8
34803481
ReadOnly bool
3482+
Tips map[string]string
34813483
}
34823484

34833485
type CommandsInfoCmd struct {
@@ -3516,7 +3518,7 @@ func (cmd *CommandsInfoCmd) String() string {
35163518
func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
35173519
const numArgRedis5 = 6
35183520
const numArgRedis6 = 7
3519-
const numArgRedis7 = 10
3521+
const numArgRedis7 = 10 // Also matches redis 8
35203522

35213523
n, err := rd.ReadArrayLen()
35223524
if err != nil {
@@ -3604,9 +3606,34 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
36043606
}
36053607

36063608
if nn >= numArgRedis7 {
3607-
if err := rd.DiscardNext(); err != nil {
3609+
// The 8th argument is an array of tips.
3610+
tipsLen, err := rd.ReadArrayLen()
3611+
if err != nil {
36083612
return err
36093613
}
3614+
3615+
cmdInfo.Tips = make(map[string]string, tipsLen)
3616+
3617+
for f := 0; f < tipsLen; f++ {
3618+
tip, err := rd.ReadString()
3619+
if err != nil {
3620+
return err
3621+
}
3622+
3623+
// Handle tips that don't have a colon (like "nondeterministic_output")
3624+
if !strings.Contains(tip, ":") {
3625+
cmdInfo.Tips[tip] = ""
3626+
continue
3627+
}
3628+
3629+
// Handle normal key:value tips
3630+
k, v, ok := strings.Cut(tip, ":")
3631+
if !ok {
3632+
return fmt.Errorf("redis: unexpected tip %q in COMMAND reply", tip)
3633+
}
3634+
cmdInfo.Tips[k] = v
3635+
}
3636+
36103637
if err := rd.DiscardNext(); err != nil {
36113638
return err
36123639
}
@@ -3656,6 +3683,50 @@ func (c *cmdsInfoCache) Get(ctx context.Context) (map[string]*CommandInfo, error
36563683
return c.cmds, err
36573684
}
36583685

3686+
// ------------------------------------------------------------------------------
3687+
var BuiltinPolicies = map[string]routing.CommandPolicy{
3688+
"ft.create": {Request: routing.ReqSpecial, Response: routing.RespAllSucceeded},
3689+
"ft.alter": {Request: routing.ReqSpecial, Response: routing.RespAllSucceeded},
3690+
"ft.drop": {Request: routing.ReqSpecial, Response: routing.RespAllSucceeded},
3691+
3692+
"mset": {Request: routing.ReqMultiShard, Response: routing.RespAllSucceeded},
3693+
"mget": {Request: routing.ReqMultiShard, Response: routing.RespSpecial},
3694+
"del": {Request: routing.ReqMultiShard, Response: routing.RespAggSum},
3695+
}
3696+
3697+
func newCommandPolicies(commandInfo map[string]*CommandInfo) map[string]routing.CommandPolicy {
3698+
3699+
table := make(map[string]routing.CommandPolicy, len(commandInfo))
3700+
3701+
for name, info := range commandInfo {
3702+
req := routing.ReqDefault
3703+
resp := routing.RespAllSucceeded
3704+
3705+
if tips := info.Tips; tips != nil {
3706+
if v, ok := tips["request_policy"]; ok {
3707+
if p, err := routing.ParseRequestPolicy(v); err == nil {
3708+
req = p
3709+
}
3710+
}
3711+
if v, ok := tips["response_policy"]; ok {
3712+
if p, err := routing.ParseResponsePolicy(v); err == nil {
3713+
resp = p
3714+
}
3715+
}
3716+
} else {
3717+
return BuiltinPolicies
3718+
}
3719+
table[name] = routing.CommandPolicy{Request: req, Response: resp}
3720+
}
3721+
3722+
if len(table) == 0 {
3723+
for k, v := range BuiltinPolicies {
3724+
table[k] = v
3725+
}
3726+
}
3727+
return table
3728+
}
3729+
36593730
//------------------------------------------------------------------------------
36603731

36613732
type SlowLog struct {

commands_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,22 @@ var _ = Describe("Commands", func() {
657657
Expect(cmd.StepCount).To(Equal(int8(0)))
658658
})
659659

660+
It("should Command Tips", Label("NonRedisEnterprise"), func() {
661+
SkipAfterRedisVersion(7.9, "Redis 8 changed the COMMAND reply format")
662+
cmds, err := client.Command(ctx).Result()
663+
Expect(err).NotTo(HaveOccurred())
664+
665+
cmd := cmds["touch"]
666+
Expect(cmd.Name).To(Equal("touch"))
667+
Expect(cmd.Tips["request_policy"]).To(Equal("multi_shard"))
668+
Expect(cmd.Tips["response_policy"]).To(Equal("agg_sum"))
669+
670+
cmd = cmds["flushall"]
671+
Expect(cmd.Name).To(Equal("flushall"))
672+
Expect(cmd.Tips["request_policy"]).To(Equal("all_shards"))
673+
Expect(cmd.Tips["response_policy"]).To(Equal("all_succeeded"))
674+
})
675+
660676
It("should return all command names", func() {
661677
cmdList := client.CommandList(ctx, nil)
662678
Expect(cmdList.Err()).NotTo(HaveOccurred())

osscluster.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/redis/go-redis/v9/internal/pool"
2121
"github.com/redis/go-redis/v9/internal/proto"
2222
"github.com/redis/go-redis/v9/internal/rand"
23+
"github.com/redis/go-redis/v9/internal/routing"
2324
)
2425

2526
const (
@@ -937,10 +938,11 @@ func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, er
937938
// or more underlying connections. It's safe for concurrent use by
938939
// multiple goroutines.
939940
type ClusterClient struct {
940-
opt *ClusterOptions
941-
nodes *clusterNodes
942-
state *clusterStateHolder
943-
cmdsInfoCache *cmdsInfoCache
941+
opt *ClusterOptions
942+
nodes *clusterNodes
943+
state *clusterStateHolder
944+
cmdsInfoCache *cmdsInfoCache
945+
commandPolicies map[string]routing.CommandPolicy
944946
cmdable
945947
hooksMixin
946948
}
@@ -960,8 +962,8 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
960962

961963
c.state = newClusterStateHolder(c.loadState)
962964
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
965+
c.commandPolicies = newCommandPolicies(c.cmdsInfoCache.cmds)
963966
c.cmdable = c.Process
964-
965967
c.initHooks(hooks{
966968
dial: nil,
967969
process: c.process,

0 commit comments

Comments
 (0)