From e2266052f2266947987913649f1e9c5b8b5e377c Mon Sep 17 00:00:00 2001 From: Anurag Bandyopadhyay Date: Fri, 3 Oct 2025 13:41:20 +0000 Subject: [PATCH] Initial plan Implement slot-based MGET batching for cluster clients Co-authored-by: SoulPancake <70265851+SoulPancake@users.noreply.github.com> --- helper.go | 74 ++++++++++++++++++++++++++++++++++++++++++-------- helper_test.go | 70 +++++++++++++++++++++++++++++------------------ 2 files changed, 105 insertions(+), 39 deletions(-) diff --git a/helper.go b/helper.go index 333eb7d9..2791fa00 100644 --- a/helper.go +++ b/helper.go @@ -50,12 +50,7 @@ func MGet(client Client, ctx context.Context, keys []string) (ret map[string]Red return clientMGet(client, ctx, client.B().Mget().Key(keys...).Build(), keys) } - cmds := mgetcmdsp.Get(len(keys), len(keys)) - defer mgetcmdsp.Put(cmds) - for i := range cmds.s { - cmds.s[i] = client.B().Get().Key(keys[i]).Build() - } - return doMultiGet(client, ctx, cmds.s, keys) + return clusterMGet(client, ctx, keys) } // MSet is a helper that consults the redis directly with multiple keys by grouping keys within the same slot into MSETs or multiple SETs @@ -139,12 +134,7 @@ func JsonMGet(client Client, ctx context.Context, keys []string, path string) (r return clientMGet(client, ctx, client.B().JsonMget().Key(keys...).Path(path).Build(), keys) } - cmds := mgetcmdsp.Get(len(keys), len(keys)) - defer mgetcmdsp.Put(cmds) - for i := range cmds.s { - cmds.s[i] = client.B().JsonGet().Key(keys[i]).Path(path).Build() - } - return doMultiGet(client, ctx, cmds.s, keys) + return clusterJsonMGet(client, ctx, keys, path) } // JsonMSet is a helper that consults redis directly with multiple keys by grouping keys within the same slot into JSON.MSETs or multiple JSON.SETs @@ -277,6 +267,66 @@ func arrayToKV(m map[string]RedisMessage, arr []RedisMessage, keys []string) map return m } +func clusterMGet(client Client, ctx context.Context, keys []string) (ret map[string]RedisMessage, err error) { + ret = make(map[string]RedisMessage, len(keys)) + slotCmds := intl.MGets(keys) + if len(slotCmds) == 0 { + return ret, nil + } + cmds := make([]Completed, 0, len(slotCmds)) + for _, cmd := range slotCmds { + cmds = append(cmds, cmd.Pin()) + } + resps := client.DoMulti(ctx, cmds...) + defer resultsp.Put(&redisresults{s: resps}) + for i, resp := range resps { + if err := resp.NonRedisError(); err != nil { + return nil, err + } + arr, err := resp.ToArray() + if err != nil { + return nil, err + } + commands := cmds[i].Commands() + cmdKeys := commands[1:] + ret = arrayToKV(ret, arr, cmdKeys) + } + for _, cmd := range cmds { + intl.PutCompletedForce(cmd) + } + return ret, nil +} + +func clusterJsonMGet(client Client, ctx context.Context, keys []string, path string) (ret map[string]RedisMessage, err error) { + ret = make(map[string]RedisMessage, len(keys)) + slotCmds := intl.JsonMGets(keys, path) + if len(slotCmds) == 0 { + return ret, nil + } + cmds := make([]Completed, 0, len(slotCmds)) + for _, cmd := range slotCmds { + cmds = append(cmds, cmd.Pin()) + } + resps := client.DoMulti(ctx, cmds...) + defer resultsp.Put(&redisresults{s: resps}) + for i, resp := range resps { + if err := resp.NonRedisError(); err != nil { + return nil, err + } + arr, err := resp.ToArray() + if err != nil { + return nil, err + } + commands := cmds[i].Commands() + cmdKeys := commands[1 : len(commands)-1] + ret = arrayToKV(ret, arr, cmdKeys) + } + for _, cmd := range cmds { + intl.PutCompletedForce(cmd) + } + return ret, nil +} + // ErrMSetNXNotSet is used in the MSetNX helper when the underlying MSETNX response is 0. // Ref: https://redis.io/commands/msetnx/ var ErrMSetNXNotSet = errors.New("MSETNX: no key was set") diff --git a/helper_test.go b/helper_test.go index f23bdb06..e3c6945c 100644 --- a/helper_test.go +++ b/helper_test.go @@ -179,18 +179,22 @@ func TestMGetCache(t *testing.T) { t.Fatalf("unexpected err %v", err) } t.Run("Delegate DisabledCache DoCache", func(t *testing.T) { - keys := make([]string, 100) - for i := range keys { - keys[i] = strconv.Itoa(i) - } + keys := []string{"{slot1}a", "{slot1}b", "{slot2}a", "{slot2}b"} m.DoMultiFn = func(cmd ...Completed) *redisresults { result := make([]RedisResult, len(cmd)) - for i, key := range keys { - if !reflect.DeepEqual(cmd[i].Commands(), []string{"GET", key}) { - t.Fatalf("unexpected command %v", cmd) + for i, c := range cmd { + // Each command should be MGET with keys from the same slot + commands := c.Commands() + if commands[0] != "MGET" { + t.Fatalf("expected MGET command, got %v", commands) return nil } - result[i] = newResult(strmsg('+', key), nil) + // Build response array with values matching the keys + values := make([]RedisMessage, len(commands)-1) + for j := 1; j < len(commands); j++ { + values[j-1] = strmsg('+', commands[j]) + } + result[i] = newResult(slicemsg('*', values), nil) } return &redisresults{s: result} } @@ -200,7 +204,7 @@ func TestMGetCache(t *testing.T) { } for _, key := range keys { if vKey, ok := v[key]; !ok || vKey.string() != key { - t.Fatalf("unexpected response %v", v) + t.Fatalf("unexpected response for key %s: %v", key, v) } } }) @@ -358,18 +362,22 @@ func TestMGet(t *testing.T) { t.Fatalf("unexpected err %v", err) } t.Run("Delegate Do", func(t *testing.T) { - keys := make([]string, 100) - for i := range keys { - keys[i] = strconv.Itoa(i) - } + keys := []string{"{slot1}a", "{slot1}b", "{slot2}a", "{slot2}b"} m.DoMultiFn = func(cmd ...Completed) *redisresults { result := make([]RedisResult, len(cmd)) - for i, key := range keys { - if !reflect.DeepEqual(cmd[i].Commands(), []string{"GET", key}) { - t.Fatalf("unexpected command %v", cmd) + for i, c := range cmd { + // Each command should be MGET with keys from the same slot + commands := c.Commands() + if commands[0] != "MGET" { + t.Fatalf("expected MGET command, got %v", commands) return nil } - result[i] = newResult(strmsg('+', key), nil) + // Build response array with values matching the keys + values := make([]RedisMessage, len(commands)-1) + for j := 1; j < len(commands); j++ { + values[j-1] = strmsg('+', commands[j]) + } + result[i] = newResult(slicemsg('*', values), nil) } return &redisresults{s: result} } @@ -379,7 +387,7 @@ func TestMGet(t *testing.T) { } for _, key := range keys { if vKey, ok := v[key]; !ok || vKey.string() != key { - t.Fatalf("unexpected response %v", v) + t.Fatalf("unexpected response for key %s: %v", key, v) } } }) @@ -1162,18 +1170,26 @@ func TestJsonMGet(t *testing.T) { t.Fatalf("unexpected err %v", err) } t.Run("Delegate Do", func(t *testing.T) { - keys := make([]string, 100) - for i := range keys { - keys[i] = strconv.Itoa(i) - } + keys := []string{"{slot1}a", "{slot1}b", "{slot2}a", "{slot2}b"} m.DoMultiFn = func(cmd ...Completed) *redisresults { result := make([]RedisResult, len(cmd)) - for i, key := range keys { - if !reflect.DeepEqual(cmd[i].Commands(), []string{"JSON.GET", key, "$"}) { - t.Fatalf("unexpected command %v", cmd) + for i, c := range cmd { + // Each command should be JSON.MGET with keys from the same slot and path at the end + commands := c.Commands() + if commands[0] != "JSON.MGET" { + t.Fatalf("expected JSON.MGET command, got %v", commands) return nil } - result[i] = newResult(strmsg('+', key), nil) + if commands[len(commands)-1] != "$" { + t.Fatalf("expected $ as last parameter, got %v", commands) + return nil + } + // Build response array with values matching the keys (exclude the path) + values := make([]RedisMessage, len(commands)-2) + for j := 1; j < len(commands)-1; j++ { + values[j-1] = strmsg('+', commands[j]) + } + result[i] = newResult(slicemsg('*', values), nil) } return &redisresults{s: result} } @@ -1183,7 +1199,7 @@ func TestJsonMGet(t *testing.T) { } for _, key := range keys { if vKey, ok := v[key]; !ok || vKey.string() != key { - t.Fatalf("unexpected response %v", v) + t.Fatalf("unexpected response for key %s: %v", key, v) } } })