diff --git a/.gitignore b/.gitignore index d85087c459..b7de0bcd84 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,7 @@ Makefile.dep .idea/* .ccls .ccls-cache/* +.DS_Store compile_commands.json redis.code-workspace .cache diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 22e2436d21..af13a89984 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -2903,11 +2903,12 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc clusterNode *migration_source_node = NULL; for (j = 0; j < CLUSTER_SLOTS; j++) { + clusterNode *slot_owner = server.cluster->slots[j]; if (bitmapTestBit(slots, j)) { sender_slots++; /* The slot is already bound to the sender of this message. */ - if (server.cluster->slots[j] == sender) { + if (slot_owner == sender) { bitmapClearBit(server.cluster->owner_not_claiming_slot, j); continue; } @@ -2923,15 +2924,15 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * migration, we will accept the topology update regardless of the * epoch. */ if (isSlotUnclaimed(j) || - server.cluster->slots[j]->configEpoch < senderConfigEpoch || + slot_owner->configEpoch < senderConfigEpoch || clusterSlotFailoverGranted(j)) { - if (!isSlotUnclaimed(j) && !areInSameShard(server.cluster->slots[j], sender)) { + if (!isSlotUnclaimed(j) && !areInSameShard(slot_owner, sender)) { if (first_migrated_slot == -1) { /* Delay-initialize the range of migrated slots. */ first_migrated_slot = j; last_migrated_slot = j; - migration_source_node = server.cluster->slots[j]; - } else if (migration_source_node == server.cluster->slots[j] && j == last_migrated_slot + 1) { + migration_source_node = slot_owner; + } else if (migration_source_node == slot_owner && j == last_migrated_slot + 1) { /* Extend the range of migrated slots. */ last_migrated_slot = j; } else { @@ -2942,13 +2943,13 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc /* Reset the range for the next slot. */ first_migrated_slot = j; last_migrated_slot = j; - migration_source_node = server.cluster->slots[j]; + migration_source_node = slot_owner; } } /* Was this slot mine, and still contains keys? Mark it as * a dirty slot. */ - if (server.cluster->slots[j] == myself && countKeysInSlot(j) && sender != myself) { + if (slot_owner == myself && countKeysInSlot(j)) { dirty_slots[dirty_slots_count] = j; dirty_slots_count++; } @@ -2957,7 +2958,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc if (clusterIsSlotImporting(j)) importing_slots_count++; - if (server.cluster->slots[j] == cur_primary) { + if (slot_owner == cur_primary) { new_primary = sender; migrated_our_slots++; } @@ -2979,15 +2980,15 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc /* Handle the case where we are importing this slot and the ownership changes */ clusterNode *in = getImportingSlotSource(j); - if (in != NULL && - in != sender) { + if (in != NULL && in != sender) { /* Update importing_slots_from to point to the sender, if it is in the * same shard as the previous slot owner */ if (areInSameShard(sender, in)) { - serverLog(LL_VERBOSE, + serverLog(LL_NOTICE, "Failover occurred in migration source. 
Update importing " - "source for slot %d to node %.40s (%s) in shard %.40s.", - j, sender->name, sender->human_nodename, sender->shard_id); + "source for slot %d from node %.40s (%s) to node %.40s (%s) in shard %.40s.", + j, in->name, in->human_nodename, + sender->name, sender->human_nodename, sender->shard_id); setImportingSlotSource(j, sender); } else { /* If the sender is from a different shard, it must be a result @@ -3005,7 +3006,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } } else { - if (server.cluster->slots[j] == sender) { + if (slot_owner == sender) { /* The slot is currently bound to the sender but the sender is no longer * claiming it. We don't want to unbind the slot yet as it can cause the cluster * to move to FAIL state and also throw client error. Keeping the slot bound to @@ -3025,7 +3026,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc (mn->configEpoch < senderConfigEpoch || nodeIsReplica(mn)) && areInSameShard(mn, sender)) { - serverLog(LL_VERBOSE, + serverLog(LL_NOTICE, "Failover occurred in migration target." " Slot %d is now being migrated to node %.40s (%s) in shard %.40s.", j, sender->name, sender->human_nodename, sender->shard_id); @@ -3056,11 +3057,11 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc j, sender->name, sender->human_nodename, sender->shard_id); setImportingSlotSource(j, NULL); /* Take over the slot ownership if I am not the owner yet*/ - if (server.cluster->slots[j] != myself) { + if (slot_owner != myself) { /* A primary reason why we are here is likely due to my primary crashing during the * slot finalization process, leading me to become the new primary without * inheriting the slot ownership, while the source shard continued and relinquished - * theslot to its old primary. Under such circumstances, the node would undergo + * the slot to its old primary. Under such circumstances, the node would undergo * an election and have its config epoch increased with consensus. That said, we * will still explicitly bump the config epoch here to be consistent with the * existing practice. @@ -3102,7 +3103,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * its slots, this node should become a replica of the sender if * one of the following conditions is true: * - * 1. cluster-allow-replication-migration is enabled + * 1. cluster-allow-replica-migration is enabled * 2. all the lost slots go to the sender and the sender belongs * to this node's shard * @@ -3130,7 +3131,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG | CLUSTER_TODO_BROADCAST_ALL); - } else if (nodeIsPrimary(myself) && (sender_slots >= migrated_our_slots) && !are_in_same_shard) { + } else if (nodeIsPrimary(myself) && sender_slots >= migrated_our_slots) { /* When all our slots are lost to the sender and the sender belongs to * a different shard, this is likely due to a client triggered slot * migration. 
Don't reconfigure this node to migrate to the new shard @@ -3612,8 +3613,8 @@ int clusterProcessPacket(clusterLink *link) { uint16_t type = ntohs(hdr->type); if (server.debug_cluster_close_link_on_packet_drop && (type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2)) { - freeClusterLink(link); serverLog(LL_WARNING, "Closing link for matching packet type %s", clusterGetMessageTypeString(type)); + freeClusterLink(link); return 0; } return 1; @@ -4046,10 +4047,13 @@ int clusterProcessPacket(clusterLink *link) { * Or a replica's primary can turn itself into a replica of its other * replica during a failover. In this case, they are in the same shard, * so we can try a psync. */ - serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s", - myself->replicaof->replicaof->name, myself->replicaof->name); - clusterSetPrimary(myself->replicaof->replicaof, 1, - !areInSameShard(myself->replicaof->replicaof, myself)); + clusterNode *ultimate_primary = myself->replicaof->replicaof; + serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a " + "replica of %.40s (%s) from %.40s (%s)", + ultimate_primary->name, ultimate_primary->human_nodename, + myself->replicaof->name, myself->replicaof->human_nodename); + clusterSetPrimary(ultimate_primary, 1, + !areInSameShard(ultimate_primary, myself)); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG | CLUSTER_TODO_BROADCAST_ALL); } @@ -4073,15 +4077,14 @@ int clusterProcessPacket(clusterLink *link) { * instance claims is different compared to the set of slots we have * for it or if there was a failover in the sender's shard. Check * this ASAP to avoid other computational expensive checks later.*/ - if (sender && sender_claims_to_be_primary && (sender_last_reported_as_replica || memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) { /* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */ serverAssert(nodeIsPrimary(sender)); - serverLog(LL_NOTICE, "Mismatch in topology information for sender node %.40s (%s) in shard %.40s", sender->name, - sender->human_nodename, sender->shard_id); - + serverLog(LL_NOTICE, "%s. Sender node %.40s (%s) in shard %.40s", + sender_last_reported_as_replica ? "Sender last reported as replica" : "Sender changed slots", + sender->name, sender->human_nodename, sender->shard_id); /* 1) If the sender of the message is a primary, and we detected that * the set of slots it claims changed, scan the slots to see if we * need to update our configuration. */ @@ -4125,6 +4128,21 @@ int clusterProcessPacket(clusterLink *link) { } } + /* In some corner case our primary might be failed, and it used to + * have a replica, which has become the new primary. We should start + * following this new primary, otherwise we would start failover and + * eventually become an empty primary. + */ + if (sender && sender_claims_to_be_primary && + nodeFailed(clusterNodeGetPrimary(myself)) && + areInSameShard(sender, myself) && + nodeIsReplica(myself) && + sender != clusterNodeGetPrimary(myself)) { + serverLog(LL_NOTICE, "Sender %.40s (%s) and I are in the same shard and I should follow it", + sender->name, sender->human_nodename); + clusterSetPrimary(sender, 1, 1); + } + /* If our config epoch collides with the sender's try to fix * the problem. 
*/ if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) && @@ -4277,7 +4295,8 @@ void clusterLinkConnectHandler(connection *conn) { /* Check if connection succeeded */ if (connGetState(conn) != CONN_STATE_CONNECTED) { - serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s", node->name, node->ip, node->cport, + serverLog(LL_VERBOSE, "Connection with Node %.40s (%s) at %s:%d failed: %s", + node->name, node->human_nodename, node->ip, node->cport, connGetLastError(conn)); freeClusterLink(link); return; @@ -5345,6 +5364,15 @@ void clusterHandleReplicaFailover(void) { } } + /* A replica might not properly start its replication (e.g. the + * primary didn't reply to the replication request yet). + * In this case, it's not a valid candidate for promotion. */ + const long long repl_offset = replicationGetReplicaOffset(); + if (repl_offset == 0) { + serverLog(LL_NOTICE, "Cannot start election with zero replication offset"); + return; + } + /* If the previous failover attempt timeout and the retry time has * elapsed, we can setup a new one. */ if (auth_age > auth_retry_time) { @@ -5383,9 +5411,9 @@ void clusterHandleReplicaFailover(void) { "Start of election delayed for %lld milliseconds " "(rank #%d, primary rank #%d, offset %lld).", server.cluster->failover_auth_time - now, server.cluster->failover_auth_rank, - server.cluster->failover_failed_primary_rank, replicationGetReplicaOffset()); + server.cluster->failover_failed_primary_rank, repl_offset); /* Now that we have a scheduled election, broadcast our offset - * to all the other replicas so that they'll updated their offsets + * to all the other replicas so that they'll update their offsets * if our offset is better. */ clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_REPLICAS); diff --git a/src/replication.c b/src/replication.c index 82ee9450fa..42e7e5f265 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3896,13 +3896,13 @@ void syncWithPrimaryHandleError(connection **conn) { * * Motivation * * - Reduce primary memory load. We do that by moving the COB tracking to the replica side. This also decrease * the chance for COB overruns. Note that primary's input buffer limits at the replica side are less restricted - * then primary's COB as the replica plays less critical part in the replication group. While increasing the + * than primary's COB as the replica plays less critical part in the replication group. While increasing the * primary's COB may end up with primary reaching swap and clients suffering, at replica side we're more at * ease with it. Larger COB means better chance to sync successfully. * - Reduce primary main process CPU load. By opening a new, dedicated channel for the RDB transfer, child * processes can have direct access to the new channel. Due to TLS connection restrictions, this was not * possible using one main channel. We eliminate the need for the child process to use the primary's - * child-proc -> main-proc pipeline, thus freeing up the main process to process clients queries. + * child-proc -> main-proc pipeline, thus freeing up the main process to handle clients queries. 
* * * High level interface design * * - Dual channel sync begins when the replica sends a REPLCONF capa dual-channel to the primary during initial @@ -4083,7 +4083,7 @@ void syncWithPrimary(connection *conn) { if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { serverLog(LL_WARNING, "syncWithPrimary(): state machine error, " - "state should be RECEIVE_PSYNC but is %d", + "state should be RECEIVE_PSYNC_REPLY but is %d", server.repl_state); syncWithPrimaryHandleError(&conn); return; diff --git a/src/socket.c b/src/socket.c index 336b250cc6..34dabfb096 100644 --- a/src/socket.c +++ b/src/socket.c @@ -327,7 +327,7 @@ static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } - serverLog(LL_VERBOSE, "Accepted %s:%d", cip, cport); + serverLog(LL_VERBOSE, "Accepted client connection %s:%d", cip, cport); if (server.tcpkeepalive) anetKeepAlive(NULL, cfd, server.tcpkeepalive); acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL), flags, cip); diff --git a/tests/instances.tcl b/tests/instances.tcl index 265fa054a3..5c3b13bac6 100644 --- a/tests/instances.tcl +++ b/tests/instances.tcl @@ -421,7 +421,7 @@ proc pause_on_error {} { # We redefine 'test' as for Sentinel we don't use the server-client # architecture for the test, everything is sequential. proc test {descr code} { - set ts [clock format [clock seconds] -format %H:%M:%S] + set ts [get_current_ts] puts -nonewline "$ts> $descr: " flush stdout diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index ee14c58648..0e9164ccad 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -89,20 +89,45 @@ proc fix_cluster {addr} { # Check if cluster configuration is consistent. # All the nodes in the cluster should show same slots configuration and have health # state "online" to be considered as consistent. -proc cluster_config_consistent {} { +# We can optionally provide the node index to ignore. This is useful +# when we deliberately stop a server and wait for the other nodes to +# converge without it. +proc cluster_config_consistent { ignored_idx } { + set base_cfg {} + + set ignored_port -1 + if { $ignored_idx != {} } { + set ignored_port [srv $ignored_idx port] + } + for {set j 0} {$j < [llength $::servers]} {incr j} { - # Check if all the nodes are online + if { $j == $ignored_idx } { + continue + } + + # Ensure all nodes believe the cluster is ok + set state [CI $j cluster_state] + if { $state ne "ok" } { + return 0 + } + + # Check if all the nodes are online, except the one + # we optionally ignore. set shards_cfg [R $j CLUSTER SHARDS] foreach shard_cfg $shards_cfg { set nodes [dict get $shard_cfg nodes] foreach node $nodes { + set node_port [dict get $node port] + if { $node_port == $ignored_port } { + continue + } if {[dict get $node health] ne "online"} { return 0 } } } - if {$j == 0} { + if {$base_cfg eq {}} { set base_cfg [R $j cluster slots] } else { if {[R $j cluster slots] != $base_cfg} { @@ -126,8 +151,16 @@ proc cluster_size_consistent {cluster_size} { # Wait for cluster configuration to propagate and be consistent across nodes. proc wait_for_cluster_propagation {} { + wait_for_cluster_propagation_except_node {} +} + +proc wait_for_cluster_propagation_except_node { ignored_idx } { + # wait_for_condition evaluates the code in a different scope + # so we need to embed the argument first.
+ set condition_script [format {cluster_config_consistent {%s}} [join $ignored_idx " "]] + wait_for_condition 1000 50 { - [cluster_config_consistent] eq 1 + [eval $condition_script] eq 1 } else { for {set j 0} {$j < [llength $::servers]} {incr j} { puts "R $j cluster slots output: [R $j cluster slots]" diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 5a001c04a8..71f1fca23c 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -22,6 +22,16 @@ proc randstring {min max {type binary}} { return $output } +# Returns the timestamp to millisecond precision. This is helpful +# for cross-examining the Valkey server logs and TCL command logs. +proc get_current_ts {} { + set now_ms [clock milliseconds] + set now_s [expr {$now_ms / 1000}] + set ms_part [format "%03d" [expr {$now_ms % 1000}]] + set ts [clock format $now_s -format %H:%M:%S] + return "$ts.$ms_part" +} + # Useful for some test proc zlistAlikeSort {a b} { if {[lindex $a 0] > [lindex $b 0]} {return 1} diff --git a/tests/unit/cluster/manual-failover.tcl b/tests/unit/cluster/manual-failover.tcl index 56952ff008..cec98f29d1 100644 --- a/tests/unit/cluster/manual-failover.tcl +++ b/tests/unit/cluster/manual-failover.tcl @@ -569,7 +569,7 @@ start_cluster 3 2 {tags {external:skip cluster}} { } # Make sure R4 indeed detect the sub-replica and fixed the replicaof. - set pattern "*I'm a sub-replica! Reconfiguring myself as a replica of $R3_nodeid from $R0_nodeid*" + set pattern "*I'm a sub-replica! Reconfiguring myself as a replica of $R3_nodeid* from $R0_nodeid*" verify_log_message -4 $pattern 0 R 3 debug disable-cluster-reconnection 0 diff --git a/tests/unit/cluster/replica-migration.tcl b/tests/unit/cluster/replica-migration.tcl index 591d732fce..847cd2fce6 100644 --- a/tests/unit/cluster/replica-migration.tcl +++ b/tests/unit/cluster/replica-migration.tcl @@ -1,3 +1,11 @@ +# Most test cases in this file share the following setup: +# +# Replica 4 follows primary 0. Replica 7 follows primary 3, and their shard +# has only one slot. We migrate this slot to shard 0, thus making servers 3 and 7 +# become primary 0's new replicas. At this point all four servers are +# in the same shard. Then we stop primary 0 to trigger a failover, and test +# how the remaining three servers behave. + # Allocate slot 0 to the last primary and evenly distribute the remaining # slots to the remaining primaries. proc my_slot_allocation {masters replicas} { @@ -19,33 +27,80 @@ proc get_my_primary_peer {srv_idx} { return $primary_peer } -proc test_migrated_replica {type} { - test "Migrated replica reports zero repl offset and rank, and fails to win election - $type" { - # Write some data to primary 0, slot 1, make a small repl_offset. - for {set i 0} {$i < 1024} {incr i} { - R 0 incr key_991803 - } - assert_equal {1024} [R 0 get key_991803] +proc populate_data {} { + # Write some data to primary 0, slot 1, make a small repl_offset. + for {set i 0} {$i < 1024} {incr i} { + R 0 incr key_991803 + } + assert_equal {1024} [R 0 get key_991803] + + # Write some data to primary 3, slot 0, make a big repl_offset. + for {set i 0} {$i < 10240} {incr i} { + R 3 incr key_977613 + } + assert_equal {10240} [R 3 get key_977613] - # Write some data to primary 3, slot 0, make a big repl_offset. - for {set i 0} {$i < 10240} {incr i} { - R 3 incr key_977613 + # 10s, make sure primary 0 will hang in the save.
+ R 0 config set rdb-key-save-delay 100000000 +} + +proc stop_primary_0 { type } { + if {$type == "shutdown"} { + # Shutdown primary 0. + catch {R 0 shutdown nosave} + return -1 + } elseif {$type == "sigstop"} { + # Pause primary 0. + set primary0_pid [s 0 process_id] + pause_process $primary0_pid + return $primary0_pid + } +} + +proc resume_primary_0 { type primary0_pid } { + if {$type == "sigstop"} { + resume_process $primary0_pid + + # Wait for the old primary to go online and become a replica. + wait_for_condition 1000 50 { + [s 0 role] eq {slave} + } else { + fail "The old primary was not converted into replica" } - assert_equal {10240} [R 3 get key_977613] + } +} - # 10s, make sure primary 0 will hang in the save. - R 0 config set rdb-key-save-delay 100000000 +proc move_slot_0_from_primary_3_to_primary_0 {} { + set addr "[srv 0 host]:[srv 0 port]" + set src_node_id [R 3 CLUSTER MYID] + set code [catch { + exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $src_node_id=0 + } result] + if {$code != 0} { + fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result" + } +} - # Move the slot 0 from primary 3 to primary 0 - set addr "[srv 0 host]:[srv 0 port]" - set myid [R 3 CLUSTER MYID] - set code [catch { - exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0 - } result] - if {$code != 0} { - fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result" +# Wait until all given nodes have the expected key/value pairs. +proc wait_for_key_consistent {nodes kv_pairs} { + # Build one single boolean expression: [string equal [R n get key] val] && ... + set terms {} + foreach node $nodes { + dict for {key val} $kv_pairs { + # Use [list] to safely quote key/val into the format string + lappend terms [format {[string equal [R %d get %s] %s]} \ + $node [list $key] [list $val]] } + } + set condition [join $terms { && }] + + wait_for_condition 1000 50 $condition else { + fail "Keys not consistent" + } +} +proc test_migrated_replica {type} { + test "Migrated replica reports zero repl offset and rank, and fails to win election - $type" { # Validate that shard 3's primary and replica can convert to replicas after # they lose the last slot. R 3 config set cluster-replica-validity-factor 0 @@ -53,27 +108,32 @@ proc test_migrated_replica {type} { R 3 config set cluster-allow-replica-migration yes R 7 config set cluster-allow-replica-migration yes - if {$type == "shutdown"} { - # Shutdown primary 0. - catch {R 0 shutdown nosave} - } elseif {$type == "sigstop"} { - # Pause primary 0. - set primary0_pid [s 0 process_id] - pause_process $primary0_pid - } + populate_data + move_slot_0_from_primary_3_to_primary_0 + + # Ensure slot migration does happen + set R0_id [R 0 CLUSTER MYID] + wait_for_log_messages -3 [list "*Slot 0 is no longer being migrated to node $R0_id*"] 0 1000 10 + wait_for_log_messages -3 [list "*Configuration change detected. Reconfiguring myself as a replica of node $R0_id*"] 0 1000 10 + wait_for_log_messages -7 [list "*Lost my last slot during slot migration. Reconfiguring myself as a replica of $R0_id*"] 0 1000 10 + + # This step is necessary. After server 7 reconfigures itself to follow + # primary 0, it will get blocked for a while and become unreachable to + # all nodes. See the `test_blocked_replica_stale_state_race` test below + # to learn more about this edge case. 
+ # We need to send an arbitrary command (like PING here) to it to wait for + # it to become responsive. + R 7 PING + + # Stop primary 0 to start a failover. + set primary0_pid [stop_primary_0 $type] # Wait for the replica to become a primary, and make sure # the other primary become a replica. - wait_for_condition 1000 50 { - [s -4 role] eq {master} && - [s -3 role] eq {slave} && - [s -7 role] eq {slave} - } else { - puts "s -4 role: [s -4 role]" - puts "s -3 role: [s -3 role]" - puts "s -7 role: [s -7 role]" - fail "Failover does not happened" - } + set R4_id [R 4 CLUSTER MYID] + wait_for_log_messages -4 {"*Failover election won: I'm the new primary*"} 0 1000 10 + wait_for_log_messages -3 [list "*Configuration change detected. Reconfiguring myself as a replica of node $R4_id*"] 0 1000 10 + wait_for_log_messages -7 [list "*Configuration change detected. Reconfiguring myself as a replica of node $R4_id*"] 0 1000 10 # The node may not be able to initiate an election in time due to # problems with cluster communication. If an election is initiated, @@ -85,47 +145,18 @@ proc test_migrated_replica {type} { verify_log_message -7 "*Start of election*offset 0*" 0 } - # Make sure the right replica gets the higher rank. + # Make sure the right replica gets the highest rank. verify_log_message -4 "*Start of election*rank #0*" 0 # Wait for the cluster to be ok. - wait_for_condition 1000 50 { - [R 3 cluster slots] eq [R 4 cluster slots] && - [R 4 cluster slots] eq [R 7 cluster slots] && - [CI 3 cluster_state] eq "ok" && - [CI 4 cluster_state] eq "ok" && - [CI 7 cluster_state] eq "ok" - } else { - puts "R 3: [R 3 cluster info]" - puts "R 4: [R 4 cluster info]" - puts "R 7: [R 7 cluster info]" - fail "Cluster is down" - } + wait_for_cluster_propagation_except_node 0 # Make sure the key exists and is consistent. R 3 readonly R 7 readonly - wait_for_condition 1000 50 { - [R 3 get key_991803] == 1024 && [R 3 get key_977613] == 10240 && - [R 4 get key_991803] == 1024 && [R 4 get key_977613] == 10240 && - [R 7 get key_991803] == 1024 && [R 7 get key_977613] == 10240 - } else { - puts "R 3: [R 3 keys *]" - puts "R 4: [R 4 keys *]" - puts "R 7: [R 7 keys *]" - fail "Key not consistent" - } + wait_for_key_consistent {3 4 7} {key_991803 1024 key_977613 10240} - if {$type == "sigstop"} { - resume_process $primary0_pid - - # Wait for the old primary to go online and become a replica. - wait_for_condition 1000 50 { - [s 0 role] eq {slave} - } else { - fail "The old primary was not converted into replica" - } - } + resume_primary_0 $type $primary0_pid } } ;# proc @@ -139,46 +170,21 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout proc test_nonempty_replica {type} { test "New non-empty replica reports zero repl offset and rank, and fails to win election - $type" { - # Write some data to primary 0, slot 1, make a small repl_offset. - for {set i 0} {$i < 1024} {incr i} { - R 0 incr key_991803 - } - assert_equal {1024} [R 0 get key_991803] - - # Write some data to primary 3, slot 0, make a big repl_offset. - for {set i 0} {$i < 10240} {incr i} { - R 3 incr key_977613 - } - assert_equal {10240} [R 3 get key_977613] - - # 10s, make sure primary 0 will hang in the save. - R 0 config set rdb-key-save-delay 100000000 - - # Make server 7 a replica of server 0. R 7 config set cluster-replica-validity-factor 0 R 7 config set cluster-allow-replica-migration yes + + populate_data + move_slot_0_from_primary_3_to_primary_0 + # Make server 7 a replica of server 0. 
R 7 cluster replicate [R 0 cluster myid] - if {$type == "shutdown"} { - # Shutdown primary 0. - catch {R 0 shutdown nosave} - } elseif {$type == "sigstop"} { - # Pause primary 0. - set primary0_pid [s 0 process_id] - pause_process $primary0_pid - } + # Stop primary 0 to start a failover. + set primary0_pid [stop_primary_0 $type] # Wait for the replica to become a primary. - wait_for_condition 1000 50 { - [s -4 role] eq {master} && - [s -7 role] eq {slave} - } else { - puts "s -4 role: [s -4 role]" - puts "s -7 role: [s -7 role]" - fail "Failover does not happened" - } - - verify_log_message -4 "*Start of election*rank #0*" 0 + set R4_id [R 4 CLUSTER MYID] + wait_for_log_messages -4 {"*Failover election won: I'm the new primary*"} 0 1000 10 + wait_for_log_messages -7 [list "*Configuration change detected. Reconfiguring myself as a replica of node $R4_id*"] 0 1000 10 # The node may not be able to initiate an election in time due to # problems with cluster communication. If an election is initiated, @@ -187,38 +193,17 @@ proc test_nonempty_replica {type} { verify_log_message -7 "*Start of election*offset 0*" 0 } + # Make sure the right replica gets the highest rank. + verify_log_message -4 "*Start of election*rank #0*" 0 + # Wait for the cluster to be ok. - wait_for_condition 1000 50 { - [R 4 cluster slots] eq [R 7 cluster slots] && - [CI 4 cluster_state] eq "ok" && - [CI 7 cluster_state] eq "ok" - } else { - puts "R 4: [R 4 cluster info]" - puts "R 7: [R 7 cluster info]" - fail "Cluster is down" - } + wait_for_cluster_propagation_except_node 0 # Make sure the key exists and is consistent. R 7 readonly - wait_for_condition 1000 50 { - [R 4 get key_991803] == 1024 && - [R 7 get key_991803] == 1024 - } else { - puts "R 4: [R 4 get key_991803]" - puts "R 7: [R 7 get key_991803]" - fail "Key not consistent" - } - - if {$type == "sigstop"} { - resume_process $primary0_pid + wait_for_key_consistent {4 7} {key_991803 1024} - # Wait for the old primary to go online and become a replica. - wait_for_condition 1000 50 { - [s 0 role] eq {slave} - } else { - fail "The old primary was not converted into replica" - } - } + resume_primary_0 $type $primary0_pid } } ;# proc @@ -232,37 +217,16 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout proc test_sub_replica {type} { test "Sub-replica reports zero repl offset and rank, and fails to win election - $type" { - # Write some data to primary 0, slot 1, make a small repl_offset. - for {set i 0} {$i < 1024} {incr i} { - R 0 incr key_991803 - } - assert_equal {1024} [R 0 get key_991803] - - # Write some data to primary 3, slot 0, make a big repl_offset. - for {set i 0} {$i < 10240} {incr i} { - R 3 incr key_977613 - } - assert_equal {10240} [R 3 get key_977613] - R 3 config set cluster-replica-validity-factor 0 R 7 config set cluster-replica-validity-factor 0 R 3 config set cluster-allow-replica-migration yes R 7 config set cluster-allow-replica-migration no - # 10s, make sure primary 0 will hang in the save. - R 0 config set rdb-key-save-delay 100000000 + populate_data + move_slot_0_from_primary_3_to_primary_0 - # Move slot 0 from primary 3 to primary 0. + # Make sure server 3 and server 7 become a replica of primary 0. 
set addr "[srv 0 host]:[srv 0 port]" - set myid [R 3 CLUSTER MYID] - set code [catch { - exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0 - } result] - if {$code != 0} { - fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result" - } - - # Make sure server 3 and server 7 becomes a replica of primary 0. wait_for_condition 1000 50 { [get_my_primary_peer 3] eq $addr && [get_my_primary_peer 7] eq $addr @@ -273,29 +237,18 @@ proc test_sub_replica {type} { } # Make sure server 7 got a sub-replica log. - verify_log_message -7 "*I'm a sub-replica!*" 0 - - if {$type == "shutdown"} { - # Shutdown primary 0. - catch {R 0 shutdown nosave} - } elseif {$type == "sigstop"} { - # Pause primary 0. - set primary0_pid [s 0 process_id] - pause_process $primary0_pid - } + set R0_id [R 0 CLUSTER MYID] + verify_log_message -7 "*I'm a sub-replica! Reconfiguring myself as a replica of $R0_id*" 0 + + # Stop primary 0 to start a failover. + set primary0_pid [stop_primary_0 $type] # Wait for the replica to become a primary, and make sure # the other primary become a replica. - wait_for_condition 1000 50 { - [s -4 role] eq {master} && - [s -3 role] eq {slave} && - [s -7 role] eq {slave} - } else { - puts "s -4 role: [s -4 role]" - puts "s -3 role: [s -3 role]" - puts "s -7 role: [s -7 role]" - fail "Failover does not happened" - } + set R4_id [R 4 CLUSTER MYID] + wait_for_log_messages -4 {"*Failover election won: I'm the new primary*"} 0 1000 10 + wait_for_log_messages -3 [list "*Configuration change detected. Reconfiguring myself as a replica of node $R4_id*"] 0 1000 10 + wait_for_log_messages -7 [list "*Configuration change detected. Reconfiguring myself as a replica of node $R4_id*"] 0 1000 10 # The node may not be able to initiate an election in time due to # problems with cluster communication. If an election is initiated, @@ -307,44 +260,18 @@ proc test_sub_replica {type} { verify_log_message -7 "*Start of election*offset 0*" 0 } + # Make sure the right replica gets the highest rank. + verify_log_message -4 "*Start of election*rank #0*" 0 + # Wait for the cluster to be ok. - wait_for_condition 1000 50 { - [R 3 cluster slots] eq [R 4 cluster slots] && - [R 4 cluster slots] eq [R 7 cluster slots] && - [CI 3 cluster_state] eq "ok" && - [CI 4 cluster_state] eq "ok" && - [CI 7 cluster_state] eq "ok" - } else { - puts "R 3: [R 3 cluster info]" - puts "R 4: [R 4 cluster info]" - puts "R 7: [R 7 cluster info]" - fail "Cluster is down" - } + wait_for_cluster_propagation_except_node 0 # Make sure the key exists and is consistent. R 3 readonly R 7 readonly - wait_for_condition 1000 50 { - [R 3 get key_991803] == 1024 && [R 3 get key_977613] == 10240 && - [R 4 get key_991803] == 1024 && [R 4 get key_977613] == 10240 && - [R 7 get key_991803] == 1024 && [R 7 get key_977613] == 10240 - } else { - puts "R 3: [R 3 keys *]" - puts "R 4: [R 4 keys *]" - puts "R 7: [R 7 keys *]" - fail "Key not consistent" - } + wait_for_key_consistent {3 4 7} {key_991803 1024 key_977613 10240} - if {$type == "sigstop"} { - resume_process $primary0_pid - - # Wait for the old primary to go online and become a replica. 
- wait_for_condition 1000 50 { - [s 0 role] eq {slave} - } else { - fail "The old primary was not converted into replica" - } - } + resume_primary_0 $type $primary0_pid } } @@ -356,6 +283,111 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout test_sub_replica "sigstop" } my_slot_allocation cluster_allocate_replicas ;# start_cluster + +# This test setup is almost identical to the previous sub-replica test, except that +# replica 7's cluster-allow-replica-migration config is set to yes here. +# +# This test reproduces a natural race condition in Valkey's replication handshake, +# where multiple replicas attempt to synchronize with the same primary at nearly +# the same time. It does not depend on artificial delay configurations. +# +# After migrating shard 3's last slot to primary 0, both primary 3 and its replica 7 +# attempt to replicate from the new primary. Typically, the first replica to send its +# PSYNC request (node 3) successfully enrolls for replication, triggering the primary +# to fork the RDB child (RDB_CHILD_TYPE_SOCKET). The second replica (node 7), arriving +# milliseconds later, misses the join window and remains stuck waiting for a PING reply. +# +# While node 7 is in REPL_STATE_RECEIVE_PING_REPLY, its main thread is blocked in +# receiveSynchronousResponse(), unable to process other cluster messages. This makes +# node 7 appear unreachable to the rest of the cluster for roughly +# (5 × cluster-node-timeout) until it times out and retries the sync. +# +# During this blocking period, we stop primary 0 to trigger a failover. Replica 4 wins +# the election and becomes the new primary for the shard. At this moment: +# - Node 0 (stopped) +# - Node 3 (replica of 4) +# - Node 4 (new primary) +# - Node 7 (still replicating from 0) +# all belong to the same shard, but node 7 remains in an outdated state. +# +# When node 7 times out and reconnects, its outbound links are still valid, so it sends +# PINGs and receives a PONG from node 4, correctly identifying 4 as the new primary. +# However, inbound connections were previously closed by peers while node 7 was blocked. +# As node 7 reestablishes inbound sockets, it may receive delayed, stale cluster +# messages from node 4 that were sent before the failover, when 4 was still a replica of 0. +# +# Upon processing these stale messages, node 7 incorrectly believes node 4 is still +# following 0, reconfigures itself as a sub-replica of 0, and immediately starts a new +# election when it sees 0 marked as FAILED. Since it is now the only "replica" of 0, +# node 7 wins and becomes an empty primary in the same shard as node 4. +# +# This test verifies that such a race condition does not lead to empty primaries or +# duplicate leaders within the same shard. The scenario reflects a realistic replication +# race that can occur whenever replicas connect during overlapping RDB saves or network +# partitions, even without artificial delays. +proc test_blocked_replica_stale_state_race {type} { + test "Blocked replica mistakenly becomes a sub-replica and gets fixed later - $type" { + R 3 config set cluster-replica-validity-factor 0 + R 7 config set cluster-replica-validity-factor 0 + R 3 config set cluster-allow-replica-migration yes + R 7 config set cluster-allow-replica-migration yes + + populate_data + move_slot_0_from_primary_3_to_primary_0 + + # Make sure server 3 and server 7 become a replica of primary 0. + set R0_id [R 0 CLUSTER MYID] + wait_for_log_messages -3 [list "*Configuration change detected.
Reconfiguring myself as a replica of node $R0_id*"] 0 1000 10 + wait_for_log_messages -7 [list "*Lost my last slot during slot migration. Reconfiguring myself as a replica of $R0_id*"] 0 1000 10 + + # Stop primary 0 to start a failover. + set primary0_pid [stop_primary_0 $type] + + # Wait for the replica to become a primary, and make sure + # the other primary becomes a replica. + set R4_id [R 4 CLUSTER MYID] + wait_for_log_messages -4 {"*Failover election won: I'm the new primary*"} 0 1000 10 + wait_for_log_messages -3 [list "*Configuration change detected. Reconfiguring myself as a replica of node $R4_id*"] 0 1000 10 + + # Notice the ordering here is different from the previous sub-replica test function where + # the replica 7 becomes a sub-replica first and then reconfigures to follow primary 4. + # But here replica 7 reconfigures to follow primary 4 first and then mistakenly finds + # out it's a sub-replica. + set matched_result [wait_for_log_messages -7 [list "*Configuration change detected. Reconfiguring myself as a replica of node $R4_id*"] 0 1000 10] + set line_number [lindex $matched_result 1] + wait_for_log_messages -7 [list "*I'm a sub-replica! Reconfiguring myself as a replica of $R0_id*"] $line_number 1000 10 + + # Later replica 7 will start following primary 4 again. + wait_for_log_messages -7 [list "*Sender $R4_id* and I are in the same shard and I should follow it"] $line_number 1000 10 + + # Wait for the cluster to be ok. + wait_for_cluster_propagation_except_node 0 + + # Make sure the key exists and is consistent. + R 3 readonly + R 7 readonly + wait_for_key_consistent {3 4 7} {key_991803 1024 key_977613 10240} + + resume_primary_0 $type $primary0_pid + } +} + +# For this test case, we only run the "sigstop" type. Sending shutdown would leave +# the primary 0 in state "Waiting for replicas before shutting down" and meanwhile +# replica 7 is waiting for a replication reply from it. This creates a circular +# dependency, and thus replica 4 couldn't get promoted to primary during the +# replica 7 blocking period. Then we can't create the edge case we're trying to +# test here. + +# This test is currently disabled because it's flaky. If server 7 receives all +# stale PING packets from server 4 (via inbound link) before receiving the PONG reply +# from it (via outbound link), the tricky empty primary scenario won't happen, +# and thus this test case won't be applicable. + +#start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} { +# test_blocked_replica_stale_state_race "sigstop" +#} my_slot_allocation cluster_allocate_replicas ;# start_cluster + proc test_cluster_setslot {type} { test "valkey-cli make source node ignores NOREPLICAS error when doing the last CLUSTER SETSLOT - $type" { R 3 config set cluster-allow-replica-migration no @@ -367,15 +399,7 @@ proc test_cluster_setslot {type} { R 7 DEBUG DROP-CLUSTER-PACKET-FILTER 1 } - # Move slot 0 from primary 3 to primary 0.
- set addr "[srv 0 host]:[srv 0 port]" - set myid [R 3 CLUSTER MYID] - set code [catch { - exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0 - } result] - if {$code != 0} { - fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result" - } + move_slot_0_from_primary_3_to_primary_0 # Wait for R 3 to report that it is an empty primary (cluster-allow-replica-migration no) wait_for_log_messages -3 {"*I am now an empty primary*"} 0 1000 50 @@ -385,6 +409,7 @@ proc test_cluster_setslot {type} { } # Make sure server 3 lost its replica (server 7) and server 7 becomes a replica of primary 0. + set addr "[srv 0 host]:[srv 0 port]" wait_for_condition 1000 50 { [s -3 role] eq {master} && [s -3 connected_slaves] eq 0 &&