|
3 | 3 | require 'logger' |
4 | 4 | require 'json' |
5 | 5 | require 'testing_helper' |
| 6 | +require 'securerandom' |
6 | 7 |
|
7 | 8 | class TestAgainstClusterBroken < TestingWrapper |
8 | 9 | WAIT_SEC = 0.1 |
@@ -54,6 +55,41 @@ def test_client_patience |
54 | 55 | do_assertions(offset: 3) |
55 | 56 | end |
56 | 57 |
|
# Checks that a command aimed at a killed primary still succeeds once the
# cluster has failed over.
def test_reloading_on_connection_error
  victim = @controller.select_sacrifice_of_primary

  # Seed a key whose slot is served by the node we are about to kill.
  key = generate_key_for_node(victim)
  @clients[0].call('SET', key, 'foobar1')

  # Take the primary down and wait until the failover is visible.
  kill_a_node_and_wait_for_failover(victim)

  # Writing the key again is expected to hit the dead node first, which
  # should make the client reload the cluster topology and then succeed.
  reply = @clients[0].call('SET', key, 'foobar2')
  assert_equal 'OK', reply
end
| 71 | + |
# Checks that a WATCH/MULTI transaction whose node dies mid-flight is run
# a second time and commits.
def test_transaction_retry_on_connection_error
  victim = @controller.select_sacrifice_of_primary

  # Seed a key whose slot is served by the node we are about to kill.
  key = generate_key_for_node(victim)
  @clients[0].call('SET', key, 'foobar1')

  attempts = 0
  # The block runs after WATCH has been issued; on its first run only, the
  # node is shut down. The counter is bumped after the kill, exactly as in
  # the original ordering.
  outcome = @clients[0].multi(watch: [key]) do |txn|
    kill_a_node_and_wait_for_failover(victim) if attempts.zero?
    attempts += 1
    txn.call('SET', key, 'foobar2')
  end

  # One failed attempt plus one successful retry: the write landed and the
  # block ran exactly twice.
  assert_equal ['OK'], outcome
  assert_equal 'foobar2', @clients[0].call('GET', key)
  assert_equal 2, attempts
end
| 92 | + |
57 | 93 | private |
58 | 94 |
|
59 | 95 | def prepare_test_data |
@@ -129,6 +165,18 @@ def do_assertions(offset:) |
129 | 165 | end |
130 | 166 | end |
131 | 167 |
|
# Returns a random key whose hash slot is served by the node behind +conn+.
#
# conn - a connection to a single cluster node; it must respond to
#        #call('CLUSTER', ...) for the MYID, SLOTS and KEYSLOT subcommands.
#
# Returns a 32-character hex String.
# Raises RuntimeError if the node is not listed as primary for any slot
# range, since no key could ever match and the search would never finish.
def generate_key_for_node(conn)
  node_id = conn.call('CLUSTER', 'MYID')

  # Per the Redis CLUSTER SLOTS reply format, each entry is
  # [first_slot, last_slot, primary, *replicas] and index 2 of a node
  # description is its node id. Keep the ranges as-is rather than
  # expanding them into thousands of individual slot numbers.
  owned_ranges = conn.call('CLUSTER', 'SLOTS')
                     .select { |entry| entry[2][2] == node_id }
                     .map { |entry| entry[0]..entry[1] }
  raise "Node #{node_id} is not the primary of any slot in generate_key_for_node" if owned_ranges.empty?

  # Try random keys until one hashes into a slot owned by this node.
  loop do
    candidate = SecureRandom.hex
    slot = conn.call('CLUSTER', 'KEYSLOT', candidate)
    return candidate if owned_ranges.any? { |range| range.cover?(slot) }
  end
end
| 179 | + |
132 | 180 | def wait_for_replication(client) |
133 | 181 | client_side_timeout = TEST_TIMEOUT_SEC + 1.0 |
134 | 182 | server_side_timeout = (TEST_TIMEOUT_SEC * 1000).to_i |
@@ -211,6 +259,23 @@ def retryable(attempts: MAX_ATTEMPTS, wait_sec: WAIT_SEC) |
211 | 259 | end |
212 | 260 | end |
213 | 261 |
|
# Kills +sacrifice+ and blocks until another node stops listing it as the
# primary of any slot range in CLUSTER SLOTS.
#
# sacrifice    - connection to the node to kill; queried for its node id
#                before it is shut down.
# max_checks   - number of unsuccessful polls tolerated before giving up
#                (default 30, i.e. at most max_checks + 1 polls).
# interval_sec - seconds slept between polls (default 1).
#
# Raises RuntimeError if no other node is available to observe the
# failover, or if the failover is not observed in time.
def kill_a_node_and_wait_for_failover(sacrifice, max_checks: 30, interval_sec: 1)
  # Pick any surviving node to watch the cluster from, and fail fast before
  # killing anything if there is none.
  observer = @controller.clients.find { |client| client != sacrifice }
  raise 'No other node available to observe failover in kill_a_node_and_wait_for_failover' if observer.nil?

  # The id has to be read while the node is still alive.
  sacrifice_id = sacrifice.call('CLUSTER', 'MYID')
  kill_a_node(sacrifice)

  failover_checks = 0
  loop do
    raise 'Timed out waiting for failover in kill_a_node_and_wait_for_failover' if failover_checks > max_checks

    # Wait for the sacrifice node to not be a primary according to CLUSTER SLOTS.
    cluster_slots = observer.call('CLUSTER', 'SLOTS')
    break unless cluster_slots.any? { |slot_entry| slot_entry[2][2] == sacrifice_id }

    sleep interval_sec
    failover_checks += 1
  end
end
| 278 | + |
214 | 279 | def build_client( |
215 | 280 | custom: { captured_commands: @captured_commands, redirect_count: @redirect_count }, |
216 | 281 | middlewares: [::Middlewares::CommandCapture, ::Middlewares::RedirectCount], |
|
0 commit comments