-
Notifications
You must be signed in to change notification settings - Fork 224
feat : Implement slot-based MGET batching for cluster clients #908
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 9 commits
e226605
c79902d
8657b76
7e9862a
39a9e9f
60a8fd6
2ee5f55
76f0fd9
cf97d42
7e35f12
e4d6b50
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,10 @@ import ( | |
| intl "github.com/redis/rueidis/internal/cmds" | ||
| ) | ||
|
|
||
| func slot(key string) uint16 { | ||
| return intl.Slot(key) | ||
| } | ||
|
|
||
| // MGetCache is a helper that consults the client-side caches with multiple keys by grouping keys within the same slot into multiple GETs | ||
| func MGetCache(client Client, ctx context.Context, ttl time.Duration, keys []string) (ret map[string]RedisMessage, err error) { | ||
| if len(keys) == 0 { | ||
|
|
@@ -50,12 +54,7 @@ func MGet(client Client, ctx context.Context, keys []string) (ret map[string]Red | |
| return clientMGet(client, ctx, client.B().Mget().Key(keys...).Build(), keys) | ||
| } | ||
|
|
||
| cmds := mgetcmdsp.Get(len(keys), len(keys)) | ||
| defer mgetcmdsp.Put(cmds) | ||
| for i := range cmds.s { | ||
| cmds.s[i] = client.B().Get().Key(keys[i]).Build() | ||
| } | ||
| return doMultiGet(client, ctx, cmds.s, keys) | ||
| return clusterMGet(client, ctx, keys) | ||
| } | ||
|
|
||
| // MSet is a helper that consults the redis directly with multiple keys by grouping keys within the same slot into MSETs or multiple SETs | ||
|
|
@@ -139,12 +138,7 @@ func JsonMGet(client Client, ctx context.Context, keys []string, path string) (r | |
| return clientMGet(client, ctx, client.B().JsonMget().Key(keys...).Path(path).Build(), keys) | ||
| } | ||
|
|
||
| cmds := mgetcmdsp.Get(len(keys), len(keys)) | ||
| defer mgetcmdsp.Put(cmds) | ||
| for i := range cmds.s { | ||
| cmds.s[i] = client.B().JsonGet().Key(keys[i]).Path(path).Build() | ||
| } | ||
| return doMultiGet(client, ctx, cmds.s, keys) | ||
| return clusterJsonMGet(client, ctx, keys, path) | ||
| } | ||
|
|
||
| // JsonMSet is a helper that consults redis directly with multiple keys by grouping keys within the same slot into JSON.MSETs or multiple JSON.SETs | ||
|
|
@@ -277,6 +271,75 @@ func arrayToKV(m map[string]RedisMessage, arr []RedisMessage, keys []string) map | |
| return m | ||
| } | ||
|
|
||
| func clusterMGet(client Client, ctx context.Context, keys []string) (ret map[string]RedisMessage, err error) { | ||
| ret = make(map[string]RedisMessage, len(keys)) | ||
| slots := make(map[uint16][]int, len(keys)/2) | ||
|
||
| for i, key := range keys { | ||
| s := slot(key) | ||
| slots[s] = append(slots[s], i) | ||
| } | ||
| cmds := mgetcmdsp.Get(0, len(slots)) | ||
| defer mgetcmdsp.Put(cmds) | ||
| groups := make([][]string, 0, len(slots)) | ||
| for _, group := range slots { | ||
| gkeys := make([]string, 0, len(group)) | ||
| for _, i := range group { | ||
| gkeys = append(gkeys, keys[i]) | ||
| } | ||
| cmds.s = append(cmds.s, client.B().Mget().Key(gkeys...).Build().Pin()) | ||
| groups = append(groups, gkeys) | ||
| } | ||
| resps := client.DoMulti(ctx, cmds.s...) | ||
| defer resultsp.Put(&redisresults{s: resps}) | ||
| for i, resp := range resps { | ||
| arr, err := resp.ToArray() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| ret = arrayToKV(ret, arr, groups[i]) | ||
| } | ||
| for i := range cmds.s { | ||
| intl.PutCompletedForce(cmds.s[i]) | ||
| } | ||
| return ret, nil | ||
| } | ||
|
|
||
| func clusterJsonMGet(client Client, ctx context.Context, keys []string, path string) (ret map[string]RedisMessage, err error) { | ||
| ret = make(map[string]RedisMessage, len(keys)) | ||
| slots := make(map[uint16][]int, len(keys)/2) | ||
| for i, key := range keys { | ||
| s := slot(key) | ||
| slots[s] = append(slots[s], i) | ||
| } | ||
| if len(slots) == 0 { | ||
| return ret, nil | ||
| } | ||
| cmds := mgetcmdsp.Get(0, len(slots)) | ||
| defer mgetcmdsp.Put(cmds) | ||
| groups := make([][]string, 0, len(slots)) | ||
| for _, group := range slots { | ||
| gkeys := make([]string, 0, len(group)) | ||
| for _, i := range group { | ||
| gkeys = append(gkeys, keys[i]) | ||
| } | ||
| cmds.s = append(cmds.s, client.B().JsonMget().Key(gkeys...).Path(path).Build().Pin()) | ||
| groups = append(groups, gkeys) | ||
| } | ||
| resps := client.DoMulti(ctx, cmds.s...) | ||
| defer resultsp.Put(&redisresults{s: resps}) | ||
| for i, resp := range resps { | ||
| arr, err := resp.ToArray() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| ret = arrayToKV(ret, arr, groups[i]) | ||
| } | ||
| for i := range cmds.s { | ||
| intl.PutCompletedForce(cmds.s[i]) | ||
| } | ||
| return ret, nil | ||
| } | ||
|
|
||
| // ErrMSetNXNotSet is used in the MSetNX helper when the underlying MSETNX response is 0. | ||
| // Ref: https://redis.io/commands/msetnx/ | ||
| var ErrMSetNXNotSet = errors.New("MSETNX: no key was set") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.