
Commit ae90499

refactor: cluster controller for test (#75)
1 parent b99346d commit ae90499

File tree

4 files changed (+146 −137 lines)


test/cluster_controller.rb

Lines changed: 126 additions & 122 deletions
@@ -53,20 +53,32 @@ def down
   end
 
   def failover
-    master, slave = take_a_replication_pair(@clients)
+    rows = associate_with_clients_and_nodes(@clients)
+    primary_info = rows.find { |row| row[:role] == 'master' }
+    replica_info = rows.find { |row| row[:primary_id] == primary_info[:id] }
+
     wait_replication_delay(@clients, replica_size: @replica_size, timeout: @timeout)
-    slave.call('CLUSTER', 'FAILOVER', 'TAKEOVER')
-    wait_failover(@clients, master_key: to_node_key(master), slave_key: to_node_key(slave), max_attempts: @max_attempts)
+    replica_info.fetch(:client).call('CLUSTER', 'FAILOVER', 'TAKEOVER')
+    wait_failover(
+      @clients,
+      primary_node_key: primary_info.fetch(:node_key),
+      replica_node_key: replica_info.fetch(:node_key),
+      max_attempts: @max_attempts
+    )
     wait_replication_delay(@clients, replica_size: @replica_size, timeout: @timeout)
     wait_cluster_recovering(@clients, max_attempts: @max_attempts)
   end
 
-  def start_resharding(slot:, src_node_key:, dest_node_key:)
-    src_node_id = fetch_internal_id_by_natted_node_key(@clients.first, src_node_key)
-    src_client = find_client_by_natted_node_key(@clients, src_node_key)
-    dest_node_id = fetch_internal_id_by_natted_node_key(@clients.first, dest_node_key)
-    dest_client = find_client_by_natted_node_key(@clients, dest_node_key)
-    dest_host, dest_port = dest_node_key.split(':')
+  def start_resharding(slot:, src_node_key:, dest_node_key:) # rubocop:disable Metrics/CyclomaticComplexity
+    rows = associate_with_clients_and_nodes(@clients)
+    src_info = rows.find { |r| r[:node_key] == src_node_key || r[:client_node_key] == src_node_key }
+    dest_info = rows.find { |r| r[:node_key] == dest_node_key || r[:client_node_key] == dest_node_key }
+
+    src_node_id = src_info.fetch(:id)
+    src_client = src_info.fetch(:client)
+    dest_node_id = dest_info.fetch(:id)
+    dest_client = dest_info.fetch(:client)
+    dest_host, dest_port = dest_info.fetch(:node_key).split(':')
 
     # @see https://redis.io/commands/cluster-setslot/#redis-cluster-live-resharding-explained
     dest_client.call('CLUSTER', 'SETSLOT', slot, 'IMPORTING', src_node_id)
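
Note: the @see link above describes the Redis Cluster live-resharding procedure; the unchanged middle of start_resharding (old lines 73–90, omitted from this hunk) presumably carries it out. A minimal sketch of that sequence, written as a hypothetical helper reshard_slot; the 100-key batch size and the 3000 ms MIGRATE timeout are illustrative assumptions, not values taken from this file:

# Rough sketch of the live-resharding steps from the linked Redis doc.
# Not code from this commit; batch size and timeout are assumptions.
def reshard_slot(slot, src_client:, dest_client:, src_node_id:, dest_node_id:, dest_host:, dest_port:)
  dest_client.call('CLUSTER', 'SETSLOT', slot, 'IMPORTING', src_node_id)
  src_client.call('CLUSTER', 'SETSLOT', slot, 'MIGRATING', dest_node_id)

  loop do
    keys = src_client.call('CLUSTER', 'GETKEYSINSLOT', slot, 100)
    break if keys.empty?

    # MIGRATE with an empty key plus the KEYS option moves a whole batch at once.
    src_client.call('MIGRATE', dest_host, dest_port, '', 0, 3000, 'KEYS', *keys)
  end

  # src and dest are then told the new owner; finish_resharding below broadcasts
  # the same SETSLOT ... NODE to every remaining primary.
  src_client.call('CLUSTER', 'SETSLOT', slot, 'NODE', dest_node_id)
  dest_client.call('CLUSTER', 'SETSLOT', slot, 'NODE', dest_node_id)
end
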
@@ -91,11 +103,16 @@ def start_resharding(slot:, src_node_key:, dest_node_key:)
     wait_replication_delay(@clients, replica_size: @replica_size, timeout: @timeout)
   end
 
-  def finish_resharding(slot:, src_node_key:, dest_node_key:)
-    id = fetch_internal_id_by_natted_node_key(@clients.first, dest_node_key)
-    dest = find_client_by_natted_node_key(@clients, dest_node_key)
-    src = find_client_by_natted_node_key(@clients, src_node_key)
-    rest = take_masters(@clients, shard_size: @shard_size).reject { |c| c.equal?(dest) || c.equal?(src) }
+  def finish_resharding(slot:, src_node_key:, dest_node_key:) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
+    rows = associate_with_clients_and_nodes(@clients)
+    src_info = rows.find { |r| r[:node_key] == src_node_key || r[:client_node_key] == src_node_key }
+    dest_info = rows.find { |r| r[:node_key] == dest_node_key || r[:client_node_key] == dest_node_key }
+
+    src = src_info.fetch(:client)
+    dest = dest_info.fetch(:client)
+    id = dest_info.fetch(:id)
+    rest = rows.reject { |r| r[:role] == 'slave' || r[:client].equal?(src) || r[:client].equal?(dest) }.map { |r| r[:client] }
+
     ([dest, src] + rest).each do |cli|
       cli.call('CLUSTER', 'SETSLOT', slot, 'NODE', id)
     rescue ::RedisClient::CommandError => e
@@ -104,9 +121,9 @@ def finish_resharding(slot:, src_node_key:, dest_node_key:)
     end
   end
 
-  def scale_out(primary_url:, replica_url:) # rubocop:disable Metrics/CyclomaticComplexity
+  def scale_out(primary_url:, replica_url:)
     # @see https://redis.io/docs/manual/scaling/
-    rows = fetch_and_parse_cluster_nodes(@clients)
+    rows = associate_with_clients_and_nodes(@clients)
     target_host, target_port = rows.find { |row| row[:role] == 'master' }.fetch(:node_key).split(':')
 
     primary = ::RedisClient.new(url: primary_url, **@kwargs)
@@ -126,38 +143,32 @@ def scale_out(primary_url:, replica_url:) # rubocop:disable Metrics/CyclomaticCo
     save_config(@clients)
     wait_for_cluster_to_be_ready
 
-    rows = fetch_and_parse_cluster_nodes(@clients)
+    rows = associate_with_clients_and_nodes(@clients)
 
     SLOT_SIZE.times.to_a.sample(100).sort.each do |slot|
-      src = rows.find do |row|
-        next if row[:slots].empty?
-
-        row[:slots].any? { |first, last| first <= slot && slot <= last }
-      end.fetch(:node_key)
+      src = rows.find { |row| row[:slots].include?(slot) }.fetch(:node_key)
       dest = rows.find { |row| row[:id] == primary_id }.fetch(:node_key)
       start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
       finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
     end
   end
 
   def scale_in # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
-    rows = fetch_and_parse_cluster_nodes(@clients)
-    primary_info = rows.reject { |r| r[:slots].empty? }.min_by { |r| r[:slots].flat_map { |start, last| (start..last).to_a }.size }
+    rows = associate_with_clients_and_nodes(@clients)
+
+    primary_info = rows.reject { |r| r[:slots].empty? }.min_by { |r| r[:slots].size }
     replica_info = rows.find { |r| r[:primary_id] == primary_info[:id] }
     rest_primary_node_keys = rows.reject { |r| r[:id] == primary_info[:id] || r[:role] == 'slave' }.map { |r| r[:node_key] }
 
-    primary_info[:slots].each do |start, last|
-      (start..last).each do |slot|
-        src = primary_info.fetch(:node_key)
-        dest = rest_primary_node_keys.sample
-        start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
-        finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
-      end
+    primary_info[:slots].each do |slot|
+      src = primary_info.fetch(:node_key)
+      dest = rest_primary_node_keys.sample
+      start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
+      finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
     end
 
-    id2cli = fetch_internal_id_to_client_mappings(@clients)
-    replica = id2cli.fetch(replica_info[:id])
-    primary = id2cli.fetch(primary_info[:id])
+    replica = replica_info.fetch(:client)
+    primary = primary_info.fetch(:client)
     threads = @clients.map do |cli|
       Thread.new(cli) do |c|
         Thread.pass
@@ -184,8 +195,31 @@ def scale_in # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedCo
     end
   end
 
+  def select_resharding_target(slot)
+    rows = associate_with_clients_and_nodes(@clients)
+    src = rows.find { |r| r[:role] == 'master' && r[:slots].include?(slot) }
+    dest = rows.reject { |r| r[:role] == 'slave' || r[:id] == src[:id] }.sample
+    [src.fetch(:node_key), dest.fetch(:node_key)]
+  end
+
+  def select_sacrifice_of_primary
+    rows = associate_with_clients_and_nodes(@clients)
+    rows.select { |r| r[:role] == 'master' }
+        .reject { |primary| rows.none? { |r| r[:primary_id] == primary[:id] } }
+        .sample.fetch(:client)
+  end
+
+  def select_sacrifice_of_replica
+    rows = associate_with_clients_and_nodes(@clients)
+    rows.select { |r| r[:role] == 'slave' }.sample.fetch(:client)
+  end
+
   def close
-    @clients.each(&:close)
+    @clients.each do |client|
+      client.close
+    rescue ::RedisClient::ConnectionError
+      # ignore
+    end
   end
 
   private
@@ -194,7 +228,7 @@ def flush_all_data(clients)
     clients.each do |c|
       c.call('FLUSHALL')
     rescue ::RedisClient::CommandError
-      # READONLY You can't write against a read only slave.
+      # READONLY You can't write against a read only replica.
       nil
     end
   end
@@ -204,14 +238,14 @@ def reset_cluster(clients)
   end
 
   def assign_slots(clients, shard_size:)
-    masters = take_masters(clients, shard_size: shard_size)
-    slot_slice = SLOT_SIZE / masters.size
-    mod = SLOT_SIZE % masters.size
-    slot_sizes = Array.new(masters.size, slot_slice)
+    primaries = take_primaries(clients, shard_size: shard_size)
+    slot_slice = SLOT_SIZE / primaries.size
+    mod = SLOT_SIZE % primaries.size
+    slot_sizes = Array.new(primaries.size, slot_slice)
     mod.downto(1) { |i| slot_sizes[i] += 1 }
 
     slot_idx = 0
-    masters.zip(slot_sizes).each do |c, s|
+    primaries.zip(slot_sizes).each do |c, s|
       slot_range = slot_idx..slot_idx + s - 1
       c.call('CLUSTER', 'ADDSLOTS', *slot_range.to_a)
       slot_idx += s
@@ -228,7 +262,9 @@ def save_config_epoch(clients)
   end
 
   def meet_each_other(clients)
-    target_host, target_port = fetch_cluster_nodes(clients.first).first[1].split('@').first.split(':')
+    rows = fetch_cluster_nodes(clients.first)
+    rows = parse_cluster_nodes(rows)
+    target_host, target_port = rows.first.fetch(:node_key).split(':')
     clients.drop(1).each { |c| c.call('CLUSTER', 'MEET', target_host, target_port) }
   end
 
@@ -242,21 +278,19 @@ def wait_meeting(clients, max_attempts:)
   end
 
   def replicate(clients, shard_size:, replica_size:)
-    node_map = hashify_node_map(clients)
-    masters = take_masters(clients, shard_size: shard_size)
+    primaries = take_primaries(clients, shard_size: shard_size)
+    replicas = take_replicas(clients, shard_size: shard_size)
 
-    take_slaves(clients, shard_size: shard_size).each_slice(replica_size).each_with_index do |slaves, i|
-      master_host = masters[i].config.host
-      master_port = masters[i].config.port
+    replicas.each_slice(replica_size).each_with_index do |subset, i|
+      primary_id = primaries[i].call('CLUSTER', 'MYID')
 
       loop do
         begin
-          master_node_id = node_map.fetch(to_node_key_by_host_port(master_host, master_port))
-          slaves.each { |slave| slave.call('CLUSTER', 'REPLICATE', master_node_id) }
+          subset.each { |replica| replica.call('CLUSTER', 'REPLICATE', primary_id) }
         rescue ::RedisClient::CommandError
           # ERR Unknown node [key]
           sleep 0.1
-          node_map = hashify_node_map(clients)
+          primary_id = primaries[i].call('CLUSTER', 'MYID')
           next
         end
 
@@ -280,17 +314,21 @@ def wait_cluster_building(clients, max_attempts:)
 
   def wait_replication(clients, number_of_replicas:, max_attempts:)
     wait_for_state(clients, max_attempts: max_attempts) do |client|
-      flags = hashify_cluster_node_flags(clients, client: client)
-      flags.values.count { |f| f == 'slave' } == number_of_replicas
+      rows = fetch_cluster_nodes(client)
+      rows = parse_cluster_nodes(rows)
+      rows.count { |r| r[:role] == 'slave' } == number_of_replicas
     rescue ::RedisClient::ConnectionError
       true
     end
   end
 
-  def wait_failover(clients, master_key:, slave_key:, max_attempts:)
+  def wait_failover(clients, primary_node_key:, replica_node_key:, max_attempts:)
     wait_for_state(clients, max_attempts: max_attempts) do |client|
-      flags = hashify_cluster_node_flags(clients, client: client)
-      flags[master_key] == 'slave' && flags[slave_key] == 'master'
+      rows = fetch_cluster_nodes(client)
+      rows = parse_cluster_nodes(rows)
+      primary_info = rows.find { |r| r[:node_key] == primary_node_key || r[:client_node_key] == primary_node_key }
+      replica_info = rows.find { |r| r[:node_key] == replica_node_key || r[:client_node_key] == replica_node_key }
+      primary_info[:role] == 'slave' && replica_info[:role] == 'master'
     rescue ::RedisClient::ConnectionError
       true
     end
@@ -343,87 +381,53 @@ def hashify_cluster_info(client)
     client.call('CLUSTER', 'INFO').split("\r\n").to_h { |v| v.split(':') }
   end
 
-  def hashify_cluster_node_flags(clients, client: nil)
-    id2key = fetch_internal_id_to_node_key_mappings(clients)
-    fetch_cluster_nodes(client || clients.first)
-      .to_h { |arr| [id2key[arr[0]], (arr[2].split(',') & %w[master slave]).first] }
+  def fetch_cluster_nodes(client)
+    client.call('CLUSTER', 'NODES').split("\n").map(&:split)
   end
 
-  def hashify_node_map(clients)
-    id2key = fetch_internal_id_to_node_key_mappings(clients)
-    clients.each do |client|
-      return fetch_cluster_nodes(client).to_h { |arr| [id2key[arr[0]], arr[0]] }
+  def associate_with_clients_and_nodes(clients)
+    clients.filter_map do |client|
+      rows = fetch_cluster_nodes(client)
+      rows = parse_cluster_nodes(rows)
+      row = rows.find { |r| r[:flags].include?('myself') }
+      row.merge(client: client, client_node_key: "#{client.config.host}:#{client.config.port}")
    rescue ::RedisClient::ConnectionError
      next
    end
  end
 
-  def fetch_internal_id_by_natted_node_key(client, node_key)
-    fetch_cluster_nodes(client).find { |info| info[1].split('@').first == node_key }.first
-  end
-
-  def find_client_by_natted_node_key(clients, node_key)
-    id = fetch_internal_id_by_natted_node_key(clients.first, node_key)
-    id2key = fetch_internal_id_to_node_key_mappings(clients)
-    key = id2key[id]
-    clients.find { |cli| key == to_node_key(cli) }
-  end
-
-  def fetch_cluster_nodes(client)
-    client.call('CLUSTER', 'NODES').split("\n").map(&:split)
-  end
-
-  def fetch_internal_id_to_node_key_mappings(clients)
-    fetch_internal_id_to_client_mappings(clients).transform_values { |c| to_node_key(c) }
-  end
-
-  def fetch_internal_id_to_client_mappings(clients)
-    clients.to_h { |c| [c.call('CLUSTER', 'MYID'), c] }
-  end
-
-  def fetch_and_parse_cluster_nodes(clients) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
-    rows = fetch_cluster_nodes(clients.first)
-    rows.each { |arr| arr[2] = arr[2].split(',') }
-    rows.select! { |arr| arr[7] == 'connected' && (arr[2] & %w[fail? fail handshake noaddr noflags]).empty? }
-    rows.each do |arr|
-      arr[1] = arr[1].split('@').first
-      arr[2] = (arr[2] & %w[master slave]).first
-      if arr[8].nil?
-        arr[8] = []
-        next
-      end
-      arr[8] = arr[8..].filter_map { |str| str.start_with?('[') ? nil : str.split('-').map { |s| Integer(s) } }
-                       .map { |a| a.size == 1 ? a << a.first : a }.map(&:sort)
-    end
-
-    rows.map do |arr|
-      { id: arr[0], node_key: arr[1], role: arr[2], primary_id: arr[3], ping_sent: arr[4],
-        pong_recv: arr[5], config_epoch: arr[6], link_state: arr[7], slots: arr[8] }
+  def parse_cluster_nodes(rows) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
+    rows.map do |row|
+      flags = row[2].split(',')
+      slots = if row[8].nil?
+                []
+              else
+                row[8..].filter_map { |str| str.start_with?('[') ? nil : str.split('-').map { |s| Integer(s) } }
+                        .map { |a| a.size == 1 ? a << a.first : a }.map(&:sort)
+                        .flat_map { |first, last| (first..last).to_a }.sort
+              end
+
+      {
+        id: row[0],
+        node_key: row[1].split('@').first,
+        flags: flags,
+        role: (flags & %w[master slave]).first,
+        primary_id: row[3],
+        ping_sent: row[4],
+        pong_recv: row[5],
+        config_epoch: row[6],
+        link_state: row[7],
+        slots: slots
+      }
     end
   end
 
-  def take_masters(clients, shard_size:)
+  def take_primaries(clients, shard_size:)
     clients.select { |cli| cli.call('ROLE').first == 'master' }.take(shard_size)
   end
 
-  def take_slaves(clients, shard_size:)
+  def take_replicas(clients, shard_size:)
     replicas = clients.select { |cli| cli.call('ROLE').first == 'slave' }
     replicas.size.zero? ? clients[shard_size..] : replicas
   end
-
-  def take_a_replication_pair(clients)
-    rows = fetch_and_parse_cluster_nodes(clients)
-    primary = rows.find { |row| row[:role] == 'master' }
-    replica = rows.find { |row| row[:primary_id] == primary[:id] }
-    id2cli = fetch_internal_id_to_client_mappings(clients)
-    [id2cli[primary[:id]], id2cli[replica[:id]]]
-  end
-
-  def to_node_key(client)
-    to_node_key_by_host_port(client.config.host, client.config.port)
-  end
-
-  def to_node_key_by_host_port(host, port)
-    "#{host}:#{port}"
-  end
 end
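
For reference, this is roughly the row hash the new fetch_cluster_nodes / parse_cluster_nodes pair builds from a single CLUSTER NODES line. The snippet below is standalone (no running cluster needed) and mirrors the parsing in the last hunk; the node ID, address, and slot ranges are invented, the field layout follows the CLUSTER NODES documentation, and only a subset of the keys is shown.

# Standalone illustration of the parsed row; values below are made up.
raw = '07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30001@40001 myself,master - 0 0 1 connected 0-2 5'

row = raw.split
flags = row[2].split(',')
slots = row[8..].filter_map { |str| str.start_with?('[') ? nil : str.split('-').map { |s| Integer(s) } }
                .map { |a| a.size == 1 ? a << a.first : a }.map(&:sort)
                .flat_map { |first, last| (first..last).to_a }.sort

pp(
  id: row[0],
  node_key: row[1].split('@').first,      # => "127.0.0.1:30001"
  flags: flags,                           # => ["myself", "master"]
  role: (flags & %w[master slave]).first, # => "master"
  primary_id: row[3],                     # => "-" for a primary
  slots: slots                            # => [0, 1, 2, 5]
)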
