Skip to content

Commit 22cbbb8

Browse files
authored
Add node down test (#39)
1 parent 18d2898 commit 22cbbb8

File tree

11 files changed

+190
-22
lines changed

11 files changed

+190
-22
lines changed

.github/workflows/test.yaml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,35 @@ jobs:
124124
run: bundle exec rake test_cluster_state
125125
- name: Stop containers
126126
run: docker compose -f $DOCKER_COMPOSE_FILE down
127+
cluster-broken:
128+
name: Cluster Broken
129+
timeout-minutes: 10
130+
strategy:
131+
fail-fast: false
132+
matrix:
133+
redis: ['7.0.1', '6.2.7']
134+
runs-on: ubuntu-latest
135+
env:
136+
REDIS_VERSION: ${{ matrix.redis }}
137+
DOCKER_COMPOSE_FILE: 'docker-compose.yaml'
138+
RESTART_POLICY: 'no'
139+
steps:
140+
- name: Check out code
141+
uses: actions/checkout@v3
142+
- name: Set up Ruby
143+
uses: ruby/setup-ruby@v1
144+
with:
145+
ruby-version: '3.1'
146+
bundler-cache: true
147+
- name: Pull Docker images
148+
run: docker pull redis:$REDIS_VERSION
149+
- name: Run containers
150+
run: docker compose -f $DOCKER_COMPOSE_FILE up -d
151+
- name: Wait for Redis cluster to be ready
152+
run: bundle exec rake wait
153+
- name: Print containers
154+
run: docker compose -f $DOCKER_COMPOSE_FILE ps
155+
- name: Run minitest
156+
run: bundle exec rake test_cluster_broken
157+
- name: Stop containers
158+
run: docker compose -f $DOCKER_COMPOSE_FILE down

Rakefile

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ task default: :test
77
Rake::TestTask.new :test do |t|
88
t.libs << :test
99
t.libs << :lib
10-
files = Dir['test/**/test_*.rb'].grep_v(/test_against_cluster_state/)
10+
files = Dir['test/**/test_*.rb'].grep_v(/test_against_cluster_(state|broken)/)
1111
files = ARGV[1..] if ARGV.size > 1
1212
t.test_files = files
1313
t.options = '-v'
@@ -20,6 +20,13 @@ Rake::TestTask.new :test_cluster_state do |t|
2020
t.options = '-v'
2121
end
2222

23+
Rake::TestTask.new :test_cluster_broken do |t|
24+
t.libs << :test
25+
t.libs << :lib
26+
t.test_files = %w[test/test_against_cluster_broken.rb]
27+
t.options = '-v'
28+
end
29+
2330
desc 'Wait for cluster to be ready'
2431
task :wait do
2532
$LOAD_PATH.unshift(File.expand_path('test', __dir__))

docker-compose.replica.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ services:
1111
--cluster-enabled yes
1212
--cluster-config-file nodes.conf
1313
--cluster-node-timeout 5000
14+
restart: "${RESTART_POLICY:-always}"
1415
healthcheck:
1516
test: ["CMD", "redis-cli", "ping"]
1617
interval: "7s"

docker-compose.ssl.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ services:
1818
--tls-ca-cert-file /etc/ssl/private/redis-rb-ca.crt
1919
--tls-cluster yes
2020
--tls-replication yes
21+
restart: "${RESTART_POLICY:-always}"
2122
healthcheck:
2223
test:
2324
- "CMD-SHELL"

docker-compose.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ services:
1010
--cluster-enabled yes
1111
--cluster-config-file nodes.conf
1212
--cluster-node-timeout 5000
13+
restart: "${RESTART_POLICY:-always}"
1314
healthcheck:
1415
test: ["CMD", "redis-cli", "ping"]
1516
interval: "7s"

lib/redis_client/cluster.rb

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ def fetch_cluster_info!(config, pool: nil, **kwargs)
182182
def send_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
183183
cmd = command.first.to_s.downcase
184184
case cmd
185-
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
185+
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save', 'ping'
186186
@node.call_all(method, *command, **kwargs, &block).first
187187
when 'flushall', 'flushdb'
188188
@node.call_primary(method, *command, **kwargs, &block).first
@@ -206,12 +206,15 @@ def send_command(method, *command, **kwargs, &block) # rubocop:disable Metrics/A
206206
node = assign_node(*command)
207207
try_send(node, method, *command, **kwargs, &block)
208208
end
209+
rescue RedisClient::Cluster::CommandErrorCollection => e
210+
update_cluster_info! if e.errors.values.map(&:class).any?(::RedisClient::ConnectionError)
211+
raise
209212
end
210213

211214
def send_wait_command(method, *command, retry_count: 3, **kwargs, &block)
212215
@node.call_primary(method, *command, **kwargs, &block).sum
213216
rescue RedisClient::Cluster::CommandErrorCollection => e
214-
raise if e.errors.values.map(&:message).grep(/ERR WAIT cannot be used with replica instances/).size.zero?
217+
raise if retry_count <= 0 || e.errors.values.map(&:message).grep(/ERR WAIT cannot be used with replica instances/).empty?
215218

216219
update_cluster_info!
217220
retry_count -= 1
@@ -277,18 +280,16 @@ def send_pubsub_command(method, *command, **kwargs, &block) # rubocop:disable Me
277280

278281
# @see https://redis.io/topics/cluster-spec#redirection-and-resharding
279282
# Redirection and resharding
280-
def try_send(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
283+
def try_send(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
281284
node.send(method, *args, **kwargs, &block)
282285
rescue ::RedisClient::CommandError => e
283-
if e.message.start_with?(REPLY_MOVED)
284-
raise if retry_count <= 0
286+
raise if retry_count <= 0
285287

288+
if e.message.start_with?(REPLY_MOVED)
286289
node = assign_redirection_node(e.message)
287290
retry_count -= 1
288291
retry
289292
elsif e.message.start_with?(REPLY_ASK)
290-
raise if retry_count <= 0
291-
292293
node = assign_asking_node(e.message)
293294
node.call(CMD_ASKING)
294295
retry_count -= 1
@@ -297,8 +298,11 @@ def try_send(node, method, *args, retry_count: 3, **kwargs, &block) # rubocop:di
297298
raise
298299
end
299300
rescue ::RedisClient::ConnectionError
301+
raise if retry_count <= 0
302+
300303
update_cluster_info!
301-
raise
304+
retry_count -= 1
305+
retry
302306
end
303307

304308
def _scan(*command, **kwargs) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
@@ -358,13 +362,16 @@ def find_node_key(*command, primary_only: false) # rubocop:disable Metrics/Metho
358362
end
359363
end
360364

361-
def find_node(node_key)
365+
def find_node(node_key, retry_count: 3)
362366
return @node.sample if node_key.nil?
363367

364368
@node.find_by(node_key)
365369
rescue ::RedisClient::Cluster::Node::ReloadNeeded
370+
raise(::RedisClient::ConnectionError, 'unstable cluster state') if retry_count <= 0
371+
366372
update_cluster_info!(node_key)
367-
@node.find_by(node_key)
373+
retry_count -= 1
374+
retry
368375
end
369376

370377
def update_cluster_info!(node_key = nil)

lib/redis_client/cluster/node.rb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,12 @@ def update_slot(slot, node_key)
171171
@mutex.synchronize { @slots[slot] = node_key }
172172
end
173173

174+
def replicated?(primary_node_key, replica_node_key)
175+
return false if @replications.nil? || @replications.size.zero?
176+
177+
@replications.fetch(primary_node_key).include?(replica_node_key)
178+
end
179+
174180
private
175181

176182
def replica_disabled?
@@ -182,7 +188,9 @@ def primary?(node_key)
182188
end
183189

184190
def replica?(node_key)
185-
!(@replications.nil? || @replications.size.zero?) && !@replications.key?(node_key)
191+
return false if @replications.nil? || @replications.size.zero?
192+
193+
!@replications.key?(node_key)
186194
end
187195

188196
def build_slot_node_mappings(node_info)
@@ -196,11 +204,12 @@ def build_slot_node_mappings(node_info)
196204
slots
197205
end
198206

199-
def build_replication_mappings(node_info)
207+
def build_replication_mappings(node_info) # rubocop:disable Metrics/AbcSize
200208
dict = node_info.to_h { |info| [info[:id], info] }
201209
node_info.each_with_object(Hash.new { |h, k| h[k] = [] }) do |info, acc|
202210
primary_info = dict[info[:primary_id]]
203211
acc[primary_info[:node_key]] << info[:node_key] unless primary_info.nil?
212+
acc[info[:node_key]] if info[:role] == 'master' # for the primary which have no replicas
204213
end
205214
end
206215

test/cluster_controller.rb

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ def wait_meeting(clients, max_attempts:)
140140
wait_for_state(clients, max_attempts: max_attempts) do |client|
141141
info = hashify_cluster_info(client)
142142
info['cluster_known_nodes'].to_s == clients.size.to_s
143+
rescue ::RedisClient::ConnectionError
144+
true
143145
end
144146
end
145147

@@ -175,20 +177,26 @@ def wait_cluster_building(clients, max_attempts:)
175177
wait_for_state(clients, max_attempts: max_attempts) do |client|
176178
info = hashify_cluster_info(client)
177179
info['cluster_state'] == 'ok'
180+
rescue ::RedisClient::ConnectionError
181+
true
178182
end
179183
end
180184

181185
def wait_replication(clients, number_of_replicas:, max_attempts:)
182186
wait_for_state(clients, max_attempts: max_attempts) do |client|
183187
flags = hashify_cluster_node_flags(clients, client: client)
184188
flags.values.count { |f| f == 'slave' } == number_of_replicas
189+
rescue ::RedisClient::ConnectionError
190+
true
185191
end
186192
end
187193

188194
def wait_failover(clients, master_key:, slave_key:, max_attempts:)
189195
wait_for_state(clients, max_attempts: max_attempts) do |client|
190196
flags = hashify_cluster_node_flags(clients, client: client)
191197
flags[master_key] == 'slave' && flags[slave_key] == 'master'
198+
rescue ::RedisClient::ConnectionError
199+
true
192200
end
193201
end
194202

@@ -197,6 +205,8 @@ def wait_replication_delay(clients, timeout:)
197205
wait_for_state(clients, max_attempts: clients.size + 1) do |client|
198206
client.blocking_call(timeout, 'WAIT', @replica_size, timeout_msec - 100) if client.call('ROLE').first == 'master'
199207
true
208+
rescue ::RedisClient::ConnectionError
209+
true
200210
end
201211
end
202212

@@ -214,6 +224,8 @@ def wait_cluster_recovering(clients, max_attempts:)
214224
else
215225
true
216226
end
227+
rescue ::RedisClient::ConnectionError
228+
true
217229
end
218230
end
219231

@@ -241,9 +253,13 @@ def hashify_cluster_node_flags(clients, client: nil)
241253
.to_h { |arr| [id2key[arr[0]], (arr[2].split(',') & %w[master slave]).first] }
242254
end
243255

244-
def hashify_node_map(clients, client: nil)
256+
def hashify_node_map(clients)
245257
id2key = fetch_internal_id_to_node_key_mappings(clients)
246-
fetch_cluster_nodes(client || clients.first).to_h { |arr| [id2key[arr[0]], arr[0]] }
258+
clients.each do |client|
259+
return fetch_cluster_nodes(client).to_h { |arr| [id2key[arr[0]], arr[0]] }
260+
rescue ::RedisClient::ConnectionError
261+
next
262+
end
247263
end
248264

249265
def fetch_internal_id_by_natted_node_key(client, node_key)

test/redis_client/cluster/test_node.rb

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,13 @@ def test_update_slot
283283
assert_equal(another_node_key, @test_node.find_node_key_of_primary(sample_slot))
284284
end
285285

286+
def test_replicated?
287+
@test_node_info.select { |info| info[:role] == 'slave' }.each do |replica_info|
288+
primary_info = @test_node_info.find { |info| info[:id] == replica_info[:primary_id] }
289+
assert(@test_node.replicated?(primary_info[:node_key], replica_info[:node_key]))
290+
end
291+
end
292+
286293
def test_replica_disabled?
287294
assert(@test_node.send(:replica_disabled?))
288295
refute(@test_node_with_scale_read.send(:replica_disabled?))
@@ -331,6 +338,7 @@ def test_build_replication_mappings
331338
node_key7 = '127.0.0.1:7007'
332339
node_key8 = '127.0.0.1:7008'
333340
node_key9 = '127.0.0.1:7009'
341+
334342
node_info = [
335343
{ id: '1', node_key: node_key1, primary_id: '-' },
336344
{ id: '2', node_key: node_key2, primary_id: '-' },
@@ -344,12 +352,26 @@ def test_build_replication_mappings
344352
]
345353
got = @test_node.send(:build_replication_mappings, node_info)
346354
got.transform_values!(&:sort!)
347-
assert_same(node_key4, got[node_key1][0])
348-
assert_same(node_key7, got[node_key1][1])
349-
assert_same(node_key5, got[node_key2][0])
350-
assert_same(node_key8, got[node_key2][1])
351-
assert_same(node_key6, got[node_key3][0])
352-
assert_same(node_key9, got[node_key3][1])
355+
assert_same(node_key4, got[node_key1][0], 'Case: regular')
356+
assert_same(node_key7, got[node_key1][1], 'Case: regular')
357+
assert_same(node_key5, got[node_key2][0], 'Case: regular')
358+
assert_same(node_key8, got[node_key2][1], 'Case: regular')
359+
assert_same(node_key6, got[node_key3][0], 'Case: regular')
360+
assert_same(node_key9, got[node_key3][1], 'Case: regular')
361+
362+
node_info = [
363+
{ id: '1', role: 'master', node_key: node_key1, primary_id: '-' },
364+
{ id: '3', role: 'master', node_key: node_key3, primary_id: '-' },
365+
{ id: '4', role: 'slave', node_key: node_key4, primary_id: '1' },
366+
{ id: '5', role: 'master', node_key: node_key5, primary_id: '-' },
367+
{ id: '6', role: 'slave', node_key: node_key6, primary_id: '3' }
368+
]
369+
got = @test_node.send(:build_replication_mappings, node_info)
370+
got.transform_values!(&:sort!)
371+
assert_equal(3, got.size, 'Case: lack of replica')
372+
assert_same(node_key4, got[node_key1][0], 'Case: lack of replica')
373+
assert_same(node_key6, got[node_key3][0], 'Case: lack of replica')
374+
assert_empty(got[node_key5], 'Case: lack of replica')
353375
end
354376

355377
def test_build_clients # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

test/test_against_cluster_broken.rb

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# frozen_string_literal: true
2+
3+
require 'testing_helper'
4+
5+
class TestAgainstClusterBroken < TestingWrapper
6+
def setup
7+
@client = new_test_client
8+
@controller = new_cluster_controller
9+
end
10+
11+
def teardown
12+
@client.close
13+
@controller.close
14+
end
15+
16+
def test_several_nodes_are_down # rubocop:disable Metrics/CyclomaticComplexity
17+
skip('TODO: https://github.com/redis-rb/redis-cluster-client/issues/9')
18+
19+
node = @client.instance_variable_get(:@node)
20+
primary_node_key = node.primary_node_keys.sample
21+
replica_node_key = node.replica_node_keys.reject { |k| node.replicated?(primary_node_key, k) }.sample
22+
23+
msgs = []
24+
msgs += @client.call('CLUSTER', 'NODES').split("\n")
25+
msgs += ["Primary: #{primary_node_key}, Replica: #{replica_node_key}"]
26+
27+
node.instance_variable_get(:@clients).each do |node_key, raw_cli|
28+
next if node_key != primary_node_key && node_key != replica_node_key
29+
30+
# @see https://github.com/redis/redis/blob/475563e2e941ebbdb83f50474bf2daa5ae276fcf/src/debug.c#L387-L493
31+
raw_cli.call('SHUTDOWN')
32+
end
33+
34+
@controller.wait_for_cluster_to_be_ready
35+
36+
errors = []
37+
begin
38+
@client.call_once('PING')
39+
rescue ::RedisClient::Error => e
40+
errors << e
41+
end
42+
43+
msgs += @client.call('CLUSTER', 'NODES').split("\n")
44+
msgs += errors.group_by(&:message).map do |k, v|
45+
"#{v.first.class.name}: #{k}\n#{v.first.backtrace.take(10).join("\n")}"
46+
end
47+
msg = msgs.join("\n")
48+
49+
assert_equal([1], @client.instance_variable_get(:@node).node_keys, msg)
50+
assert_equal(0, errors.size, msg)
51+
end
52+
53+
private
54+
55+
def new_test_client
56+
config = ::RedisClient::ClusterConfig.new(
57+
nodes: TEST_NODE_URIS,
58+
replica: true,
59+
fixed_hostname: TEST_FIXED_HOSTNAME,
60+
**TEST_GENERIC_OPTIONS
61+
)
62+
::RedisClient::Cluster.new(config)
63+
end
64+
65+
def new_cluster_controller
66+
@controller = ClusterController.new(
67+
TEST_NODE_URIS,
68+
replica_size: TEST_REPLICA_SIZE,
69+
**TEST_GENERIC_OPTIONS.merge(timeout: 30.0)
70+
)
71+
end
72+
end

0 commit comments

Comments
 (0)