Skip to content

Commit 62e9aa1

Browse files
authored
fix(replication): use correct replica pointer in takeover catchup loop (#6068)
* fix(replication): use correct replica pointer in takeover catchup loop * fix: review comments
1 parent c4d05ba commit 62e9aa1

File tree

2 files changed

+43
-2
lines changed

2 files changed

+43
-2
lines changed

src/server/dflycmd.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,8 +567,8 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
567567
continue;
568568
}
569569

570-
auto cb = [&, end_time](EngineShard* shard) {
571-
if (!WaitReplicaFlowToCatchup(end_time, replica_ptr.get(), shard)) {
570+
auto cb = [repl_ptr = repl_ptr, end_time, &rest_catchup_success](EngineShard* shard) {
571+
if (!WaitReplicaFlowToCatchup(end_time, repl_ptr.get(), shard)) {
572572
rest_catchup_success.store(false);
573573
}
574574
};

tests/dragonfly/replication_test.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3562,3 +3562,44 @@ async def get_memory(client, field):
35623562
line = lines[0]
35633563
peak_bytes = extract_int_after_prefix("Serialization peak bytes: ", line)
35643564
assert peak_bytes < value_size
3565+
3566+
3567+
@pytest.mark.slow
3568+
async def test_takeover_bug_wrong_replica_checked_in_logs(df_factory):
3569+
master = df_factory.create(proactor_threads=4, vmodule="dflycmd=1")
3570+
replicas = [df_factory.create(proactor_threads=2) for _ in range(3)]
3571+
df_factory.start_all([master] + replicas)
3572+
3573+
c_master = master.client()
3574+
clients = [r.client() for r in replicas]
3575+
3576+
# Connect all replicas
3577+
for c in clients:
3578+
await c.execute_command(f"REPLICAOF localhost {master.port}")
3579+
await asyncio.gather(*[wait_available_async(c) for c in clients])
3580+
3581+
# Disconnect replica[1] to create lag
3582+
await clients[1].execute_command("REPLICAOF NO ONE")
3583+
3584+
# Write data that replica[1] will miss
3585+
pipe = c_master.pipeline()
3586+
for i in range(10000):
3587+
pipe.set(f"k{i}", "x" * 100)
3588+
await pipe.execute()
3589+
3590+
# Reconnect replica[1] and immediately takeover from replica[0]
3591+
await clients[1].execute_command(f"REPLICAOF localhost {master.port}")
3592+
try:
3593+
await clients[0].execute_command("REPLTAKEOVER 1")
3594+
except Exception:
3595+
pass
3596+
3597+
# Check master logs
3598+
master.stop(kill=False)
3599+
timeout_logs = master.find_in_logs("Couldn't synchronize with replica")
3600+
3601+
if timeout_logs:
3602+
for log in timeout_logs:
3603+
assert (
3604+
str(replicas[0].port) not in log
3605+
), f"BUG: Checked initiating replica {replicas[0].port} instead of others"

0 commit comments

Comments
 (0)