@@ -1234,10 +1234,7 @@ void ServerFamily::Shutdown() {
1234
1234
DebugCmd::Shutdown ();
1235
1235
});
1236
1236
1237
- service_.proactor_pool ().AwaitFiberOnAll ([](auto index, auto * cntx) {
1238
- tl_replica = nullptr ;
1239
- tl_cluster_replicas.clear ();
1240
- });
1237
+ UpdateReplicationThreadLocals (nullptr );
1241
1238
}
1242
1239
1243
1240
bool ServerFamily::HasPrivilegedInterface () {
@@ -3457,10 +3454,9 @@ void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
3457
3454
SetMasterFlagOnAllThreads (true );
3458
3455
// TODO we should not allow partial sync after NO-ONE. Only after Takeover.
3459
3456
last_master_data_ = replica_->Stop ();
3460
- // TODO set thread locals to nullptr
3461
3457
replica_.reset ();
3462
3458
StopAllClusterReplicas ();
3463
- service_. proactor_pool (). AwaitFiberOnAll ([]( auto index, auto * cntx) { tl_replica = nullptr ; } );
3459
+ UpdateReplicationThreadLocals ( nullptr );
3464
3460
}
3465
3461
3466
3462
// May not switch to ACTIVE if the process is, for example, shutting down at the same time.
@@ -3523,6 +3519,11 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
3523
3519
return builder->SendError (ec ? ec.Format () : " replication cancelled" );
3524
3520
}
3525
3521
3522
+ // Critical section.
3523
+ // 1. Stop the old replica_ if it exists
3524
+ // 2. Update all the pointers to the new replica and update master flag
3525
+ // 3. Start the main replication fiber
3526
+ // 4. Send OK
3526
3527
util::fb2::LockGuard lk (replicaof_mu_);
3527
3528
if (replica_)
3528
3529
last_master_data = replica_->Stop ();
@@ -3534,10 +3535,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
3534
3535
3535
3536
// Update thread locals. That way INFO never blocks
3536
3537
replica_ = new_replica;
3537
- service_.proactor_pool ().AwaitFiberOnAll ([new_replica](auto index, auto * context) {
3538
- tl_replica = new_replica;
3539
- tl_cluster_replicas.clear ();
3540
- });
3538
+ UpdateReplicationThreadLocals (new_replica);
3541
3539
SetMasterFlagOnAllThreads (false );
3542
3540
3543
3541
if (on_err == ActionOnConnectionFail::kReturnOnError ) {
@@ -3622,7 +3620,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
3622
3620
last_master_data_ = replica_->Stop ();
3623
3621
replica_.reset ();
3624
3622
3625
- service_. proactor_pool (). AwaitFiberOnAll ([]( auto index, auto * context) { tl_replica = nullptr ; } );
3623
+ UpdateReplicationThreadLocals ( nullptr );
3626
3624
3627
3625
return builder->SendOk ();
3628
3626
}
@@ -3916,6 +3914,13 @@ void ServerFamily::ClientPauseCmd(CmdArgList args, SinkReplyBuilder* builder,
3916
3914
}
3917
3915
}
3918
3916
3917
+ void ServerFamily::UpdateReplicationThreadLocals (std::shared_ptr<Replica> repl) {
3918
+ service_.proactor_pool ().AwaitFiberOnAll ([repl](auto index, auto * context) {
3919
+ tl_replica = nullptr ;
3920
+ tl_cluster_replicas.clear ();
3921
+ });
3922
+ }
3923
+
3919
3924
#define HFUNC (x ) SetHandler(HandlerFunc(this , &ServerFamily::x))
3920
3925
3921
3926
namespace acl {
0 commit comments