Skip to content

Commit 5e492e3

Browse files
committed
Add Routing Policies Comprehensive Test Suite and Fix multi keyed aggregation for different step
1 parent e083cde commit 5e492e3

File tree

3 files changed

+380
-68
lines changed

3 files changed

+380
-68
lines changed

command.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ type Cmder interface {
201201
stringArg(int) string
202202
firstKeyPos() int8
203203
SetFirstKeyPos(int8)
204+
stepCount() int8
205+
SetStepCount(int8)
204206

205207
readTimeout() *time.Duration
206208
readReply(rd *proto.Reader) error
@@ -303,6 +305,7 @@ type baseCmd struct {
303305
args []interface{}
304306
err error
305307
keyPos int8
308+
_stepCount int8
306309
rawVal interface{}
307310
_readTimeout *time.Duration
308311
cmdType CmdType
@@ -361,6 +364,14 @@ func (cmd *baseCmd) SetFirstKeyPos(keyPos int8) {
361364
cmd.keyPos = keyPos
362365
}
363366

367+
func (cmd *baseCmd) stepCount() int8 {
368+
return cmd._stepCount
369+
}
370+
371+
func (cmd *baseCmd) SetStepCount(stepCount int8) {
372+
cmd._stepCount = stepCount
373+
}
374+
364375
func (cmd *baseCmd) SetErr(e error) {
365376
cmd.err = e
366377
}
@@ -402,6 +413,7 @@ func (cmd *baseCmd) cloneBaseCmd() baseCmd {
402413
args: args,
403414
err: cmd.err,
404415
keyPos: cmd.keyPos,
416+
_stepCount: cmd._stepCount,
405417
rawVal: cmd.rawVal,
406418
_readTimeout: readTimeout,
407419
cmdType: cmd.cmdType,

osscluster_router.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *cluste
3434
policy = c.cmdInfoResolver.GetCommandPolicy(ctx, cmd)
3535
}
3636

37+
// Set stepCount from cmdInfo if not already set
38+
if cmd.stepCount() == 0 {
39+
if cmdInfo := c.cmdInfo(ctx, cmd.Name()); cmdInfo != nil && cmdInfo.StepCount > 0 {
40+
cmd.SetStepCount(cmdInfo.StepCount)
41+
}
42+
}
43+
3744
if policy == nil {
3845
return c.executeDefault(ctx, cmd, policy, node)
3946
}
@@ -104,6 +111,10 @@ func (c *ClusterClient) executeOnAllShards(ctx context.Context, cmd Cmder, polic
104111
func (c *ClusterClient) executeMultiShard(ctx context.Context, cmd Cmder, policy *routing.CommandPolicy) error {
105112
args := cmd.Args()
106113
firstKeyPos := int(cmdFirstKeyPos(cmd))
114+
stepCount := int(cmd.stepCount())
115+
if stepCount == 0 {
116+
stepCount = 1 // Default to 1 if not set
117+
}
107118

108119
if firstKeyPos == 0 || firstKeyPos >= len(args) {
109120
return fmt.Errorf("redis: multi-shard command %s has no key arguments", cmd.Name())
@@ -113,14 +124,20 @@ func (c *ClusterClient) executeMultiShard(ctx context.Context, cmd Cmder, policy
113124
slotMap := make(map[int][]string)
114125
keyOrder := make([]string, 0)
115126

116-
for i := firstKeyPos; i < len(args); i++ {
127+
for i := firstKeyPos; i < len(args); i += stepCount {
117128
key, ok := args[i].(string)
118129
if !ok {
119130
return fmt.Errorf("redis: non-string key at position %d: %v", i, args[i])
120131
}
121132

122133
slot := hashtag.Slot(key)
123134
slotMap[slot] = append(slotMap[slot], key)
135+
for j := 1; j < stepCount; j++ {
136+
if i+j >= len(args) {
137+
break
138+
}
139+
slotMap[slot] = append(slotMap[slot], args[i+j].(string))
140+
}
124141
keyOrder = append(keyOrder, key)
125142
}
126143

@@ -371,6 +388,7 @@ func (c *ClusterClient) aggregateMultiSlotResults(ctx context.Context, cmd Cmder
371388
}
372389
}
373390

391+
// TODO: return multiple errors by order when we will implement multiple errors returning
374392
if result.err != nil {
375393
firstErr = result.err
376394
}

0 commit comments

Comments
 (0)