Skip to content

Commit 67686db

Browse files
authored
Fix blocking call bug and add node down test cases for primary node (#51)
1 parent cba2380 commit 67686db

File tree

5 files changed

+118
-84
lines changed

5 files changed

+118
-84
lines changed

lib/redis_client/cluster.rb

Lines changed: 57 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,7 @@ def call_once(*command, **kwargs)
127127
end
128128

129129
def blocking_call(timeout, *command, **kwargs)
130-
node = assign_node(*command)
131-
try_send(node, :blocking_call, timeout, *command, **kwargs)
130+
send_command(:blocking_call, timeout, *command, **kwargs)
132131
end
133132

134133
def scan(*args, **kwargs, &block)
@@ -186,41 +185,43 @@ def fetch_cluster_info!(config, pool: nil, **kwargs)
186185
node_info: node_info, pool: pool, with_replica: config.use_replica?, **kwargs)
187186
end
188187

189-
def send_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
188+
def send_command(method, *args, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
189+
command = method == :blocking_call && args.size > 1 ? args[1..] : args
190+
190191
cmd = command.first.to_s.downcase
191192
case cmd
192193
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
193-
@node.call_all(method, *command, **kwargs, &block).first
194+
@node.call_all(method, *args, **kwargs, &block).first
194195
when 'flushall', 'flushdb'
195-
@node.call_primaries(method, *command, **kwargs, &block).first
196-
when 'ping' then @node.send_ping(method, *command, **kwargs, &block).first
197-
when 'wait' then send_wait_command(method, *command, **kwargs, &block)
198-
when 'keys' then @node.call_replicas(method, *command, **kwargs, &block).flatten.sort
199-
when 'dbsize' then @node.call_replicas(method, *command, **kwargs, &block).sum
196+
@node.call_primaries(method, *args, **kwargs, &block).first
197+
when 'ping' then @node.send_ping(method, *args, **kwargs, &block).first
198+
when 'wait' then send_wait_command(method, *args, **kwargs, &block)
199+
when 'keys' then @node.call_replicas(method, *args, **kwargs, &block).flatten.sort
200+
when 'dbsize' then @node.call_replicas(method, *args, **kwargs, &block).sum
200201
when 'scan' then _scan(*command, **kwargs)
201-
when 'lastsave' then @node.call_all(method, *command, **kwargs, &block).sort
202-
when 'role' then @node.call_all(method, *command, **kwargs, &block)
203-
when 'config' then send_config_command(method, *command, **kwargs, &block)
204-
when 'client' then send_client_command(method, *command, **kwargs, &block)
205-
when 'cluster' then send_cluster_command(method, *command, **kwargs, &block)
202+
when 'lastsave' then @node.call_all(method, *args, **kwargs, &block).sort
203+
when 'role' then @node.call_all(method, *args, **kwargs, &block)
204+
when 'config' then send_config_command(method, *args, **kwargs, &block)
205+
when 'client' then send_client_command(method, *args, **kwargs, &block)
206+
when 'cluster' then send_cluster_command(method, *args, **kwargs, &block)
206207
when 'readonly', 'readwrite', 'shutdown'
207208
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, cmd
208-
when 'memory' then send_memory_command(method, *command, **kwargs, &block)
209-
when 'script' then send_script_command(method, *command, **kwargs, &block)
210-
when 'pubsub' then send_pubsub_command(method, *command, **kwargs, &block)
209+
when 'memory' then send_memory_command(method, *args, **kwargs, &block)
210+
when 'script' then send_script_command(method, *args, **kwargs, &block)
211+
when 'pubsub' then send_pubsub_command(method, *args, **kwargs, &block)
211212
when 'discard', 'exec', 'multi', 'unwatch'
212213
raise ::RedisClient::Cluster::AmbiguousNodeError, cmd
213214
else
214215
node = assign_node(*command)
215-
try_send(node, method, *command, **kwargs, &block)
216+
try_send(node, method, *args, **kwargs, &block)
216217
end
217218
rescue RedisClient::Cluster::Node::ReloadNeeded
218219
update_cluster_info!
219220
raise ::RedisClient::Cluster::NodeMightBeDown
220221
end
221222

222-
def send_wait_command(method, *command, retry_count: 3, **kwargs, &block)
223-
@node.call_primaries(method, *command, **kwargs, &block).sum
223+
def send_wait_command(method, *args, retry_count: 3, **kwargs, &block)
224+
@node.call_primaries(method, *args, **kwargs, &block).sum
224225
rescue RedisClient::Cluster::ErrorCollection => e
225226
raise if retry_count <= 0
226227
raise if e.errors.values.none? do |err|
@@ -232,64 +233,76 @@ def send_wait_command(method, *command, retry_count: 3, **kwargs, &block)
232233
retry
233234
end
234235

235-
def send_config_command(method, *command, **kwargs, &block)
236+
def send_config_command(method, *args, **kwargs, &block)
237+
command = method == :blocking_call && args.size > 1 ? args[1..] : args
238+
236239
case command[1].to_s.downcase
237240
when 'resetstat', 'rewrite', 'set'
238-
@node.call_all(method, *command, **kwargs, &block).first
239-
else assign_node(*command).send(method, *command, **kwargs, &block)
241+
@node.call_all(method, *args, **kwargs, &block).first
242+
else assign_node(*command).send(method, *args, **kwargs, &block)
240243
end
241244
end
242245

243-
def send_memory_command(method, *command, **kwargs, &block)
246+
def send_memory_command(method, *args, **kwargs, &block)
247+
command = method == :blocking_call && args.size > 1 ? args[1..] : args
248+
244249
case command[1].to_s.downcase
245-
when 'stats' then @node.call_all(method, *command, **kwargs, &block)
246-
when 'purge' then @node.call_all(method, *command, **kwargs, &block).first
247-
else assign_node(*command).send(method, *command, **kwargs, &block)
250+
when 'stats' then @node.call_all(method, *args, **kwargs, &block)
251+
when 'purge' then @node.call_all(method, *args, **kwargs, &block).first
252+
else assign_node(*command).send(method, *args, **kwargs, &block)
248253
end
249254
end
250255

251-
def send_client_command(method, *command, **kwargs, &block)
256+
def send_client_command(method, *args, **kwargs, &block)
257+
command = method == :blocking_call && args.size > 1 ? args[1..] : args
258+
252259
case command[1].to_s.downcase
253-
when 'list' then @node.call_all(method, *command, **kwargs, &block).flatten
260+
when 'list' then @node.call_all(method, *args, **kwargs, &block).flatten
254261
when 'pause', 'reply', 'setname'
255-
@node.call_all(method, *command, **kwargs, &block).first
256-
else assign_node(*command).send(method, *command, **kwargs, &block)
262+
@node.call_all(method, *args, **kwargs, &block).first
263+
else assign_node(*command).send(method, *args, **kwargs, &block)
257264
end
258265
end
259266

260-
def send_cluster_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/MethodLength
267+
def send_cluster_command(method, *args, **kwargs, &block) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
268+
command = method == :blocking_call && args.size > 1 ? args[1..] : args
261269
subcommand = command[1].to_s.downcase
270+
262271
case subcommand
263272
when 'addslots', 'delslots', 'failover', 'forget', 'meet', 'replicate',
264273
'reset', 'set-config-epoch', 'setslot'
265274
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, ['cluster', subcommand]
266-
when 'saveconfig' then @node.call_all(method, *command, **kwargs, &block).first
275+
when 'saveconfig' then @node.call_all(method, *args, **kwargs, &block).first
267276
when 'getkeysinslot'
268277
raise ArgumentError, command.join(' ') if command.size != 4
269278

270-
find_node(@node.find_node_key_of_replica(command[2])).send(method, *command, **kwargs, &block)
271-
else assign_node(*command).send(method, *command, **kwargs, &block)
279+
find_node(@node.find_node_key_of_replica(command[2])).send(method, *args, **kwargs, &block)
280+
else assign_node(*command).send(method, *args, **kwargs, &block)
272281
end
273282
end
274283

275-
def send_script_command(method, *command, **kwargs, &block)
284+
def send_script_command(method, *args, **kwargs, &block)
285+
command = method == :blocking_call && args.size > 1 ? args[1..] : args
286+
276287
case command[1].to_s.downcase
277288
when 'debug', 'kill'
278-
@node.call_all(method, *command, **kwargs, &block).first
289+
@node.call_all(method, *args, **kwargs, &block).first
279290
when 'flush', 'load'
280-
@node.call_primaries(method, *command, **kwargs, &block).first
281-
else assign_node(*command).send(method, *command, **kwargs, &block)
291+
@node.call_primaries(method, *args, **kwargs, &block).first
292+
else assign_node(*command).send(method, *args, **kwargs, &block)
282293
end
283294
end
284295

285-
def send_pubsub_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
296+
def send_pubsub_command(method, *args, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
297+
command = method == :blocking_call && args.size > 1 ? args[1..] : args
298+
286299
case command[1].to_s.downcase
287-
when 'channels' then @node.call_all(method, *command, **kwargs, &block).flatten.uniq.sort
300+
when 'channels' then @node.call_all(method, *args, **kwargs, &block).flatten.uniq.sort
288301
when 'numsub'
289-
@node.call_all(method, *command, **kwargs, &block).reject(&:empty?).map { |e| Hash[*e] }
302+
@node.call_all(method, *args, **kwargs, &block).reject(&:empty?).map { |e| Hash[*e] }
290303
.reduce({}) { |a, e| a.merge(e) { |_, v1, v2| v1 + v2 } }
291-
when 'numpat' then @node.call_all(method, *command, **kwargs, &block).sum
292-
else assign_node(*command).send(method, *command, **kwargs, &block)
304+
when 'numpat' then @node.call_all(method, *args, **kwargs, &block).sum
305+
else assign_node(*command).send(method, *args, **kwargs, &block)
293306
end
294307
end
295308

lib/redis_client/cluster/node.rb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -129,46 +129,46 @@ def find_by(node_key)
129129
@clients.fetch(node_key)
130130
end
131131

132-
def call_all(method, *command, **kwargs, &block)
132+
def call_all(method, *args, **kwargs, &block)
133133
results, errors = try_map do |_, client|
134-
client.send(method, *command, **kwargs, &block)
134+
client.send(method, *args, **kwargs, &block)
135135
end
136136

137137
return results.values if errors.empty?
138138

139139
raise ::RedisClient::Cluster::ErrorCollection, errors
140140
end
141141

142-
def call_primaries(method, *command, **kwargs, &block)
142+
def call_primaries(method, *args, **kwargs, &block)
143143
results, errors = try_map do |node_key, client|
144144
next if replica?(node_key)
145145

146-
client.send(method, *command, **kwargs, &block)
146+
client.send(method, *args, **kwargs, &block)
147147
end
148148

149149
return results.values if errors.empty?
150150

151151
raise ::RedisClient::Cluster::ErrorCollection, errors
152152
end
153153

154-
def call_replicas(method, *command, **kwargs, &block)
155-
return call_primaries(method, *command, **kwargs, &block) if replica_disabled?
154+
def call_replicas(method, *args, **kwargs, &block)
155+
return call_primaries(method, *args, **kwargs, &block) if replica_disabled?
156156

157157
replica_node_keys = @replications.values.map(&:sample)
158158
results, errors = try_map do |node_key, client|
159159
next if primary?(node_key) || !replica_node_keys.include?(node_key)
160160

161-
client.send(method, *command, **kwargs, &block)
161+
client.send(method, *args, **kwargs, &block)
162162
end
163163

164164
return results.values if errors.empty?
165165

166166
raise ::RedisClient::Cluster::ErrorCollection, errors
167167
end
168168

169-
def send_ping(method, *command, **kwargs, &block)
169+
def send_ping(method, *args, **kwargs, &block)
170170
results, errors = try_map do |_, client|
171-
client.send(method, *command, **kwargs, &block)
171+
client.send(method, *args, **kwargs, &block)
172172
end
173173

174174
return results.values if errors.empty?

test/redis_client/test_cluster.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,9 @@ def test_dedicated_commands
185185
private
186186

187187
def wait_for_replication
188-
@client.call('WAIT', TEST_REPLICA_SIZE, (TEST_TIMEOUT_SEC * 1000).to_i)
188+
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
189+
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
190+
@client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
189191
end
190192
end
191193

test/test_against_cluster_broken.rb

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,16 @@
33
require 'testing_helper'
44

55
class TestAgainstClusterBroken < TestingWrapper
6+
WAIT_SEC = 3
7+
68
def setup
7-
config = new_test_config
8-
@node_info = ::RedisClient::Cluster::Node.load_info(config.per_node_key)
9+
config = ::RedisClient::ClusterConfig.new(
10+
nodes: TEST_NODE_URIS,
11+
replica: true,
12+
fixed_hostname: TEST_FIXED_HOSTNAME,
13+
**TEST_GENERIC_OPTIONS
14+
)
15+
916
@client = ::RedisClient::Cluster.new(config)
1017
end
1118

@@ -14,72 +21,82 @@ def teardown
1421
end
1522

1623
def test_a_replica_is_down
17-
do_test_a_node_is_down('slave', number_of_keys: 10)
24+
node_key = @client.instance_variable_get(:@node)
25+
.replica_node_keys
26+
.sample
27+
28+
node = @client.instance_variable_get(:@node)
29+
.instance_variable_get(:@clients)
30+
.fetch(node_key)
31+
32+
do_test_a_node_is_down(node, number_of_keys: 10)
1833
end
1934

2035
def test_a_primary_is_down
21-
skip('TODO: kill a node which has replicas at least one')
36+
node_key = @client.instance_variable_get(:@node)
37+
.instance_variable_get(:@replications)
38+
.reject { |_, v| v.size.zero? }
39+
.keys.sample
40+
41+
node = @client.instance_variable_get(:@node)
42+
.instance_variable_get(:@clients)
43+
.fetch(node_key)
2244

23-
# do_test_a_node_is_down('master', number_of_keys: 10)
45+
do_test_a_node_is_down(node, number_of_keys: 10)
2446
end
2547

2648
private
2749

28-
def new_test_config
29-
::RedisClient::ClusterConfig.new(
30-
nodes: TEST_NODE_URIS,
31-
replica: true,
32-
fixed_hostname: TEST_FIXED_HOSTNAME,
33-
**TEST_GENERIC_OPTIONS
34-
)
35-
end
36-
3750
def wait_for_replication
38-
@client.call('WAIT', TEST_REPLICA_SIZE, (TEST_TIMEOUT_SEC * 1000).to_i)
51+
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
52+
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
53+
@client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
3954
end
4055

41-
def do_test_a_node_is_down(role, number_of_keys:)
42-
number_of_keys.times { |i| @client.call('SET', "pre-#{i}", i) }
43-
number_of_keys.times { |i| @client.pipelined { |pi| pi.call('SET', "pre-pipelined-#{i}", i) } }
44-
wait_for_replication
56+
def do_test_a_node_is_down(sacrifice, number_of_keys:)
57+
prepare_test_data(number_of_keys: number_of_keys)
4558

46-
kill_a_node(role, kill_attempts: 10)
59+
kill_a_node(sacrifice, kill_attempts: 10)
4760
wait_for_cluster_to_be_ready(wait_attempts: 10)
4861

4962
assert_equal('PONG', @client.call('PING'), 'Case: PING')
5063
do_assertions_without_pipelining(number_of_keys: number_of_keys)
5164
do_assertions_with_pipelining(number_of_keys: number_of_keys)
5265
end
5366

54-
def kill_a_node(role, kill_attempts: 10)
55-
node_key = @node_info.select { |e| e[:role] == role }.sample.fetch(:node_key)
56-
node = @client.send(:find_node, node_key)
57-
refute_nil(node, node_key)
67+
def prepare_test_data(number_of_keys:)
68+
number_of_keys.times { |i| @client.call('SET', "pre-#{i}", i) }
69+
number_of_keys.times { |i| @client.pipelined { |pi| pi.call('SET', "pre-pipelined-#{i}", i) } }
70+
wait_for_replication
71+
end
72+
73+
def kill_a_node(sacrifice, kill_attempts:)
74+
refute_nil(sacrifice, "#{sacrifice.config.host}:#{sacrifice.config.port}")
5875

5976
loop do
6077
break if kill_attempts <= 0
6178

62-
node.call('SHUTDOWN', 'NOSAVE')
79+
sacrifice.call('SHUTDOWN', 'NOSAVE')
6380
rescue ::RedisClient::CommandError => e
6481
raise unless e.message.include?('Errors trying to SHUTDOWN')
6582
rescue ::RedisClient::ConnectionError
6683
break
6784
ensure
6885
kill_attempts -= 1
69-
sleep 3
86+
sleep WAIT_SEC
7087
end
7188

72-
assert_raises(::RedisClient::ConnectionError) { node.call('PING') }
89+
assert_raises(::RedisClient::ConnectionError) { sacrifice.call('PING') }
7390
end
7491

75-
def wait_for_cluster_to_be_ready(wait_attempts: 10)
92+
def wait_for_cluster_to_be_ready(wait_attempts:)
7693
loop do
7794
break if wait_attempts <= 0 || @client.call('PING') == 'PONG'
7895
rescue ::RedisClient::Cluster::NodeMightBeDown
7996
# ignore
8097
ensure
8198
wait_attempts -= 1
82-
sleep 3
99+
sleep WAIT_SEC
83100
end
84101
end
85102

test/test_against_cluster_state.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ def test_the_state_of_cluster_resharding_with_pipelining
6565
private
6666

6767
def wait_for_replication
68-
@client.call('WAIT', TEST_REPLICA_SIZE, (TEST_TIMEOUT_SEC * 1000).to_i)
68+
client_side_timeout = TEST_TIMEOUT_SEC + 1.0
69+
server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i
70+
@client.blocking_call(client_side_timeout, 'WAIT', TEST_REPLICA_SIZE, server_side_timeout)
6971
end
7072

7173
def fetch_cluster_info(key)

0 commit comments

Comments
 (0)