Skip to content

Commit 95ca692

Browse files
committed
clean up
1 parent bf087da commit 95ca692

File tree

3 files changed

+22
-17
lines changed

3 files changed

+22
-17
lines changed

src/server/server_family.cc

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1234,10 +1234,7 @@ void ServerFamily::Shutdown() {
12341234
DebugCmd::Shutdown();
12351235
});
12361236

1237-
service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) {
1238-
tl_replica = nullptr;
1239-
tl_cluster_replicas.clear();
1240-
});
1237+
UpdateReplicationThreadLocals(nullptr);
12411238
}
12421239

12431240
bool ServerFamily::HasPrivilegedInterface() {
@@ -3457,10 +3454,9 @@ void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
34573454
SetMasterFlagOnAllThreads(true);
34583455
// TODO: we should not allow partial sync after NO-ONE, only after Takeover.
34593456
last_master_data_ = replica_->Stop();
3460-
// TODO set thread locals to nullptr
34613457
replica_.reset();
34623458
StopAllClusterReplicas();
3463-
service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) { tl_replica = nullptr; });
3459+
UpdateReplicationThreadLocals(nullptr);
34643460
}
34653461

34663462
// May not switch to ACTIVE if the process is, for example, shutting down at the same time.
@@ -3523,6 +3519,11 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
35233519
return builder->SendError(ec ? ec.Format() : "replication cancelled");
35243520
}
35253521

3522+
// Critical section.
3523+
// 1. Stop the old replica_ if it exists
3524+
// 2. Update all the pointers to the new replica and update master flag
3525+
// 3. Start the main replication fiber
3526+
// 4. Send OK
35263527
util::fb2::LockGuard lk(replicaof_mu_);
35273528
if (replica_)
35283529
last_master_data = replica_->Stop();
@@ -3534,10 +3535,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
35343535

35353536
// Update thread locals. That way INFO never blocks
35363537
replica_ = new_replica;
3537-
service_.proactor_pool().AwaitFiberOnAll([new_replica](auto index, auto* context) {
3538-
tl_replica = new_replica;
3539-
tl_cluster_replicas.clear();
3540-
});
3538+
UpdateReplicationThreadLocals(new_replica);
35413539
SetMasterFlagOnAllThreads(false);
35423540

35433541
if (on_err == ActionOnConnectionFail::kReturnOnError) {
@@ -3622,7 +3620,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
36223620
last_master_data_ = replica_->Stop();
36233621
replica_.reset();
36243622

3625-
service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* context) { tl_replica = nullptr; });
3623+
UpdateReplicationThreadLocals(nullptr);
36263624

36273625
return builder->SendOk();
36283626
}
@@ -3916,6 +3914,13 @@ void ServerFamily::ClientPauseCmd(CmdArgList args, SinkReplyBuilder* builder,
39163914
}
39173915
}
39183916

3917+
void ServerFamily::UpdateReplicationThreadLocals(std::shared_ptr<Replica> repl) {
3918+
service_.proactor_pool().AwaitFiberOnAll([repl](auto index, auto* context) {
3919+
tl_replica = repl;
3920+
tl_cluster_replicas.clear();
3921+
});
3922+
}
3923+
39193924
#define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x))
39203925

39213926
namespace acl {

src/server/server_family.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,7 @@ class ServerFamily {
414414

415415
void ReplicaOfNoOne(SinkReplyBuilder* builder);
416416
bool IsDragonflyLoadingAtomic();
417+
void UpdateReplicationThreadLocals(std::shared_ptr<Replica> repl);
417418

418419
util::fb2::Fiber snapshot_schedule_fb_;
419420
util::fb2::Fiber load_fiber_;

tests/dragonfly/replication_test.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -466,12 +466,8 @@ async def ping_status():
466466
await asyncio.sleep(0.05)
467467

468468
async def replicate():
469-
try:
470-
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
471-
return True
472-
except redis.exceptions.ResponseError as e:
473-
assert e.args[0] == "replication cancelled"
474-
return False
469+
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
470+
return True
475471

476472
ping_job = asyncio.create_task(ping_status())
477473
replication_commands = [asyncio.create_task(replicate()) for _ in range(COMMANDS_TO_ISSUE)]
@@ -492,6 +488,9 @@ async def replicate():
492488

493489
replica.stop()
494490
lines = replica.find_in_logs("Stopping replication")
491+
# Cancelled COMMANDS_TO_ISSUE - 1 times by REPLICAOF commands and once by Shutdown() because
492+
# we stopped the instance
493+
assert len(lines) == COMMANDS_TO_ISSUE
495494

496495

497496
"""

0 commit comments

Comments
 (0)