Skip to content

Commit c7ac65d

Browse files
KJTsanaktsidisKJ Tsanaktsidis
authored andcommitted
Implement RedisClient::Cluster::Command#extract_all_keys
This is required to validate that all the keys passed in to a pinned connection are consistent with the key passed in to `#with`. Technically speaking, we _could_ let the Redis server just reject keys that are cross-node. However, there _is_ a good reason to perform the validation in the client. Redis clusters have 16,000 odd slots, but usually only a handful of nodes. It's quite possible you might create a transaction which operates across slots, but have it work fine because the slots just happen to hash to the same node. However, if a resharding event then happens, suddednly your working-fine code will break! It's nice for users to receive feedback straight away, even on very small development setups, that their cross-slot transactions may well not actually work later on.
1 parent 4f578b8 commit c7ac65d

File tree

2 files changed

+88
-6
lines changed

2 files changed

+88
-6
lines changed

lib/redis_client/cluster/command.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@ class Cluster
1010
class Command
1111
EMPTY_STRING = ''
1212
EMPTY_HASH = {}.freeze
13+
EMPTY_ARRAY = [].freeze
1314

1415
Detail = Struct.new(
1516
'RedisCommand',
1617
:first_key_position,
18+
:last_key_position,
19+
:key_step,
1720
:write?,
1821
:readonly?,
1922
keyword_init: true
@@ -49,6 +52,8 @@ def parse_command_reply(rows)
4952

5053
acc[row[0].downcase] = ::RedisClient::Cluster::Command::Detail.new(
5154
first_key_position: row[3],
55+
last_key_position: row[4],
56+
key_step: row[5],
5257
write?: row[2].include?('write'),
5358
readonly?: row[2].include?('readonly')
5459
)
@@ -67,6 +72,17 @@ def extract_first_key(command)
6772
(command[i].is_a?(Array) ? command[i].flatten.first : command[i]).to_s
6873
end
6974

75+
def extract_all_keys(command)
76+
keys_start = determine_first_key_position(command)
77+
keys_end = determine_last_key_position(command, keys_start)
78+
keys_step = determine_key_step(command)
79+
return EMPTY_ARRAY if [keys_start, keys_end, keys_step].any?(&:zero?)
80+
81+
keys_end = [keys_end, command.size - 1].min
82+
# use .. inclusive range because keys_end is a valid index.
83+
(keys_start..keys_end).step(keys_step).map { |i| command[i] }
84+
end
85+
7086
def should_send_to_primary?(command)
7187
name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
7288
@commands[name]&.write?
@@ -98,6 +114,41 @@ def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticCo
98114
end
99115
end
100116

117+
# IMPORTANT: this determines the last key position INCLUSIVE of the last key -
118+
# i.e. command[determine_last_key_position(command)] is a key.
119+
# This is in line with what Redis returns from COMMANDS.
120+
def determine_last_key_position(command, keys_start) # rubocop:disable Metrics/AbcSize
121+
case name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
122+
when 'eval', 'evalsha', 'zinterstore', 'zunionstore'
123+
# EVALSHA sha1 numkeys [key [key ...]] [arg [arg ...]]
124+
# ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
125+
command[2].to_i + 2
126+
when 'object', 'memory'
127+
# OBJECT [ENCODING | FREQ | IDLETIME | REFCOUNT] key
128+
# MEMORY USAGE key [SAMPLES count]
129+
keys_start
130+
when 'migrate'
131+
# MIGRATE host port <key | ""> destination-db timeout [COPY] [REPLACE] [AUTH password | AUTH2 username password] [KEYS key [key ...]]
132+
command[3].empty? ? (command.length - 1) : 3
133+
when 'xread', 'xreadgroup'
134+
# XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
135+
keys_start + ((command.length - keys_start) / 2) - 1
136+
else
137+
# If there is a fixed, non-variable number of keys, don't iterate past that.
138+
if @commands[name].last_key_position >= 0
139+
@commands[name].last_key_position
140+
else
141+
command.length + @commands[name].last_key_position
142+
end
143+
end
144+
end
145+
146+
def determine_key_step(command)
147+
name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
148+
# Some commands like EVALSHA have zero as the step in COMMANDS somehow.
149+
@commands[name].key_step == 0 ? 1 : @commands[name].key_step
150+
end
151+
101152
def determine_optional_key_position(command, option_name) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
102153
idx = command&.flatten&.map(&:to_s)&.map(&:downcase)&.index(option_name&.downcase)
103154
idx.nil? ? 0 : idx + 1

test/redis_client/cluster/test_command.rb

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,20 @@ def test_parse_command_reply
4949
[
5050
{
5151
rows: [
52-
['get', 2, Set['readonly', 'fast'], 1, 1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]],
53-
['set', -3, Set['write', 'denyoom', 'movablekeys'], 1, 1, 1, Set['@write', '@string', '@slow'], Set[], Set[], Set[]]
52+
['get', 2, Set['readonly', 'fast'], 1, -1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]],
53+
['set', -3, Set['write', 'denyoom', 'movablekeys'], 1, -1, 2, Set['@write', '@string', '@slow'], Set[], Set[], Set[]]
5454
],
5555
want: {
56-
'get' => { first_key_position: 1, write?: false, readonly?: true },
57-
'set' => { first_key_position: 1, write?: true, readonly?: false }
56+
'get' => { first_key_position: 1, last_key_position: -1, key_step: 1, write?: false, readonly?: true },
57+
'set' => { first_key_position: 1, last_key_position: -1, key_step: 2, write?: true, readonly?: false }
5858
}
5959
},
6060
{
6161
rows: [
62-
['GET', 2, Set['readonly', 'fast'], 1, 1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]]
62+
['GET', 2, Set['readonly', 'fast'], 1, -1, 1, Set['@read', '@string', '@fast'], Set[], Set[], Set[]]
6363
],
6464
want: {
65-
'get' => { first_key_position: 1, write?: false, readonly?: true }
65+
'get' => { first_key_position: 1, last_key_position: -1, key_step: 1, write?: false, readonly?: true }
6666
}
6767
},
6868
{ rows: [[]], want: {} },
@@ -190,6 +190,37 @@ def test_determine_optional_key_position
190190
assert_equal(c[:want], got, msg)
191191
end
192192
end
193+
194+
def test_extract_all_keys
195+
cmd = ::RedisClient::Cluster::Command.load(@raw_clients)
196+
[
197+
{ command: ['EVAL', 'return ARGV[1]', '0', 'hello'], want: [] },
198+
{ command: ['EVAL', 'return ARGV[1]', '3', 'key1', 'key2', 'key3', 'arg1', 'arg2'], want: %w[key1 key2 key3] },
199+
{ command: [['EVAL'], '"return ARGV[1]"', 0, 'hello'], want: [] },
200+
{ command: %w[EVALSHA sha1 2 foo bar baz zap], want: %w[foo bar] },
201+
{ command: %w[MIGRATE host port key 0 5 COPY], want: %w[key] },
202+
{ command: ['MIGRATE', 'host', 'port', '', '0', '5', 'COPY', 'KEYS', 'key1'], want: %w[key1] },
203+
{ command: ['MIGRATE', 'host', 'port', '', '0', '5', 'COPY', 'KEYS', 'key1', 'key2'], want: %w[key1 key2] },
204+
{ command: %w[ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: %w[zset1 zset2] },
205+
{ command: %w[ZUNIONSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: %w[zset1 zset2] },
206+
{ command: %w[OBJECT HELP], want: [] },
207+
{ command: %w[MEMORY HELP], want: [] },
208+
{ command: %w[MEMORY USAGE key], want: %w[key] },
209+
{ command: %w[XREAD COUNT 2 STREAMS mystream writers 0-0 0-0], want: %w[mystream writers] },
210+
{ command: %w[XREADGROUP GROUP group consumer STREAMS key id], want: %w[key] },
211+
{ command: %w[SET foo 1], want: %w[foo] },
212+
{ command: %w[set foo 1], want: %w[foo] },
213+
{ command: [['SET'], 'foo', 1], want: %w[foo] },
214+
{ command: %w[GET foo], want: %w[foo] },
215+
{ command: %w[MGET foo bar baz], want: %w[foo bar baz] },
216+
{ command: %w[MSET foo val bar val baz val], want: %w[foo bar baz] },
217+
{ command: %w[BLPOP foo bar 0], want: %w[foo bar] }
218+
].each_with_index do |c, idx|
219+
msg = "Case: #{idx}"
220+
got = cmd.send(:extract_all_keys, c[:command])
221+
assert_equal(c[:want], got, msg)
222+
end
223+
end
193224
end
194225
end
195226
end

0 commit comments

Comments
 (0)