@@ -242,6 +242,11 @@ using detail::SaveStagesController;
242
242
using http::StringResponse;
243
243
using strings::HumanReadableNumBytes;
244
244
245
+ // Initialized by REPLICAOF
246
+ thread_local std::shared_ptr<Replica> tl_replica = nullptr ;
247
+ // Initialized by ADDREPLICAOF
248
+ thread_local std::vector<std::shared_ptr<Replica>> tl_cluster_replicas;
249
+
245
250
namespace {
246
251
247
252
// TODO: these should be configurable as command-line flags and at runtime via config set
@@ -1228,6 +1233,11 @@ void ServerFamily::Shutdown() {
1228
1233
dfly_cmd_->Shutdown ();
1229
1234
DebugCmd::Shutdown ();
1230
1235
});
1236
+
1237
+ service_.proactor_pool ().AwaitFiberOnAll ([](auto index, auto * cntx) {
1238
+ tl_replica = nullptr ;
1239
+ tl_cluster_replicas.clear ();
1240
+ });
1231
1241
}
1232
1242
1233
1243
bool ServerFamily::HasPrivilegedInterface () {
@@ -3130,12 +3140,15 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
3130
3140
append (" psync_attempts" , rinfo.psync_attempts );
3131
3141
append (" psync_successes" , rinfo.psync_successes );
3132
3142
};
3133
- fb2::LockGuard lk (replicaof_mu_);
3143
+ // Copy the shared_ptr (shared ownership, not a deep copy) because tl_replica might be overwritten in between
3144
+ auto replica = tl_replica;
3134
3145
3135
- replication_info_cb (replica_ ->GetSummary ());
3146
+ replication_info_cb (replica ->GetSummary ());
3136
3147
3148
+ // Copy the vector of shared_ptrs (the Replica objects are shared, not cloned) because tl_cluster_replicas might be overwritten in between
3149
+ auto cluster_replicas = tl_cluster_replicas;
3137
3150
// Special case, when multiple masters replicate to a single replica.
3138
- for (const auto & replica : cluster_replicas_ ) {
3151
+ for (const auto & replica : cluster_replicas ) {
3139
3152
replication_info_cb (replica->GetSummary ());
3140
3153
}
3141
3154
}
@@ -3417,7 +3430,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
3417
3430
}
3418
3431
LOG (INFO) << " Add Replica " << *replicaof_args;
3419
3432
3420
- auto add_replica = make_unique <Replica>(replicaof_args->host , replicaof_args->port , &service_,
3433
+ auto add_replica = make_shared <Replica>(replicaof_args->host , replicaof_args->port , &service_,
3421
3434
master_replid (), replicaof_args->slot_range );
3422
3435
GenericError ec = add_replica->Start ();
3423
3436
if (ec) {
@@ -3426,77 +3439,58 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
3426
3439
}
3427
3440
add_replica->StartMainReplicationFiber (nullopt );
3428
3441
cluster_replicas_.push_back (std::move (add_replica));
3429
- cmd_cntx.rb ->SendOk ();
3430
- }
3431
-
3432
- void ServerFamily::ReplicaOfInternal (CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
3433
- ActionOnConnectionFail on_err) {
3434
- std::shared_ptr<Replica> new_replica;
3435
- std::optional<Replica::LastMasterSyncData> last_master_data;
3436
- {
3437
- util::fb2::LockGuard lk (replicaof_mu_); // Only one REPLICAOF command can run at a time
3438
-
3439
- // We should not execute replica of command while loading from snapshot.
3440
- ServerState* ss = ServerState::tlocal ();
3441
- if (ss->is_master && ss->gstate () == GlobalState::LOADING) {
3442
- builder->SendError (kLoadingErr );
3443
- return ;
3444
- }
3445
-
3446
- auto replicaof_args = ReplicaOfArgs::FromCmdArgs (args, builder);
3447
- if (!replicaof_args.has_value ()) {
3448
- return ;
3449
- }
3450
-
3451
- LOG (INFO) << " Replicating " << *replicaof_args;
3452
-
3453
- // If NO ONE was supplied, just stop the current replica (if it exists)
3454
- if (replicaof_args->IsReplicaOfNoOne ()) {
3455
- if (!ss->is_master ) {
3456
- CHECK (replica_);
3457
-
3458
- SetMasterFlagOnAllThreads (true ); // Flip flag before clearing replica
3459
- last_master_data_ = replica_->Stop ();
3460
- replica_.reset ();
3461
3442
3462
- StopAllClusterReplicas ();
3463
- }
3443
+ service_. proactor_pool (). AwaitFiberOnAll (
3444
+ [ this ]( auto index, auto * cntx) { tl_cluster_replicas = cluster_replicas_; });
3464
3445
3465
- // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
3466
- service_. SwitchState (GlobalState::LOADING, GlobalState::ACTIVE);
3446
+ cmd_cntx. rb -> SendOk ();
3447
+ }
3467
3448
3468
- return builder->SendOk ();
3469
- }
3449
+ void ServerFamily::ReplicaOfNoOne (SinkReplyBuilder* builder) {
3450
+ util::fb2::LockGuard lk (replicaof_mu_);
3451
+ ServerState* ss = ServerState::tlocal ();
3470
3452
3471
- // If any replication is in progress, stop it, cancellation should kick in immediately
3453
+ if (ss->is_master && ss->gstate () == GlobalState::LOADING) {
3454
+ builder->SendError (kLoadingErr );
3455
+ return ;
3456
+ }
3472
3457
3473
- if (replica_)
3474
- last_master_data = replica_->Stop ();
3458
+ if (!ss->is_master ) {
3459
+ CHECK (replica_);
3460
+ // flip flag before clearing replica_
3461
+ SetMasterFlagOnAllThreads (true );
3462
+ // TODO we should not allow partial sync after NO-ONE. Only after Takeover.
3463
+ last_master_data_ = replica_->Stop ();
3464
+ // TODO set thread locals to nullptr
3465
+ replica_.reset ();
3475
3466
StopAllClusterReplicas ();
3467
+ service_.proactor_pool ().AwaitFiberOnAll ([](auto index, auto * cntx) { tl_replica = nullptr ; });
3468
+ }
3476
3469
3477
- const GlobalState gstate = ServerState::tlocal ()->gstate ();
3478
- if (gstate == GlobalState::TAKEN_OVER) {
3479
- service_.SwitchState (GlobalState::TAKEN_OVER, GlobalState::LOADING);
3480
- } else if (auto new_state = service_.SwitchState (GlobalState::ACTIVE, GlobalState::LOADING);
3481
- new_state != GlobalState::LOADING) {
3482
- LOG (WARNING) << new_state << " in progress, ignored" ;
3483
- builder->SendError (" Invalid state" );
3484
- return ;
3485
- }
3470
+ // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
3471
+ service_.SwitchState (GlobalState::LOADING, GlobalState::ACTIVE);
3472
+
3473
+ return builder->SendOk ();
3474
+ }
3486
3475
3487
- // Create a new replica and assign it
3488
- new_replica = make_shared<Replica>(replicaof_args-> host , replicaof_args-> port , &service_,
3489
- master_replid (), replicaof_args-> slot_range ) ;
3476
+ void ServerFamily::ReplicaOfInternal (CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
3477
+ ActionOnConnectionFail on_err) {
3478
+ std::optional<Replica::LastMasterSyncData> last_master_data ;
3490
3479
3491
- replica_ = new_replica;
3480
+ auto replicaof_args = ReplicaOfArgs::FromCmdArgs (args, builder);
3481
+ if (!replicaof_args.has_value ()) {
3482
+ return ;
3483
+ }
3492
3484
3493
- // TODO: disconnect pending blocked clients (pubsub, blocking commands)
3494
- SetMasterFlagOnAllThreads (false ); // Flip flag after assiging replica
3485
+ LOG (INFO) << " Initiate replication with: " << *replicaof_args;
3495
3486
3496
- } // release the lock, lk.unlock()
3497
- // We proceed connecting below without the lock to allow interrupting the replica immediately.
3498
- // From this point and onward, it should be highly responsive.
3487
+ // replicaof no one
3488
+ if (replicaof_args->IsReplicaOfNoOne ()) {
3489
+ return ReplicaOfNoOne (builder);
3490
+ }
3499
3491
3492
+ auto new_replica = make_shared<Replica>(replicaof_args->host , replicaof_args->port , &service_,
3493
+ master_replid (), replicaof_args->slot_range );
3500
3494
GenericError ec{};
3501
3495
switch (on_err) {
3502
3496
case ActionOnConnectionFail::kReturnOnError :
@@ -3507,30 +3501,31 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
3507
3501
break ;
3508
3502
};
3509
3503
3510
- // If the replication attempt failed, clean up global state. The replica should have stopped
3511
- // internally.
3512
- util::fb2::LockGuard lk (replicaof_mu_); // Only one REPLICAOF command can run at a time
3513
-
3514
- // If there was an error above during Start we must not start the main replication fiber.
3515
- // However, it could be the case that Start() above connected succefully and by the time
3516
- // we acquire the lock, the context got cancelled because another ReplicaOf command
3517
- // executed and acquired the replicaof_mu_ before us.
3518
- const bool cancelled = new_replica->IsContextCancelled ();
3519
- if (ec || cancelled) {
3520
- if (replica_ == new_replica) {
3521
- service_.SwitchState (GlobalState::LOADING, GlobalState::ACTIVE);
3522
- SetMasterFlagOnAllThreads (true );
3523
- replica_.reset ();
3524
- }
3525
- builder->SendError (ec ? ec.Format () : " replication cancelled" );
3526
- return ;
3504
+ if (ec) {
3505
+ return builder->SendError (ec.Format ());
3527
3506
}
3528
- // Successfully connected now we flush
3529
- // If we are called by "Replicate", tx will be null but we do not need
3530
- // to flush anything.
3507
+
3508
+ util::fb2::LockGuard lk (replicaof_mu_);
3509
+ if (replica_)
3510
+ last_master_data = replica_->Stop ();
3511
+
3512
+ StopAllClusterReplicas ();
3513
+
3514
+ if (ServerState::tlocal ()->gstate () == GlobalState::TAKEN_OVER)
3515
+ service_.SwitchState (GlobalState::TAKEN_OVER, GlobalState::LOADING);
3516
+
3517
+ // Update thread locals. That way INFO never blocks
3518
+ replica_ = new_replica;
3519
+ service_.proactor_pool ().AwaitFiberOnAll ([new_replica](auto index, auto * context) {
3520
+ tl_replica = new_replica;
3521
+ tl_cluster_replicas.clear ();
3522
+ });
3523
+ SetMasterFlagOnAllThreads (false );
3524
+
3531
3525
if (on_err == ActionOnConnectionFail::kReturnOnError ) {
3532
- new_replica ->StartMainReplicationFiber (last_master_data);
3526
+ replica_ ->StartMainReplicationFiber (last_master_data);
3533
3527
}
3528
+
3534
3529
builder->SendOk ();
3535
3530
}
3536
3531
@@ -3608,6 +3603,9 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
3608
3603
SetMasterFlagOnAllThreads (true );
3609
3604
last_master_data_ = replica_->Stop ();
3610
3605
replica_.reset ();
3606
+
3607
+ service_.proactor_pool ().AwaitFiberOnAll ([](auto index, auto * context) { tl_replica = nullptr ; });
3608
+
3611
3609
return builder->SendOk ();
3612
3610
}
3613
3611
0 commit comments