Skip to content

Commit 8662a21

Browse files
authored
Fix node down test cases (#46)
1 parent cb02245 commit 8662a21

File tree

11 files changed

+189
-124
lines changed

11 files changed

+189
-124
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ RedisClient.cluster(timeout: 3.0).new_client
3939
```
4040

4141
```ruby
42-
# To connect with subset nodes for startup
42+
# To connect with a subset of nodes for startup
4343
RedisClient.cluster(nodes: %w[redis://node1:6379 redis://node2:6379]).new_client
4444
```
4545

@@ -70,7 +70,7 @@ The other methods are not implemented because the client cannot operate with clu
7070
`#pipelined` method splits and sends commands to each node and aggregates replies.
7171

7272
## Multiple keys and CROSSSLOT error
73-
A part of commands can be passed multiple keys. But it has a constraint the keys are in the same hash slot.
73+
A subset of commands can be passed multiple keys, but there is a constraint: all of the keys must be in the same hash slot.
7474
The following error occurs because keys must be in the same hash slot and not just the same node.
7575

7676
```ruby

lib/redis_client/cluster.rb

Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,10 @@ def pubsub
170170
end
171171

172172
def close
173-
@node.each(&:close)
173+
@node.call_all(:close)
174174
nil
175+
rescue StandardError
176+
# ignore
175177
end
176178

177179
private
@@ -187,10 +189,11 @@ def fetch_cluster_info!(config, pool: nil, **kwargs)
187189
def send_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
188190
cmd = command.first.to_s.downcase
189191
case cmd
190-
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save', 'ping'
192+
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
191193
@node.call_all(method, *command, **kwargs, &block).first
192194
when 'flushall', 'flushdb'
193195
@node.call_primaries(method, *command, **kwargs, &block).first
196+
when 'ping' then @node.send_ping(method, *command, **kwargs, &block).first
194197
when 'wait' then send_wait_command(method, *command, **kwargs, &block)
195198
when 'keys' then @node.call_replicas(method, *command, **kwargs, &block).flatten.sort
196199
when 'dbsize' then @node.call_replicas(method, *command, **kwargs, &block).sum
@@ -211,15 +214,18 @@ def send_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/A
211214
node = assign_node(*command)
212215
try_send(node, method, *command, **kwargs, &block)
213216
end
214-
rescue RedisClient::Cluster::ErrorCollection => e
215-
update_cluster_info! if e.errors.values.map(&:class).any?(::RedisClient::ConnectionError)
216-
raise
217+
rescue RedisClient::Cluster::Node::ReloadNeeded
218+
update_cluster_info!
219+
raise ::RedisClient::Cluster::NodeMightBeDown
217220
end
218221

219222
def send_wait_command(method, *command, retry_count: 3, **kwargs, &block)
220223
@node.call_primaries(method, *command, **kwargs, &block).sum
221224
rescue RedisClient::Cluster::ErrorCollection => e
222-
raise if retry_count <= 0 || e.errors.values.map(&:message).grep(/ERR WAIT cannot be used with replica instances/).empty?
225+
raise if retry_count <= 0
226+
raise if e.errors.values.none? do |err|
227+
err.message.include?('WAIT cannot be used with replica instances')
228+
end
223229

224230
update_cluster_info!
225231
retry_count -= 1
@@ -353,44 +359,30 @@ def assign_node(*command)
353359
find_node(node_key)
354360
end
355361

356-
def find_node_key(*command, primary_only: false) # rubocop:disable Metrics/MethodLength
362+
def find_node_key(*command, primary_only: false)
357363
key = @command.extract_first_key(command)
358-
if key.empty?
359-
return @node.primary_node_keys.sample if @command.should_send_to_primary?(command) || primary_only
360-
361-
return @node.replica_node_keys.sample
362-
end
363-
364-
slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
365-
return unless @node.slot_exists?(slot)
364+
slot = key.empty? ? nil : ::RedisClient::Cluster::KeySlotConverter.convert(key)
366365

367366
if @command.should_send_to_primary?(command) || primary_only
368-
@node.find_node_key_of_primary(slot)
367+
@node.find_node_key_of_primary(slot) || @node.primary_node_keys.sample
369368
else
370-
@node.find_node_key_of_replica(slot)
369+
@node.find_node_key_of_replica(slot) || @node.replica_node_keys.sample
371370
end
372371
end
373372

374373
def find_node(node_key, retry_count: 3)
375-
return @node.sample if node_key.nil?
376-
377374
@node.find_by(node_key)
378375
rescue ::RedisClient::Cluster::Node::ReloadNeeded
379-
raise(::RedisClient::ConnectionError, 'unstable cluster state') if retry_count <= 0
376+
raise ::RedisClient::Cluster::NodeMightBeDown if retry_count <= 0 # retries exhausted: give up instead of reloading again
380377

381-
update_cluster_info!(node_key)
378+
update_cluster_info!
382379
retry_count -= 1
383380
retry
384381
end
385382

386-
def update_cluster_info!(node_key = nil)
383+
def update_cluster_info!
387384
@mutex.synchronize do
388-
unless node_key.nil?
389-
host, port = ::RedisClient::Cluster::NodeKey.split(node_key)
390-
@config.add_node(host, port)
391-
end
392-
393-
@node.each(&:close)
385+
close
394386
@node = fetch_cluster_info!(@config, pool: @pool, **@client_kwargs)
395387
end
396388
end

lib/redis_client/cluster/command.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,14 @@ def dig_details(command, key)
6868
@details.fetch(name).fetch(key)
6969
end
7070

71-
def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticComplexity
71+
def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/MethodLength
7272
case command&.flatten&.first.to_s.downcase
73-
when 'eval', 'evalsha', 'migrate', 'zinterstore', 'zunionstore' then 3
73+
when 'eval', 'evalsha', 'zinterstore', 'zunionstore' then 3
7474
when 'object' then 2
7575
when 'memory'
7676
command[1].to_s.casecmp('usage').zero? ? 2 : 0
77+
when 'migrate'
78+
['', '""'].include?(command[3].to_s) ? determine_optional_key_position(command, 'keys') : 3 # empty key placeholder ('' or a literal "") => look for keys after the KEYS option
7779
when 'xread', 'xreadgroup'
7880
determine_optional_key_position(command, 'streams')
7981
else

lib/redis_client/cluster/errors.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,15 @@ def initialize(command)
5151
super("Cluster client doesn't know which node the #{command} command should be sent to.")
5252
end
5353
end
54+
55+
class NodeMightBeDown < ::RedisClient::Error
56+
def initialize(_ = '')
57+
super(
58+
'The client is trying to fetch the latest cluster state '\
59+
'because a subset of nodes might be down. '\
60+
'It might continue to raise errors for a while.'
61+
)
62+
end
63+
end
5464
end
5565
end

lib/redis_client/cluster/key_slot_converter.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ module KeySlotConverter
4343
module_function
4444

4545
def convert(key)
46+
return nil if key.nil?
47+
4648
crc = 0
4749
key.each_byte do |b|
4850
crc = ((crc << 8) & 0xffff) ^ XMODEM_CRC16_LOOKUP[((crc >> 8) ^ b) & 0xff]

lib/redis_client/cluster/node.rb

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class Node
1212
SLOT_SIZE = 16_384
1313
MIN_SLOT = 0
1414
MAX_SLOT = SLOT_SIZE - 1
15+
MAX_STARTUP_SAMPLE = 37
1516
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
1617

1718
ReloadNeeded = Class.new(::RedisClient::Error)
@@ -32,19 +33,33 @@ def build_connection_prelude
3233
end
3334

3435
class << self
35-
def load_info(options, **kwargs)
36-
startup_nodes = ::RedisClient::Cluster::Node.new(options, **kwargs)
37-
38-
errors = startup_nodes.map do |n|
39-
reply = n.call('CLUSTER', 'NODES')
40-
return parse_node_info(reply)
41-
rescue ::RedisClient::ConnectionError, ::RedisClient::CommandError => e
42-
e
36+
def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
37+
startup_size = options.size > MAX_STARTUP_SAMPLE ? MAX_STARTUP_SAMPLE : options.size
38+
node_info_list = Array.new(startup_size)
39+
errors = Array.new(startup_size)
40+
startup_options = options.to_a.sample(MAX_STARTUP_SAMPLE).to_h
41+
startup_nodes = ::RedisClient::Cluster::Node.new(startup_options, **kwargs)
42+
threads = startup_nodes.each_with_index.map do |raw_client, idx|
43+
Thread.new(raw_client, idx) do |cli, i|
44+
Thread.pass
45+
reply = cli.call('CLUSTER', 'NODES')
46+
node_info_list[i] = parse_node_info(reply)
47+
rescue StandardError => e
48+
errors[i] = e
49+
ensure
50+
cli&.close
51+
end
52+
end
53+
threads.each(&:join)
54+
raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.all?(&:nil?)
55+
56+
grouped = node_info_list.compact.group_by do |rows|
57+
rows.sort_by { |row| row[:id] }
58+
.map { |r| "#{r[:id]}#{r[:node_key]}#{r[:role]}#{r[:primary_id]}#{r[:config_epoch]}" }
59+
.join
4360
end
4461

45-
raise ::RedisClient::Cluster::InitialSetupError, errors
46-
ensure
47-
startup_nodes&.each(&:close)
62+
grouped.max_by { |_, v| v.size }[1].first
4863
end
4964

5065
private
@@ -109,32 +124,58 @@ def replica_node_keys
109124
end
110125

111126
def find_by(node_key)
127+
raise ReloadNeeded if node_key.nil? || !@clients.key?(node_key)
128+
112129
@clients.fetch(node_key)
113-
rescue KeyError
114-
raise ReloadNeeded
115130
end
116131

117132
def call_all(method, *command, **kwargs, &block)
118-
try_map { |_, client| client.send(method, *command, **kwargs, &block) }.values
133+
results, errors = try_map do |_, client|
134+
client.send(method, *command, **kwargs, &block)
135+
end
136+
137+
return results.values if errors.empty?
138+
139+
raise ::RedisClient::Cluster::ErrorCollection, errors
119140
end
120141

121142
def call_primaries(method, *command, **kwargs, &block)
122-
try_map do |node_key, client|
143+
results, errors = try_map do |node_key, client|
123144
next if replica?(node_key)
124145

125146
client.send(method, *command, **kwargs, &block)
126-
end.values
147+
end
148+
149+
return results.values if errors.empty?
150+
151+
raise ::RedisClient::Cluster::ErrorCollection, errors
127152
end
128153

129154
def call_replicas(method, *command, **kwargs, &block)
130155
return call_primaries(method, *command, **kwargs, &block) if replica_disabled?
131156

132157
replica_node_keys = @replications.values.map(&:sample)
133-
try_map do |node_key, client|
158+
results, errors = try_map do |node_key, client|
134159
next if primary?(node_key) || !replica_node_keys.include?(node_key)
135160

136161
client.send(method, *command, **kwargs, &block)
137-
end.values
162+
end
163+
164+
return results.values if errors.empty?
165+
166+
raise ::RedisClient::Cluster::ErrorCollection, errors
167+
end
168+
169+
def send_ping(method, *command, **kwargs, &block)
170+
results, errors = try_map do |_, client|
171+
client.send(method, *command, **kwargs, &block)
172+
end
173+
174+
return results.values if errors.empty?
175+
176+
raise ReloadNeeded if errors.values.any?(::RedisClient::ConnectionError)
177+
178+
raise ::RedisClient::Cluster::ErrorCollection, errors
138179
end
139180

140181
def scale_reading_clients
@@ -144,21 +185,18 @@ def scale_reading_clients
144185
end
145186
end
146187

147-
def slot_exists?(slot)
148-
slot = Integer(slot)
149-
return false if slot < MIN_SLOT || slot > MAX_SLOT
150-
151-
!@slots[slot].nil?
152-
end
153-
154188
def find_node_key_of_primary(slot)
189+
return if slot.nil?
190+
155191
slot = Integer(slot)
156192
return if slot < MIN_SLOT || slot > MAX_SLOT
157193

158194
@slots[slot]
159195
end
160196

161197
def find_node_key_of_replica(slot)
198+
return if slot.nil?
199+
162200
slot = Integer(slot)
163201
return if slot < MIN_SLOT || slot > MAX_SLOT
164202

@@ -241,9 +279,7 @@ def try_map # rubocop:disable Metrics/MethodLength
241279
end
242280

243281
threads.each(&:join)
244-
return results if errors.empty?
245-
246-
raise ::RedisClient::Cluster::ErrorCollection, errors
282+
[results, errors]
247283
end
248284
end
249285
end

test/redis_client/cluster/test_command.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ def test_determine_first_key_position
176176
{ command: [['EVAL'], '"return ARGV[1]"', 0, 'hello'], want: 3 },
177177
{ command: %w[EVALSHA sha1 2 foo bar baz zap], want: 3 },
178178
{ command: %w[MIGRATE host port key 0 5 COPY], want: 3 },
179+
{ command: %w[MIGRATE host port "" 0 5 COPY KEYS key], want: 8 },
179180
{ command: %w[ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: 3 },
180181
{ command: %w[ZUNIONSTORE out 2 zset1 zset2 WEIGHTS 2 3], want: 3 },
181182
{ command: %w[OBJECT HELP], want: 2 },

test/redis_client/cluster/test_key_slot_converter.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@ def test_convert
1919
got = ::RedisClient::Cluster::KeySlotConverter.convert(key)
2020
assert_equal(want, got, "Case: #{idx}")
2121
end
22+
23+
assert_nil(::RedisClient::Cluster::KeySlotConverter.convert(nil), 'Case: nil')
24+
25+
multi_byte_key = 'あいうえお'
26+
want = @raw_clients.first.call('CLUSTER', 'KEYSLOT', multi_byte_key)
27+
got = ::RedisClient::Cluster::KeySlotConverter.convert(multi_byte_key)
28+
assert_equal(want, got, "Case: #{multi_byte_key}")
2229
end
2330
end
2431
end

test/redis_client/cluster/test_node.rb

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -246,19 +246,12 @@ def test_scale_reading_clients # rubocop:disable Metrics/CyclomaticComplexity
246246
got.each { |e| assert_includes(want, e, 'Case: scale read') }
247247
end
248248

249-
def test_slot_exists?
250-
refute(@test_node.slot_exists?(-1))
251-
assert(@test_node.slot_exists?(0))
252-
assert(@test_node.slot_exists?(16_383))
253-
refute(@test_node.slot_exists?(16_384))
254-
assert_raises(TypeError) { @test_node.slot_exists?(:foo) }
255-
end
256-
257249
def test_find_node_key_of_primary
258250
sample_node = @test_node_info.find { |info| info[:role] == 'master' }
259251
sample_slot = sample_node[:slots].first.first
260252
got = @test_node.find_node_key_of_primary(sample_slot)
261-
assert_equal(sample_node[:node_key], got)
253+
assert_equal(sample_node[:node_key], got, 'Case: sample slot')
254+
assert_nil(@test_node.find_node_key_of_primary(nil), 'Case: nil')
262255
end
263256

264257
def test_find_node_key_of_replica
@@ -273,6 +266,8 @@ def test_find_node_key_of_replica
273266
got = @test_node_with_scale_read.find_node_key_of_replica(sample_slot)
274267
want = sample_replicas.map { |info| info[:node_key] }
275268
assert_includes(want, got, 'Case: scale read')
269+
270+
assert_nil(@test_node.find_node_key_of_replica(nil), 'Case: nil')
276271
end
277272

278273
def test_update_slot
@@ -407,15 +402,15 @@ def test_build_clients # rubocop:disable Metrics/CyclomaticComplexity, Metrics/P
407402
def test_try_map
408403
primary_node_keys = @test_node_info.select { |info| info[:role] == 'master' }.map { |info| info[:node_key] }
409404
[
410-
{ block: ->(_, client) { client.call('PING') }, want: primary_node_keys.to_h { |k| [k, 'PONG'] } },
411-
{ block: ->(_, client) { client.call('UNKNOWN') }, error: ::RedisClient::Cluster::ErrorCollection }
405+
{ block: ->(_, client) { client.call('PING') }, results: primary_node_keys.to_h { |k| [k, 'PONG'] } },
406+
{ block: ->(_, client) { client.call('UNKNOWN') }, errors: ::RedisClient::CommandError }
412407
].each_with_index do |c, idx|
413408
msg = "Case: #{idx}"
414-
got = -> { @test_node.send(:try_map, &c[:block]) }
415-
if c.key?(:error)
416-
assert_raises(c[:error], msg, &got)
409+
results, errors = @test_node.send(:try_map, &c[:block])
410+
if c.key?(:errors)
411+
errors.each_value { |e| assert_instance_of(c[:errors], e, msg) }
417412
else
418-
assert_equal(c[:want], got.call, msg)
413+
assert_equal(c[:results], results, msg)
419414
end
420415
end
421416
end

0 commit comments

Comments
 (0)