Skip to content

Commit 7a470e8

Browse files
committed
Use WAIT command instead of uncertain sleep when waiting for replication delay
1 parent 35898db commit 7a470e8

File tree

5 files changed

+67
-29
lines changed

5 files changed

+67
-29
lines changed

lib/redis/cluster.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def send_command(command, &block)
132132
@node.call_all(command, &block).first
133133
when 'flushall', 'flushdb'
134134
@node.call_master(command, &block).first
135+
when 'wait' then @node.call_master(command, &block).reduce(:+)
135136
when 'keys' then @node.call_slave(command, &block).flatten.sort
136137
when 'dbsize' then @node.call_slave(command, &block).reduce(:+)
137138
when 'lastsave' then @node.call_all(command, &block).sort

test/cluster_client_transactions_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def test_transaction_with_replicas
4040
100.times { |i| cli.set("{key}#{i}", i) }
4141
end
4242

43-
sleep 0.5
43+
rc1.wait(1, TIMEOUT.to_i * 1000)
4444

4545
100.times { |i| assert_equal i.to_s, rc1.get("{key}#{i}") }
4646
100.times { |i| assert_equal i.to_s, rc2.get("{key}#{i}") }

test/cluster_commands_on_keys_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def test_unlink
115115

116116
def test_wait
117117
set_some_keys
118-
assert_equal 1, redis.wait(1, 0)
118+
assert_equal 3, redis.wait(1, TIMEOUT.to_i * 1000)
119119
end
120120

121121
def test_scan

test/helper.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ def redis_cluster_mock(commands, options = {})
297297
end
298298

299299
def redis_cluster_down
300-
trib = ClusterOrchestrator.new(_default_nodes)
300+
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
301301
trib.down
302302
yield
303303
ensure
@@ -306,7 +306,7 @@ def redis_cluster_down
306306
end
307307

308308
def redis_cluster_failover
309-
trib = ClusterOrchestrator.new(_default_nodes)
309+
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
310310
trib.failover
311311
yield
312312
ensure
@@ -318,7 +318,7 @@ def redis_cluster_failover
318318
# @param src [String] <ip>:<port>
319319
# @param dest [String] <ip>:<port>
320320
def redis_cluster_resharding(slot, src:, dest:)
321-
trib = ClusterOrchestrator.new(_default_nodes)
321+
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
322322
trib.start_resharding(slot, src, dest)
323323
yield
324324
trib.finish_resharding(slot, dest)

test/support/cluster/orchestrator.rb

Lines changed: 61 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
class ClusterOrchestrator
66
SLOT_SIZE = 16384
77

8-
def initialize(node_addrs)
8+
def initialize(node_addrs, timeout: 30.0)
99
raise 'Redis Cluster requires at least 3 master nodes.' if node_addrs.size < 3
10-
timeout_sec = Float(ENV['TIMEOUT'] || 30.0)
11-
@clients = node_addrs.map { |addr| Redis.new(url: addr, timeout: timeout_sec) }
10+
@clients = node_addrs.map { |addr| Redis.new(url: addr, timeout: timeout) }
11+
@timeout = timeout
1212
end
1313

1414
def rebuild
@@ -21,6 +21,8 @@ def rebuild
2121
replicate(@clients)
2222
save_config(@clients)
2323
wait_cluster_building(@clients)
24+
wait_replication(@clients)
25+
wait_cluster_recovering(@clients)
2426
end
2527

2628
def down
@@ -30,8 +32,11 @@ def down
3032

3133
def failover
3234
master, slave = take_replication_pairs(@clients)
35+
wait_replication_delay(@clients, @timeout)
3336
slave.cluster(:failover, :takeover)
3437
wait_failover(to_node_key(master), to_node_key(slave), @clients)
38+
wait_replication_delay(@clients, @timeout)
39+
wait_cluster_recovering(@clients)
3540
end
3641

3742
def start_resharding(slot, src_node_key, dest_node_key)
@@ -117,14 +122,12 @@ def meet_each_other(clients)
117122
end
118123
end
119124

120-
def wait_meeting(clients)
121-
first_cliient = clients.first
122-
size = clients.size
125+
def wait_meeting(clients, max_attempts: 600)
126+
size = clients.size.to_s
123127

124-
loop do
125-
info = hashify_cluster_info(first_cliient)
126-
break if info['cluster_known_nodes'].to_i == size
127-
sleep 0.1
128+
wait_for_state(clients, max_attempts) do |client|
129+
info = hashify_cluster_info(client)
130+
info['cluster_known_nodes'] == size
128131
end
129132
end
130133

@@ -157,27 +160,61 @@ def save_config(clients)
157160
clients.each { |c| c.cluster(:saveconfig) }
158161
end
159162

160-
def wait_cluster_building(clients, max_attempts: 200)
161-
attempt_count = 0
163+
def wait_cluster_building(clients, max_attempts: 600)
164+
wait_for_state(clients, max_attempts) do |client|
165+
info = hashify_cluster_info(client)
166+
info['cluster_state'] == 'ok'
167+
end
168+
end
162169

163-
clients.each do |client|
164-
loop do
165-
info = hashify_cluster_info(client)
166-
attempt_count += 1
167-
break if info['cluster_state'] == 'ok' || attempt_count > max_attempts
168-
sleep 0.1
169-
end
170+
def wait_replication(clients, max_attempts: 600)
171+
wait_for_state(clients, max_attempts) do |client|
172+
flags = hashify_cluster_node_flags(client)
173+
flags.values.select { |f| f == 'slave' }.size == 3
174+
end
175+
end
176+
177+
def wait_failover(master_key, slave_key, clients, max_attempts: 600)
178+
wait_for_state(clients, max_attempts) do |client|
179+
flags = hashify_cluster_node_flags(client)
180+
flags[master_key] == 'slave' && flags[slave_key] == 'master'
170181
end
171182
end
172183

173-
def wait_failover(master_key, slave_key, clients, max_attempts: 200)
174-
attempt_count = 0
184+
def wait_replication_delay(clients, timeout_sec)
185+
timeout_msec = timeout_sec.to_i * 1000
186+
wait_for_state(clients, clients.size + 1) do |client|
187+
client.wait(1, timeout_msec) if client.role.first == 'master'
188+
true
189+
end
190+
end
175191

192+
def wait_cluster_recovering(clients, max_attempts: 600)
193+
key = 0
194+
wait_for_state(clients, max_attempts) do |client|
195+
begin
196+
client.get(key) if client.role.first == 'master'
197+
true
198+
rescue Redis::CommandError => err
199+
if err.message.start_with?('CLUSTERDOWN')
200+
false
201+
elsif err.message.start_with?('MOVED')
202+
key += 1
203+
false
204+
else
205+
true
206+
end
207+
end
208+
end
209+
end
210+
211+
def wait_for_state(clients, max_attempts)
212+
attempt_count = 1
176213
clients.each do |client|
177-
loop do
178-
flags = hashify_cluster_node_flags(client)
214+
attempt_count.step(max_attempts) do |i|
215+
break if i >= max_attempts
179216
attempt_count += 1
180-
break if (flags[master_key] == 'slave' && flags[slave_key] == 'master') || attempt_count > max_attempts
217+
break if yield(client)
181218
sleep 0.1
182219
end
183220
end

0 commit comments

Comments
 (0)