# frozen_string_literal: true

# Drives a group of Redis nodes through cluster setup, teardown, failover and
# resharding by sending raw commands over one ::RedisClient per node.
#
# Roles are positional. With six or more addresses the first half of the list
# are treated as masters and the same number of following nodes as their
# replicas (replica i follows master i). With fewer than six addresses every
# node is treated as a master and there are no replicas.
class ClusterController
  # Number of hash slots distributed across the masters by #assign_slots.
  SLOT_SIZE = 16_384

  # @param node_addrs [Array<String>] one URL per node, handed to ::RedisClient as `url:`
  # @param timeout [Numeric] seconds; forwarded to every client and reused for WAIT and MIGRATE
  # @param reconnect_attempts [Integer] forwarded to every client
  # @param kwargs [Hash] any further options forwarded to ::RedisClient.new
  # @raise [RuntimeError] when fewer than three addresses are given
  def initialize(node_addrs, timeout: 30.0, reconnect_attempts: 3, **kwargs)
    raise 'Redis Cluster requires at least 3 master nodes.' if node_addrs.size < 3

    # `timeout` is a named keyword, so it never appears inside kwargs.
    @timeout = timeout
    @clients = node_addrs.map do |addr|
      ::RedisClient.new(url: addr, timeout: timeout, reconnect_attempts: reconnect_attempts, **kwargs)
    end
  end

  # Polls an already configured cluster until every readiness check passes or
  # its attempt budget runs out (no error is raised on exhaustion).
  def wait_for_cluster_to_be_ready
    wait_meeting(@clients)
    wait_cluster_building(@clients)
    wait_replication(@clients)
    wait_cluster_recovering(@clients)
  end

  # Wipes every node and builds the cluster again from scratch. The order of
  # the steps matters: slots and epochs are set before the nodes meet, and
  # replication is configured only after every node knows every other node.
  def rebuild
    flush_all_data(@clients)
    reset_cluster(@clients)
    assign_slots(@clients)
    save_config_epoch(@clients)
    meet_each_other(@clients)
    wait_meeting(@clients)
    replicate(@clients)
    save_config(@clients)
    wait_cluster_building(@clients)
    wait_replication(@clients)
    wait_cluster_recovering(@clients)
  end

  # Flushes all data and resets the cluster state on every node.
  def down
    flush_all_data(@clients)
    reset_cluster(@clients)
  end

  # Shuts down the last master and polls its replica (every 0.1s, at most 500
  # checks) until the replica reports the master role.
  #
  # @raise [RuntimeError] when the node list contains no replicas
  def fail_serving_master
    master, slave = take_replication_pairs(@clients)
    begin
      master.call('SHUTDOWN')
    rescue ::RedisClient::ConnectionError
      # The server is expected to drop the connection instead of replying.
      nil
    end

    max_attempts = 500
    1.upto(max_attempts) do |attempt|
      break if slave.call('ROLE').first == 'master' || attempt >= max_attempts

      sleep 0.1
    end
  end

  # Promotes the last replica with CLUSTER FAILOVER TAKEOVER and waits until
  # the role swap is visible and the cluster serves reads again.
  #
  # @raise [RuntimeError] when the node list contains no replicas
  def failover
    master, slave = take_replication_pairs(@clients)
    wait_replication_delay(@clients, @timeout)
    slave.call('CLUSTER', 'FAILOVER', 'TAKEOVER')
    wait_failover(to_node_key(master), to_node_key(slave), @clients)
    wait_replication_delay(@clients, @timeout)
    wait_cluster_recovering(@clients)
  end

  # Marks `slot` as IMPORTING on the destination and MIGRATING on the source,
  # then moves the slot's keys in batches of `slice_size`.
  #
  # @param slot [Integer] hash slot to move
  # @param src_node_key [String] "host:port" of the current owner
  # @param dest_node_key [String] "host:port" of the new owner
  # @param slice_size [Integer] keys requested per CLUSTER GETKEYSINSLOT call
  # @param timeout [Numeric] seconds allowed per MIGRATE; defaults to the controller timeout
  # @raise [KeyError] when a node key is unknown to the cluster or has no client
  # @raise [::RedisClient::CommandError] when MIGRATE fails with anything but IOERR
  def start_resharding(slot, src_node_key, dest_node_key, slice_size: 10, timeout: @timeout)
    node_map = hashify_node_map(@clients.first)
    src_node_id = node_map.fetch(src_node_key)
    src_client = fetch_client(@clients, src_node_key)
    dest_node_id = node_map.fetch(dest_node_key)
    dest_client = fetch_client(@clients, dest_node_key)
    dest_host, dest_port = dest_node_key.split(':')
    # MIGRATE requires a destination db and a timeout (ms) after the key.
    db_index = '0'
    timeout_msec = timeout.to_i * 1000

    dest_client.call('CLUSTER', 'SETSLOT', slot, 'IMPORTING', src_node_id)
    src_client.call('CLUSTER', 'SETSLOT', slot, 'MIGRATING', dest_node_id)

    # The initial count bounds the loop even if a key cannot be moved away.
    remaining = src_client.call('CLUSTER', 'COUNTKEYSINSLOT', slot)
    while remaining.positive?
      keys = src_client.call('CLUSTER', 'GETKEYSINSLOT', slot, slice_size)
      break if keys.empty?

      keys.each do |key|
        migrate_key(src_client, dest_host, dest_port, key, db_index, timeout_msec)
        remaining -= 1
      end
    end
  end

  # Assigns `slot` to the destination node. The command is sent to the first
  # client only.
  #
  # @raise [KeyError] when `dest_node_key` is unknown to the cluster
  def finish_resharding(slot, dest_node_key)
    node_map = hashify_node_map(@clients.first)
    @clients.first.call('CLUSTER', 'SETSLOT', slot, 'NODE', node_map.fetch(dest_node_key))
  end

  # Closes every client connection.
  def close
    @clients.each(&:close)
  end

  private

  # Best-effort FLUSHALL on every node; command errors are ignored.
  def flush_all_data(clients)
    clients.each do |c|
      c.call('FLUSHALL')
    rescue ::RedisClient::CommandError
      # READONLY You can't write against a read only slave.
      nil
    end
  end

  def reset_cluster(clients)
    clients.each { |c| c.call('CLUSTER', 'RESET') }
  end

  # Splits the slots into contiguous ranges, one per master. Leftover slots go
  # to masters 1..remainder, one each, which yields 0-5460 / 5461-10922 /
  # 10923-16383 for three masters.
  def assign_slots(clients)
    masters = take_masters(clients)
    base, extra = SLOT_SIZE.divmod(masters.size)

    first_slot = 0
    masters.each_with_index do |master, i|
      count = (1..extra).cover?(i) ? base + 1 : base
      master.call('CLUSTER', 'ADDSLOTS', *(first_slot...(first_slot + count)).to_a)
      first_slot += count
    end
  end

  # Gives node i the config epoch i + 1; command errors are ignored.
  def save_config_epoch(clients)
    clients.each_with_index do |c, i|
      c.call('CLUSTER', 'SET-CONFIG-EPOCH', i + 1)
    rescue ::RedisClient::CommandError
      # ERR Node config epoch is already non-zero
      nil
    end
  end

  # Introduces every other node to the first one.
  def meet_each_other(clients)
    seed = clients.first
    seed_key = to_node_key(seed)

    clients.each do |client|
      next if to_node_key(client) == seed_key

      client.call('CLUSTER', 'MEET', seed.config.host, seed.config.port)
    end
  end

  def wait_meeting(clients, max_attempts: 600)
    size = clients.size.to_s

    wait_for_state(clients, max_attempts) do |client|
      info = hashify_cluster_info(client)
      info['cluster_known_nodes'] == size
    end
  end

  # Attaches replica i to master i. A command error (typically "Unknown node"
  # while gossip is still spreading) is retried every 0.1s with a refreshed
  # node map, and re-raised once `max_attempts` tries have failed.
  def replicate(clients, max_attempts: 600)
    node_map = hashify_node_map(clients.first)
    masters = take_masters(clients)

    take_slaves(clients).each_with_index do |slave, i|
      master_key = to_node_key(masters[i])
      attempts = 0

      begin
        attempts += 1
        slave.call('CLUSTER', 'REPLICATE', node_map.fetch(master_key))
      rescue ::RedisClient::CommandError
        # ERR Unknown node [key]
        raise if attempts >= max_attempts

        sleep 0.1
        node_map = hashify_node_map(clients.first)
        retry
      end
    end
  end

  def save_config(clients)
    clients.each { |c| c.call('CLUSTER', 'SAVECONFIG') }
  end

  def wait_cluster_building(clients, max_attempts: 600)
    wait_for_state(clients, max_attempts) do |client|
      info = hashify_cluster_info(client)
      info['cluster_state'] == 'ok'
    end
  end

  # Waits until every node reports as many replicas as this controller
  # configures (zero when the node list has no replicas).
  def wait_replication(clients, max_attempts: 600)
    expected = take_slaves(clients).size

    wait_for_state(clients, max_attempts) do |client|
      hashify_cluster_node_flags(client).values.count('slave') == expected
    end
  end

  def wait_failover(master_key, slave_key, clients, max_attempts: 600)
    wait_for_state(clients, max_attempts) do |client|
      flags = hashify_cluster_node_flags(client)
      flags[master_key] == 'slave' && flags[slave_key] == 'master'
    end
  end

  # Issues WAIT on every master so that at least one replica has acknowledged
  # the pending writes, or `timeout_sec` has elapsed.
  def wait_replication_delay(clients, timeout_sec)
    timeout_msec = timeout_sec.to_i * 1000
    # The block always succeeds, so size + 1 attempts visit every client once.
    wait_for_state(clients, clients.size + 1) do |client|
      # blocking_call takes the socket timeout first, then the command.
      client.blocking_call(timeout_sec, 'WAIT', 1, timeout_msec) if client.call('ROLE').first == 'master'
      true
    end
  end

  # Probes each master with GET until it stops answering CLUSTERDOWN or MOVED.
  # On MOVED the probe key is changed before the next try.
  def wait_cluster_recovering(clients, max_attempts: 600)
    key = 0
    wait_for_state(clients, max_attempts) do |client|
      client.call('GET', key) if client.call('ROLE').first == 'master'
      true
    rescue ::RedisClient::CommandError => e
      if e.message.start_with?('CLUSTERDOWN')
        false
      elsif e.message.start_with?('MOVED')
        key += 1
        false
      else
        true
      end
    end
  end

  # Yields each client until the block returns truthy, sleeping 0.1s between
  # tries. The attempt budget is shared by all clients: once it is used up the
  # remaining clients are skipped, and nothing is raised.
  def wait_for_state(clients, max_attempts)
    attempts = 1
    clients.each do |client|
      while attempts < max_attempts
        attempts += 1
        break if yield(client)

        sleep 0.1
      end
    end
  end

  # @return [Hash{String => String}] the key:value lines of CLUSTER INFO
  def hashify_cluster_info(client)
    client.call('CLUSTER', 'INFO')
          .split("\r\n")
          .select { |line| line.include?(':') }
          .to_h { |line| line.split(':', 2) }
  end

  # @return [Hash{String => String, nil}] "host:port" => 'master' / 'slave'
  def hashify_cluster_node_flags(client)
    cluster_node_rows(client).to_h do |row|
      [row[1].split('@').first, (row[2].split(',') & %w[master slave]).first]
    end
  end

  # @return [Hash{String => String}] "host:port" => node id
  def hashify_node_map(client)
    cluster_node_rows(client).to_h { |row| [row[1].split('@').first, row[0]] }
  end

  # CLUSTER NODES output split into whitespace separated fields, one row per
  # line; the code above reads field 0 as id, 1 as address and 2 as flags.
  def cluster_node_rows(client)
    client.call('CLUSTER', 'NODES').split("\n").map(&:split)
  end

  def take_masters(clients)
    size = clients.size / 2
    return clients if size < 3

    clients.take(size)
  end

  # Same number of nodes as masters, taken right after them, so that replica i
  # always has a master i. A trailing odd node is left out.
  def take_slaves(clients)
    size = clients.size / 2
    return [] if size < 3

    clients[size...size * 2]
  end

  # @return [Array(::RedisClient, ::RedisClient)] the last master and its replica
  # @raise [RuntimeError] when the node list contains no replicas
  def take_replication_pairs(clients)
    slave = take_slaves(clients).last
    raise 'No replica nodes are available.' unless slave

    [take_masters(clients).last, slave]
  end

  def find_client(clients, node_key)
    clients.find { |cli| node_key == to_node_key(cli) }
  end

  # Like #find_client, but fails loudly instead of returning nil.
  def fetch_client(clients, node_key)
    find_client(clients, node_key) || raise(KeyError, "no client is connected to #{node_key}")
  end

  # Moves one key to the destination, retrying once with REPLACE on IOERR.
  def migrate_key(src_client, host, port, key, db_index, timeout_msec)
    src_client.call('MIGRATE', host, port, key, db_index, timeout_msec)
  rescue ::RedisClient::CommandError => e
    raise unless e.message.start_with?('IOERR')

    src_client.call('MIGRATE', host, port, key, db_index, timeout_msec, 'REPLACE') # retry once
  end

  def to_node_key(client)
    "#{client.config.host}:#{client.config.port}"
  end
end
0 commit comments