Skip to content

Commit a3dd764

Browse files
authored
Fix multi thread error handling and resharding test cases (#41)
1 parent aa9013b commit a3dd764

File tree

6 files changed

+69
-45
lines changed

6 files changed

+69
-45
lines changed

lib/redis_client/cluster.rb

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ def empty?
4040
@size.zero?
4141
end
4242

43-
# TODO: https://github.com/redis-rb/redis-cluster-client/issues/37
44-
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
43+
# TODO: https://github.com/redis-rb/redis-cluster-client/issues/37 handle redirections
44+
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
4545
all_replies = Array.new(@size)
46+
errors = {}
4647
threads = @grouped.map do |k, v|
4748
Thread.new(@client, k, v) do |client, node_key, rows|
4849
Thread.pass
@@ -60,11 +61,15 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
6061
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size
6162

6263
rows.each_with_index { |row, idx| all_replies[row.first] = replies[idx] }
64+
rescue StandardError => e
65+
errors[node_key] = e
6366
end
6467
end
6568

6669
threads.each(&:join)
67-
all_replies
70+
return all_replies if errors.empty?
71+
72+
raise ::RedisClient::Cluster::ErrorCollection, errors
6873
end
6974
end
7075

@@ -185,10 +190,10 @@ def send_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/A
185190
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save', 'ping'
186191
@node.call_all(method, *command, **kwargs, &block).first
187192
when 'flushall', 'flushdb'
188-
@node.call_primary(method, *command, **kwargs, &block).first
193+
@node.call_primaries(method, *command, **kwargs, &block).first
189194
when 'wait' then send_wait_command(method, *command, **kwargs, &block)
190-
when 'keys' then @node.call_replica(method, *command, **kwargs, &block).flatten.sort
191-
when 'dbsize' then @node.call_replica(method, *command, **kwargs, &block).sum
195+
when 'keys' then @node.call_replicas(method, *command, **kwargs, &block).flatten.sort
196+
when 'dbsize' then @node.call_replicas(method, *command, **kwargs, &block).sum
192197
when 'scan' then _scan(*command, **kwargs)
193198
when 'lastsave' then @node.call_all(method, *command, **kwargs, &block).sort
194199
when 'role' then @node.call_all(method, *command, **kwargs, &block)
@@ -206,14 +211,14 @@ def send_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/A
206211
node = assign_node(*command)
207212
try_send(node, method, *command, **kwargs, &block)
208213
end
209-
rescue RedisClient::Cluster::CommandErrorCollection => e
214+
rescue RedisClient::Cluster::ErrorCollection => e
210215
update_cluster_info! if e.errors.values.map(&:class).any?(::RedisClient::ConnectionError)
211216
raise
212217
end
213218

214219
def send_wait_command(method, *command, retry_count: 3, **kwargs, &block)
215-
@node.call_primary(method, *command, **kwargs, &block).sum
216-
rescue RedisClient::Cluster::CommandErrorCollection => e
220+
@node.call_primaries(method, *command, **kwargs, &block).sum
221+
rescue RedisClient::Cluster::ErrorCollection => e
217222
raise if retry_count <= 0 || e.errors.values.map(&:message).grep(/ERR WAIT cannot be used with replica instances/).empty?
218223

219224
update_cluster_info!
@@ -246,13 +251,17 @@ def send_client_command(method, *command, **kwargs, &block)
246251
end
247252
end
248253

249-
def send_cluster_command(method, *command, **kwargs, &block)
254+
def send_cluster_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/MethodLength
250255
subcommand = command[1].to_s.downcase
251256
case subcommand
252257
when 'addslots', 'delslots', 'failover', 'forget', 'meet', 'replicate',
253258
'reset', 'set-config-epoch', 'setslot'
254259
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, ['cluster', subcommand]
255260
when 'saveconfig' then @node.call_all(method, *command, **kwargs, &block).first
261+
when 'getkeysinslot'
262+
raise ArgumentError, command.join(' ') if command.size != 4
263+
264+
find_node(@node.find_node_key_of_replica(command[2])).send(method, *command, **kwargs, &block)
256265
else assign_node(*command).send(method, *command, **kwargs, &block)
257266
end
258267
end
@@ -262,7 +271,7 @@ def send_script_command(method, *command, **kwargs, &block)
262271
when 'debug', 'kill'
263272
@node.call_all(method, *command, **kwargs, &block).first
264273
when 'flush', 'load'
265-
@node.call_primary(method, *command, **kwargs, &block).first
274+
@node.call_primaries(method, *command, **kwargs, &block).first
266275
else assign_node(*command).send(method, *command, **kwargs, &block)
267276
end
268277
end

lib/redis_client/cluster/errors.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def initialize(command)
2929
end
3030

3131
# Raised when error occurs on any node of cluster.
32-
class CommandErrorCollection < ::RedisClient::Error
32+
class ErrorCollection < ::RedisClient::Error
3333
attr_reader :errors
3434

3535
def initialize(errors)

lib/redis_client/cluster/node.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,16 +118,16 @@ def call_all(method, *command, **kwargs, &block)
118118
try_map { |_, client| client.send(method, *command, **kwargs, &block) }.values
119119
end
120120

121-
def call_primary(method, *command, **kwargs, &block)
121+
def call_primaries(method, *command, **kwargs, &block)
122122
try_map do |node_key, client|
123123
next if replica?(node_key)
124124

125125
client.send(method, *command, **kwargs, &block)
126126
end.values
127127
end
128128

129-
def call_replica(method, *command, **kwargs, &block)
130-
return call_primary(method, *command, **kwargs, &block) if replica_disabled?
129+
def call_replicas(method, *command, **kwargs, &block)
130+
return call_primaries(method, *command, **kwargs, &block) if replica_disabled?
131131

132132
replica_node_keys = @replications.values.map(&:sample)
133133
try_map do |node_key, client|
@@ -228,22 +228,22 @@ def build_clients(options, pool: nil, **kwargs)
228228
end
229229

230230
def try_map # rubocop:disable Metrics/MethodLength
231-
errors = {}
232231
results = {}
232+
errors = {}
233233
threads = @clients.map do |k, v|
234234
Thread.new(k, v) do |node_key, client|
235235
Thread.pass
236236
reply = yield(node_key, client)
237237
results[node_key] = reply unless reply.nil?
238-
rescue ::RedisClient::CommandError => e
238+
rescue StandardError => e
239239
errors[node_key] = e
240240
end
241241
end
242242

243243
threads.each(&:join)
244244
return results if errors.empty?
245245

246-
raise ::RedisClient::Cluster::CommandErrorCollection, errors
246+
raise ::RedisClient::Cluster::ErrorCollection, errors
247247
end
248248
end
249249
end

test/redis_client/cluster/test_errors.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def test_orchestration_command_not_supported_error
4040
end
4141
end
4242

43-
def test_command_error_collection_error
43+
def test_error_collection_error
4444
[
4545
{
4646
errors: { '127.0.0.1:6379' => DummyError.new('foo') },
@@ -54,7 +54,7 @@ def test_command_error_collection_error
5454
{ errors: '', want: { msg: '', size: 0 } },
5555
{ errors: nil, want: { msg: '', size: 0 } }
5656
].each_with_index do |c, idx|
57-
raise ::RedisClient::Cluster::CommandErrorCollection, c[:errors]
57+
raise ::RedisClient::Cluster::ErrorCollection, c[:errors]
5858
rescue StandardError => e
5959
assert_equal(c[:want][:msg], e.message, "Case: #{idx}")
6060
assert_equal(c[:want][:size], e.errors.size, "Case: #{idx}")

test/redis_client/cluster/test_node.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,19 +220,19 @@ def test_call_all
220220
assert_equal(want, got, 'Case: scale read')
221221
end
222222

223-
def test_call_primary
223+
def test_call_primaries
224224
want = (1..(@test_node_info.count { |info| info[:role] == 'master' })).map { |_| 'PONG' }
225-
got = @test_node.call_primary(:call, 'PING')
225+
got = @test_node.call_primaries(:call, 'PING')
226226
assert_equal(want, got)
227227
end
228228

229-
def test_call_replica
229+
def test_call_replicas
230230
want = (1..(@test_node_info.count { |info| info[:role] == 'master' })).map { |_| 'PONG' }
231231

232-
got = @test_node.call_replica(:call, 'PING')
232+
got = @test_node.call_replicas(:call, 'PING')
233233
assert_equal(want, got, 'Case: primary only')
234234

235-
got = @test_node_with_scale_read.call_replica(:call, 'PING')
235+
got = @test_node_with_scale_read.call_replicas(:call, 'PING')
236236
assert_equal(want, got, 'Case: scale read')
237237
end
238238

@@ -408,7 +408,7 @@ def test_try_map
408408
primary_node_keys = @test_node_info.select { |info| info[:role] == 'master' }.map { |info| info[:node_key] }
409409
[
410410
{ block: ->(_, client) { client.call('PING') }, want: primary_node_keys.to_h { |k| [k, 'PONG'] } },
411-
{ block: ->(_, client) { client.call('UNKNOWN') }, error: ::RedisClient::Cluster::CommandErrorCollection }
411+
{ block: ->(_, client) { client.call('UNKNOWN') }, error: ::RedisClient::Cluster::ErrorCollection }
412412
].each_with_index do |c, idx|
413413
msg = "Case: #{idx}"
414414
got = -> { @test_node.send(:try_map, &c[:block]) }

test/test_against_cluster_state.rb

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,30 +37,29 @@ def test_the_state_of_cluster_failover
3737
end
3838

3939
def test_the_state_of_cluster_resharding
40-
@client.pipelined { |pipeline| 10_000.times { |i| pipeline.call('SET', "{key}#{i}", i) } }
41-
wait_for_replication
42-
43-
slot = SLOT_SIZE.times.max_by { |i| @client.call('CLUSTER', 'COUNTKEYSINSLOT', i) }
44-
src = @client.instance_variable_get(:@node).find_node_key_of_primary(slot)
45-
dest = @client.instance_variable_get(:@node).primary_node_keys.reject { |k| k == src }.sample
46-
47-
@controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
48-
10_000.times { |i| assert_equal(i.to_s, @client.call('GET', "{key}#{i}"), "Case: GET: #{i}") }
49-
@controller.finish_resharding(slot: slot, dest_node_key: dest)
40+
do_resharding_test do |keys|
41+
keys.each do |key|
42+
want = key
43+
got = @client.call('GET', key)
44+
assert_equal(want, got, "Case: GET: #{key}")
45+
end
46+
end
5047
end
5148

5249
def test_the_state_of_cluster_resharding_with_pipelining
5350
skip('TODO: https://github.com/redis-rb/redis-cluster-client/issues/37')
54-
@client.pipelined { |pipeline| 100_000.times { |i| pipeline.call('SET', "{key}#{i}", i) } }
55-
56-
slot = SLOT_SIZE.times.max_by { |i| @client.call('CLUSTER', 'COUNTKEYSINSLOT', i) }
57-
src = @client.instance_variable_get(:@node).find_node_key_of_primary(slot)
58-
dest = @client.instance_variable_get(:@node).primary_node_keys.reject { |k| k == src }.sample
5951

60-
@controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
61-
values = @client.pipelined { |pipeline| 100_000.times { |i| pipeline.call('GET', "{key}#{i}") } }
62-
100_000.times { |i| assert_equal(i.to_s, values[i], "Case: GET: #{i}") }
63-
@controller.finish_resharding(slot: slot, dest_node_key: dest)
52+
do_resharding_test do |keys|
53+
values = @client.pipelined do |pipeline|
54+
keys.each { |key| pipeline.call('GET', key) }
55+
end
56+
57+
keys.each_with_index do |key, i|
58+
want = key
59+
got = values[i]
60+
assert_equal(want, got, "Case: GET: #{key}")
61+
end
62+
end
6463
end
6564

6665
private
@@ -72,6 +71,22 @@ def wait_for_replication
7271
def fetch_cluster_info(key)
7372
@client.call('CLUSTER', 'INFO').split("\r\n").to_h { |v| v.split(':') }.fetch(key)
7473
end
74+
75+
def do_resharding_test(number_of_keys: 1000)
76+
@client.pipelined { |pipeline| number_of_keys.times { |i| pipeline.call('SET', "key#{i}", "key#{i}") } }
77+
wait_for_replication
78+
count, slot = @client.pipelined { |pi| SLOT_SIZE.times { |i| pi.call('CLUSTER', 'COUNTKEYSINSLOT', i) } }
79+
.each_with_index.max_by { |c, _| c }
80+
refute_equal(0, count)
81+
keys = @client.call('CLUSTER', 'GETKEYSINSLOT', slot, count)
82+
refute_empty(keys)
83+
src = @client.instance_variable_get(:@node).find_node_key_of_primary(slot)
84+
dest = @client.instance_variable_get(:@node).primary_node_keys.reject { |k| k == src }.sample
85+
@controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
86+
wait_for_replication
87+
yield(keys)
88+
@controller.finish_resharding(slot: slot, dest_node_key: dest)
89+
end
7590
end
7691

7792
class PrimaryOnly < TestingWrapper

0 commit comments

Comments
 (0)