Skip to content

Commit 74407a0

Browse files
authored
feat: load the policy table in cluster client (#4)
* feat: load the policy table in cluster client * Remove comments
1 parent 9e4369a commit 74407a0

File tree

3 files changed

+96
-7
lines changed

3 files changed

+96
-7
lines changed

command.go

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/redis/go-redis/v9/internal"
1515
"github.com/redis/go-redis/v9/internal/hscan"
1616
"github.com/redis/go-redis/v9/internal/proto"
17+
"github.com/redis/go-redis/v9/internal/routing"
1718
"github.com/redis/go-redis/v9/internal/util"
1819
)
1920

@@ -3413,6 +3414,7 @@ type CommandInfo struct {
34133414
LastKeyPos int8
34143415
StepCount int8
34153416
ReadOnly bool
3417+
Tips map[string]string
34163418
}
34173419

34183420
type CommandsInfoCmd struct {
@@ -3451,7 +3453,7 @@ func (cmd *CommandsInfoCmd) String() string {
34513453
func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
34523454
const numArgRedis5 = 6
34533455
const numArgRedis6 = 7
3454-
const numArgRedis7 = 10
3456+
const numArgRedis7 = 10 // Also matches redis 8
34553457

34563458
n, err := rd.ReadArrayLen()
34573459
if err != nil {
@@ -3539,9 +3541,34 @@ func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
35393541
}
35403542

35413543
if nn >= numArgRedis7 {
3542-
if err := rd.DiscardNext(); err != nil {
3544+
// The 8th argument is an array of tips.
3545+
tipsLen, err := rd.ReadArrayLen()
3546+
if err != nil {
35433547
return err
35443548
}
3549+
3550+
cmdInfo.Tips = make(map[string]string, tipsLen)
3551+
3552+
for f := 0; f < tipsLen; f++ {
3553+
tip, err := rd.ReadString()
3554+
if err != nil {
3555+
return err
3556+
}
3557+
3558+
// Handle tips that don't have a colon (like "nondeterministic_output")
3559+
if !strings.Contains(tip, ":") {
3560+
cmdInfo.Tips[tip] = ""
3561+
continue
3562+
}
3563+
3564+
// Handle normal key:value tips
3565+
k, v, ok := strings.Cut(tip, ":")
3566+
if !ok {
3567+
return fmt.Errorf("redis: unexpected tip %q in COMMAND reply", tip)
3568+
}
3569+
cmdInfo.Tips[k] = v
3570+
}
3571+
35453572
if err := rd.DiscardNext(); err != nil {
35463573
return err
35473574
}
@@ -3592,6 +3619,50 @@ func (c *cmdsInfoCache) Get(ctx context.Context) (map[string]*CommandInfo, error
35923619
return c.cmds, err
35933620
}
35943621

3622+
// ------------------------------------------------------------------------------
3623+
var BuiltinPolicies = map[string]routing.CommandPolicy{
3624+
"ft.create": {Request: routing.ReqSpecial, Response: routing.RespAllSucceeded},
3625+
"ft.alter": {Request: routing.ReqSpecial, Response: routing.RespAllSucceeded},
3626+
"ft.drop": {Request: routing.ReqSpecial, Response: routing.RespAllSucceeded},
3627+
3628+
"mset": {Request: routing.ReqMultiShard, Response: routing.RespAllSucceeded},
3629+
"mget": {Request: routing.ReqMultiShard, Response: routing.RespSpecial},
3630+
"del": {Request: routing.ReqMultiShard, Response: routing.RespAggSum},
3631+
}
3632+
3633+
func newCommandPolicies(commandInfo map[string]*CommandInfo) map[string]routing.CommandPolicy {
3634+
3635+
table := make(map[string]routing.CommandPolicy, len(commandInfo))
3636+
3637+
for name, info := range commandInfo {
3638+
req := routing.ReqDefault
3639+
resp := routing.RespAllSucceeded
3640+
3641+
if tips := info.Tips; tips != nil {
3642+
if v, ok := tips["request_policy"]; ok {
3643+
if p, err := routing.ParseRequestPolicy(v); err == nil {
3644+
req = p
3645+
}
3646+
}
3647+
if v, ok := tips["response_policy"]; ok {
3648+
if p, err := routing.ParseResponsePolicy(v); err == nil {
3649+
resp = p
3650+
}
3651+
}
3652+
} else {
3653+
return BuiltinPolicies
3654+
}
3655+
table[name] = routing.CommandPolicy{Request: req, Response: resp}
3656+
}
3657+
3658+
if len(table) == 0 {
3659+
for k, v := range BuiltinPolicies {
3660+
table[k] = v
3661+
}
3662+
}
3663+
return table
3664+
}
3665+
35953666
//------------------------------------------------------------------------------
35963667

35973668
type SlowLog struct {

commands_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,22 @@ var _ = Describe("Commands", func() {
657657
Expect(cmd.StepCount).To(Equal(int8(0)))
658658
})
659659

660+
It("should Command Tips", Label("NonRedisEnterprise"), func() {
661+
SkipAfterRedisVersion(7.9, "Redis 8 changed the COMMAND reply format")
662+
cmds, err := client.Command(ctx).Result()
663+
Expect(err).NotTo(HaveOccurred())
664+
665+
cmd := cmds["touch"]
666+
Expect(cmd.Name).To(Equal("touch"))
667+
Expect(cmd.Tips["request_policy"]).To(Equal("multi_shard"))
668+
Expect(cmd.Tips["response_policy"]).To(Equal("agg_sum"))
669+
670+
cmd = cmds["flushall"]
671+
Expect(cmd.Name).To(Equal("flushall"))
672+
Expect(cmd.Tips["request_policy"]).To(Equal("all_shards"))
673+
Expect(cmd.Tips["response_policy"]).To(Equal("all_succeeded"))
674+
})
675+
660676
It("should return all command names", func() {
661677
cmdList := client.CommandList(ctx, nil)
662678
Expect(cmdList.Err()).NotTo(HaveOccurred())

osscluster.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/redis/go-redis/v9/internal/pool"
2020
"github.com/redis/go-redis/v9/internal/proto"
2121
"github.com/redis/go-redis/v9/internal/rand"
22+
"github.com/redis/go-redis/v9/internal/routing"
2223
)
2324

2425
const (
@@ -912,10 +913,11 @@ func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, er
912913
// or more underlying connections. It's safe for concurrent use by
913914
// multiple goroutines.
914915
type ClusterClient struct {
915-
opt *ClusterOptions
916-
nodes *clusterNodes
917-
state *clusterStateHolder
918-
cmdsInfoCache *cmdsInfoCache
916+
opt *ClusterOptions
917+
nodes *clusterNodes
918+
state *clusterStateHolder
919+
cmdsInfoCache *cmdsInfoCache
920+
commandPolicies map[string]routing.CommandPolicy
919921
cmdable
920922
hooksMixin
921923
}
@@ -932,8 +934,8 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
932934

933935
c.state = newClusterStateHolder(c.loadState)
934936
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
937+
c.commandPolicies = newCommandPolicies(c.cmdsInfoCache.cmds)
935938
c.cmdable = c.Process
936-
937939
c.initHooks(hooks{
938940
dial: nil,
939941
process: c.process,

0 commit comments

Comments
 (0)