Skip to content

Commit 570fcfa

Browse files
authored
Add some test cases (#11)
1 parent ed7c2a3 commit 570fcfa

File tree

6 files changed

+380
-6
lines changed

6 files changed

+380
-6
lines changed

.github/workflows/test.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ jobs:
5151
- name: Docker compose up
5252
run: docker compose up -d
5353
- name: Wait for Redis cluster to be ready
54+
run: bundle exec rake wait
55+
- name: Print containers
5456
run: docker compose ps
5557
- name: Run minitest
5658
run: bundle exec rake test

Rakefile

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,17 @@ require 'rake/testtask'
44

55
task default: :test
66

7-
desc 'execute all test'
87
Rake::TestTask.new :test do |t|
98
t.libs << :test
109
t.libs << :lib
1110
t.test_files = Dir['test/**/test_*.rb']
1211
end
12+
13+
desc 'Wait for cluster to be ready'
14+
task :wait do
15+
$LOAD_PATH.unshift(File.expand_path('test', __dir__))
16+
require 'redis_client/cluster/controller'
17+
nodes = (7000..7005).map { |port| "redis://127.0.0.1:#{port}" }
18+
ctrl = ::RedisClient::Cluster::Controller.new(nodes)
19+
ctrl.wait_for_cluster_to_be_ready
20+
end

lib/redis_client/cluster/command.rb

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ class Command
99
class << self
1010
def load(nodes)
1111
errors = nodes.map do |node|
12-
details = fetch_command_details(node)
12+
reply = node.call('COMMAND')
13+
details = parse_command_details(reply)
1314
return ::RedisClient::Cluster::Command.new(details)
1415
rescue ::RedisClient::ConnectionError, ::RedisClient::CommandError => e
1516
e
@@ -20,9 +21,9 @@ def load(nodes)
2021

2122
private
2223

23-
def fetch_command_details(node)
24-
node.call('COMMAND').to_h do |reply|
25-
[reply[0], { arity: reply[1], flags: reply[2], first: reply[3], last: reply[4], step: reply[5] }]
24+
def parse_command_details(rows)
25+
rows.to_h do |row|
26+
[row[0], { arity: row[1], flags: row[2], first: row[3], last: row[4], step: row[5] }]
2627
end
2728
end
2829
end
@@ -51,7 +52,7 @@ def should_send_to_replica?(command)
5152
private
5253

5354
def pick_details(details)
54-
details.transform_values do |detail|
55+
(details || {}).transform_values do |detail|
5556
{
5657
first_key_position: detail[:first],
5758
write: detail[:flags].include?('write'),
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
# frozen_string_literal: true
2+
3+
require 'redis_client'
4+
5+
class RedisClient
6+
class Cluster
7+
class Controller
8+
SLOT_SIZE = 16_384
9+
10+
def initialize(node_addrs, timeout: 30.0, reconnect_attempts: 10)
11+
raise 'Redis Cluster requires at least 3 master nodes.' if node_addrs.size < 3
12+
13+
@clients = node_addrs.map do |addr|
14+
RedisClient.new(url: addr, timeout: timeout, reconnect_attempts: reconnect_attempts)
15+
end
16+
17+
@timeout = timeout
18+
end
19+
20+
def wait_for_cluster_to_be_ready
21+
wait_meeting(@clients)
22+
wait_cluster_building(@clients)
23+
wait_replication(@clients)
24+
wait_cluster_recovering(@clients)
25+
end
26+
27+
def rebuild
28+
flush_all_data(@clients)
29+
reset_cluster(@clients)
30+
assign_slots(@clients)
31+
save_config_epoch(@clients)
32+
meet_each_other(@clients)
33+
wait_meeting(@clients)
34+
replicate(@clients)
35+
save_config(@clients)
36+
wait_cluster_building(@clients)
37+
wait_replication(@clients)
38+
wait_cluster_recovering(@clients)
39+
end
40+
41+
def down
42+
flush_all_data(@clients)
43+
reset_cluster(@clients)
44+
end
45+
46+
def fail_serving_master
47+
master, slave = take_replication_pairs(@clients)
48+
master.shutdown
49+
attempt_count = 1
50+
max_attempts = 500
51+
attempt_count.step(max_attempts) do |i|
52+
break if slave.role == 'master' || i >= max_attempts
53+
54+
attempt_count += 1
55+
sleep 0.1
56+
end
57+
end
58+
59+
def failover
60+
master, slave = take_replication_pairs(@clients)
61+
wait_replication_delay(@clients, @timeout)
62+
slave.cluster(:failover, :takeover)
63+
wait_failover(to_node_key(master), to_node_key(slave), @clients)
64+
wait_replication_delay(@clients, @timeout)
65+
wait_cluster_recovering(@clients)
66+
end
67+
68+
def start_resharding(slot, src_node_key, dest_node_key, slice_size: 10)
69+
node_map = hashify_node_map(@clients.first)
70+
src_node_id = node_map.fetch(src_node_key)
71+
src_client = find_client(@clients, src_node_key)
72+
dest_node_id = node_map.fetch(dest_node_key)
73+
dest_client = find_client(@clients, dest_node_key)
74+
dest_host, dest_port = dest_node_key.split(':')
75+
76+
dest_client.call('CLUSTER', 'SETSLOT', slot, 'IMPORTING', src_node_id)
77+
src_client.call('CLUSTER', 'SETSLOT', slot, 'MIGRATING', dest_node_id)
78+
79+
keys_count = src_client.call('CLUSTER', 'COUNTKEYSINSLOT', slot)
80+
loop do
81+
break if keys_count <= 0
82+
83+
keys = src_client.call('CLUSTER', 'GETKEYSINSLOT', slot, slice_size)
84+
break if keys.empty?
85+
86+
keys.each do |k|
87+
src_client.call('MIGRATE', dest_host, dest_port, k)
88+
rescue ::RedisClient::CommandError => e
89+
raise unless e.message.start_with?('IOERR')
90+
91+
src_client.call('MIGRATE', dest_host, dest_port, k, 'REPLACE') # retry once
92+
ensure
93+
keys_count -= 1
94+
end
95+
end
96+
end
97+
98+
def finish_resharding(slot, dest_node_key)
99+
node_map = hashify_node_map(@clients.first)
100+
@clients.first.call('CLUSTER', 'SETSLOT', slot, 'NODE', node_map.fetch(dest_node_key))
101+
end
102+
103+
def close
104+
@clients.each(&:close)
105+
end
106+
107+
private
108+
109+
def flush_all_data(clients)
110+
clients.each do |c|
111+
c.flushall
112+
rescue ::RedisClient::CommandError
113+
# READONLY You can't write against a read only slave.
114+
nil
115+
end
116+
end
117+
118+
def reset_cluster(clients)
119+
clients.each { |c| c.cluster(:reset) }
120+
end
121+
122+
def assign_slots(clients)
123+
masters = take_masters(clients)
124+
slot_slice = SLOT_SIZE / masters.size
125+
mod = SLOT_SIZE % masters.size
126+
slot_sizes = Array.new(masters.size, slot_slice)
127+
mod.downto(1) { |i| slot_sizes[i] += 1 }
128+
129+
slot_idx = 0
130+
masters.zip(slot_sizes).each do |c, s|
131+
slot_range = slot_idx..slot_idx + s - 1
132+
c.cluster(:addslots, *slot_range.to_a)
133+
slot_idx += s
134+
end
135+
end
136+
137+
def save_config_epoch(clients)
138+
clients.each_with_index do |c, i|
139+
c.cluster('set-config-epoch', i + 1)
140+
rescue ::RedisClient::CommandError
141+
# ERR Node config epoch is already non-zero
142+
nil
143+
end
144+
end
145+
146+
def meet_each_other(clients)
147+
clients.each do |client|
148+
next if client.id == clients.first.id
149+
150+
client.cluster(:meet, target_host, target_port)
151+
end
152+
end
153+
154+
def wait_meeting(clients, max_attempts: 600)
155+
size = clients.size.to_s
156+
157+
wait_for_state(clients, max_attempts) do |client|
158+
info = hashify_cluster_info(client)
159+
info['cluster_known_nodes'] == size
160+
end
161+
end
162+
163+
def replicate(clients)
164+
node_map = hashify_node_map(clients.first)
165+
masters = take_masters(clients)
166+
167+
take_slaves(clients).each_with_index do |slave, i|
168+
master_info = masters[i].connection
169+
master_host = master_info.fetch(:host)
170+
master_port = master_info.fetch(:port)
171+
172+
loop do
173+
begin
174+
master_node_id = node_map.fetch("#{master_host}:#{master_port}")
175+
slave.cluster(:replicate, master_node_id)
176+
rescue ::RedisClient::CommandError
177+
# ERR Unknown node [key]
178+
sleep 0.1
179+
node_map = hashify_node_map(clients.first)
180+
next
181+
end
182+
183+
break
184+
end
185+
end
186+
end
187+
188+
def save_config(clients)
189+
clients.each { |c| c.cluster(:saveconfig) }
190+
end
191+
192+
def wait_cluster_building(clients, max_attempts: 600)
193+
wait_for_state(clients, max_attempts) do |client|
194+
info = hashify_cluster_info(client)
195+
info['cluster_state'] == 'ok'
196+
end
197+
end
198+
199+
def wait_replication(clients, max_attempts: 600)
200+
wait_for_state(clients, max_attempts) do |client|
201+
flags = hashify_cluster_node_flags(client)
202+
flags.values.count { |f| f == 'slave' } == 3
203+
end
204+
end
205+
206+
def wait_failover(master_key, slave_key, clients, max_attempts: 600)
207+
wait_for_state(clients, max_attempts) do |client|
208+
flags = hashify_cluster_node_flags(client)
209+
flags[master_key] == 'slave' && flags[slave_key] == 'master'
210+
end
211+
end
212+
213+
def wait_replication_delay(clients, timeout_sec)
214+
timeout_msec = timeout_sec.to_i * 1000
215+
wait_for_state(clients, clients.size + 1) do |client|
216+
client.blocking_call('WAIT', 1, timeout_msec) if client.call('ROLE').first == 'master'
217+
true
218+
end
219+
end
220+
221+
def wait_cluster_recovering(clients, max_attempts: 600)
222+
key = 0
223+
wait_for_state(clients, max_attempts) do |client|
224+
client.call('GET', key) if client.call('ROLE').first == 'master'
225+
true
226+
rescue ::RedisClient::CommandError => e
227+
if e.message.start_with?('CLUSTERDOWN')
228+
false
229+
elsif e.message.start_with?('MOVED')
230+
key += 1
231+
false
232+
else
233+
true
234+
end
235+
end
236+
end
237+
238+
def wait_for_state(clients, max_attempts)
239+
attempt_count = 1
240+
clients.each do |client|
241+
attempt_count.step(max_attempts) do |i|
242+
break if i >= max_attempts
243+
244+
attempt_count += 1
245+
break if yield(client)
246+
247+
sleep 0.1
248+
end
249+
end
250+
end
251+
252+
def hashify_cluster_info(client)
253+
client.call('CLUSTER', 'INFO').split("\r\n").to_h { |str| str.split(':') }
254+
end
255+
256+
def hashify_cluster_node_flags(client)
257+
client.call('CLUSTER', 'NODES').split("\n").map(&:split)
258+
.to_h { |arr| [arr[1].split('@').first, (arr[2].split(',') & %w[master slave]).first] }
259+
end
260+
261+
def hashify_node_map(client)
262+
client.call('CLUSTER', 'NODES').split("\n").map(&:split).to_h { |arr| [arr[1].split('@').first, arr[0]] }
263+
end
264+
265+
def take_masters(clients)
266+
size = clients.size / 2
267+
return clients if size < 3
268+
269+
clients.take(size)
270+
end
271+
272+
def take_slaves(clients)
273+
size = clients.size / 2
274+
return [] if size < 3
275+
276+
clients[size..size * 2]
277+
end
278+
279+
def take_replication_pairs(clients)
280+
[take_masters(clients).last, take_slaves(clients).last]
281+
end
282+
283+
def find_client(clients, node_key)
284+
clients.find { |cli| node_key == to_node_key(cli) }
285+
end
286+
287+
def to_node_key(client)
288+
"#{client.config.host}:#{client.config.port}"
289+
end
290+
end
291+
end
292+
end

0 commit comments

Comments
 (0)