Skip to content

Commit d6fc855

Browse files
authored
feat: prevent a cross slot error by using pipelining when calling a command with multiple keys (#356)
1 parent b272f19 commit d6fc855

File tree

4 files changed

+67
-6
lines changed

4 files changed

+67
-6
lines changed

lib/redis_client/cluster/command.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ def exists?(name)
9797
@commands.key?(::RedisClient::Cluster::NormalizedCmdName.instance.get_by_name(name))
9898
end
9999

100+
def determine_key_step(command)
101+
name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
102+
# Some commands like EVALSHA have zero as the step in COMMANDS somehow.
103+
@commands[name].key_step == 0 ? 1 : @commands[name].key_step
104+
end
105+
100106
private
101107

102108
def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticComplexity
@@ -143,12 +149,6 @@ def determine_last_key_position(command, keys_start) # rubocop:disable Metrics/A
143149
end
144150
end
145151

146-
def determine_key_step(command)
147-
name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
148-
# Some commands like EVALSHA have zero as the step in COMMANDS somehow.
149-
@commands[name].key_step == 0 ? 1 : @commands[name].key_step
150-
end
151-
152152
def determine_optional_key_position(command, option_name) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
153153
idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase)
154154
idx.nil? ? 0 : idx + 1

lib/redis_client/cluster/router.rb

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,19 @@
1010
require 'redis_client/cluster/normalized_cmd_name'
1111
require 'redis_client/cluster/transaction'
1212
require 'redis_client/cluster/optimistic_locking'
13+
require 'redis_client/cluster/pipeline'
1314
require 'redis_client/cluster/error_identification'
1415

1516
class RedisClient
1617
class Cluster
1718
class Router
1819
ZERO_CURSOR_FOR_SCAN = '0'
1920
TSF = ->(f, x) { f.nil? ? x : f.call(x) }.curry
21+
MULTIPLE_KEYS_COMMAND_TO_PIPELINE = {
22+
'mset' => 'set',
23+
'mget' => 'get',
24+
'del' => 'del'
25+
}.freeze
2026

2127
def initialize(config, concurrent_worker, pool: nil, **kwargs)
2228
@config = config.dup
@@ -48,6 +54,8 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
4854
when 'script' then send_script_command(method, command, args, &block)
4955
when 'pubsub' then send_pubsub_command(method, command, args, &block)
5056
when 'watch' then send_watch_command(command, &block)
57+
when 'mset', 'mget', 'del'
58+
send_multiple_keys_command(cmd, method, command, args, &block)
5159
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
5260
@node.call_all(method, command, args).first.then(&TSF.call(block))
5361
when 'flushall', 'flushdb'
@@ -326,6 +334,25 @@ def send_watch_command(command)
326334
end
327335
end
328336

337+
def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
338+
if command.size < 3 || !::RedisClient::Cluster::KeySlotConverter.extract_hash_tag(command[1]).empty? # rubocop:disable Style/IfUnlessModifier
339+
return try_send(assign_node(command), method, command, args, &block)
340+
end
341+
342+
single_key_cmd = MULTIPLE_KEYS_COMMAND_TO_PIPELINE[cmd]
343+
key_step = @command.determine_key_step(cmd)
344+
seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed
345+
pipeline = ::RedisClient::Cluster::Pipeline.new(self, @command_builder, @concurrent_worker, exception: true, seed: seed)
346+
command[1..].each_slice(key_step) { |*v| pipeline.call(single_key_cmd, *v) }
347+
replies = pipeline.execute
348+
result = case cmd
349+
when 'mset' then replies.first
350+
when 'del' then replies.sum
351+
else replies
352+
end
353+
block_given? ? yield(result) : result
354+
end
355+
329356
def update_cluster_info!
330357
@node.reload!
331358
end

test/redis_client/cluster/test_command.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,20 @@ def test_exists?
147147
end
148148
end
149149

150+
def test_determine_key_step
151+
cmd = ::RedisClient::Cluster::Command.load(@raw_clients)
152+
[
153+
{ name: 'MSET', want: 2 },
154+
{ name: 'MGET', want: 1 },
155+
{ name: 'DEL', want: 1 },
156+
{ name: 'EVALSHA', want: 1 }
157+
].each_with_index do |c, idx|
158+
msg = "Case: #{idx}"
159+
got = cmd.determine_key_step(c[:name])
160+
assert_equal(c[:want], got, msg)
161+
end
162+
end
163+
150164
def test_determine_first_key_position
151165
cmd = ::RedisClient::Cluster::Command.load(@raw_clients)
152166
[

test/redis_client/test_cluster.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,26 @@ def test_with_method
686686
assert_raises(NotImplementedError) { @client.with }
687687
end
688688

689+
def test_dedicated_multiple_keys_command
690+
[
691+
{ command: %w[MSET key1 val1], want: 'OK' },
692+
{ command: %w[MGET key1], want: %w[val1] },
693+
{ command: %w[DEL key1], want: 1 },
694+
{ command: %w[MSET {key}1 val1 {key}2 val2], want: 'OK' },
695+
{ command: %w[MGET {key}1 {key}2], want: %w[val1 val2] },
696+
{ command: %w[DEL {key}1 {key}2], want: 2 },
697+
{ command: %w[MSET key1 val1 key2 val2], want: 'OK' },
698+
{ command: %w[MGET key1 key2], want: %w[val1 val2] },
699+
{ command: %w[DEL key1 key2], want: 2 },
700+
{ command: %w[MSET key1 val1 key2 val2], block: ->(r) { "#{r}!" }, want: 'OK!' },
701+
{ command: %w[MGET key1 key2], block: ->(r) { r.map { |e| "#{e}!" } }, want: %w[val1! val2!] },
702+
{ command: %w[DEL key1 key2], block: ->(r) { r == 2 }, want: true }
703+
].each_with_index do |c, i|
704+
block = c.key?(:block) ? c.fetch(:block) : nil
705+
assert_equal(c.fetch(:want), @client.call_v(c.fetch(:command), &block), i + 1)
706+
end
707+
end
708+
689709
def test_dedicated_commands # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
690710
10.times { |i| @client.call('SET', "key#{i}", i) }
691711
wait_for_replication

0 commit comments

Comments
 (0)