@@ -242,6 +242,11 @@ using detail::SaveStagesController;
242
242
using http::StringResponse;
243
243
using strings::HumanReadableNumBytes;
244
244
245
+ // Initialized by REPLICAOF
246
+ thread_local std::shared_ptr<Replica> tl_replica = nullptr ;
247
+ // Initialized by ADDREPLICAOF
248
+ thread_local std::vector<std::shared_ptr<Replica>> tl_cluster_replicas;
249
+
245
250
namespace {
246
251
247
252
// TODO these should be configurable as command line flag and at runtime via config set
@@ -1228,6 +1233,11 @@ void ServerFamily::Shutdown() {
1228
1233
dfly_cmd_->Shutdown ();
1229
1234
DebugCmd::Shutdown ();
1230
1235
});
1236
+
1237
+ service_.proactor_pool ().AwaitFiberOnAll ([](auto index, auto * cntx) {
1238
+ tl_replica = nullptr ;
1239
+ tl_cluster_replicas.clear ();
1240
+ });
1231
1241
}
1232
1242
1233
1243
bool ServerFamily::HasPrivilegedInterface () {
@@ -3130,12 +3140,15 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
3130
3140
append (" psync_attempts" , rinfo.psync_attempts );
3131
3141
append (" psync_successes" , rinfo.psync_successes );
3132
3142
};
3133
- fb2::LockGuard lk (replicaof_mu_);
3143
+ // Deep copy because tl_replica might be overwritten inbetween
3144
+ auto replica = tl_replica;
3134
3145
3135
- replication_info_cb (replica_ ->GetSummary ());
3146
+ replication_info_cb (replica ->GetSummary ());
3136
3147
3148
+ // Deep copy because tl_cluster_replicas might be overwritten inbetween
3149
+ auto cluster_replicas = tl_cluster_replicas;
3137
3150
// Special case, when multiple masters replicate to a single replica.
3138
- for (const auto & replica : cluster_replicas_ ) {
3151
+ for (const auto & replica : cluster_replicas ) {
3139
3152
replication_info_cb (replica->GetSummary ());
3140
3153
}
3141
3154
}
@@ -3417,7 +3430,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
3417
3430
}
3418
3431
LOG (INFO) << " Add Replica " << *replicaof_args;
3419
3432
3420
- auto add_replica = make_unique <Replica>(replicaof_args->host , replicaof_args->port , &service_,
3433
+ auto add_replica = make_shared <Replica>(replicaof_args->host , replicaof_args->port , &service_,
3421
3434
master_replid (), replicaof_args->slot_range );
3422
3435
GenericError ec = add_replica->Start ();
3423
3436
if (ec) {
@@ -3426,77 +3439,76 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
3426
3439
}
3427
3440
add_replica->StartMainReplicationFiber (nullopt );
3428
3441
cluster_replicas_.push_back (std::move (add_replica));
3429
- cmd_cntx.rb ->SendOk ();
3430
- }
3431
3442
3432
- void ServerFamily::ReplicaOfInternal (CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
3433
- ActionOnConnectionFail on_err) {
3434
- std::shared_ptr<Replica> new_replica;
3435
- std::optional<Replica::LastMasterSyncData> last_master_data;
3436
- {
3437
- util::fb2::LockGuard lk (replicaof_mu_); // Only one REPLICAOF command can run at a time
3443
+ service_.proactor_pool ().AwaitFiberOnAll (
3444
+ [this ](auto index, auto * cntx)
3445
+ ABSL_NO_THREAD_SAFETY_ANALYSIS { tl_cluster_replicas = cluster_replicas_; });
3438
3446
3439
- // We should not execute replica of command while loading from snapshot.
3440
- ServerState* ss = ServerState::tlocal ();
3441
- if (ss->is_master && ss->gstate () == GlobalState::LOADING) {
3442
- builder->SendError (kLoadingErr );
3443
- return ;
3444
- }
3445
-
3446
- auto replicaof_args = ReplicaOfArgs::FromCmdArgs (args, builder);
3447
- if (!replicaof_args.has_value ()) {
3448
- return ;
3449
- }
3450
-
3451
- LOG (INFO) << " Replicating " << *replicaof_args;
3452
-
3453
- // If NO ONE was supplied, just stop the current replica (if it exists)
3454
- if (replicaof_args->IsReplicaOfNoOne ()) {
3455
- if (!ss->is_master ) {
3456
- CHECK (replica_);
3457
-
3458
- SetMasterFlagOnAllThreads (true ); // Flip flag before clearing replica
3459
- last_master_data_ = replica_->Stop ();
3460
- replica_.reset ();
3447
+ cmd_cntx.rb ->SendOk ();
3448
+ }
3461
3449
3462
- StopAllClusterReplicas ();
3463
- }
3450
+ void ServerFamily::ReplicaOfNoOne (SinkReplyBuilder* builder) {
3451
+ util::fb2::LockGuard lk (replicaof_mu_);
3452
+ ServerState* ss = ServerState::tlocal ();
3464
3453
3465
- // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
3466
- service_.SwitchState (GlobalState::LOADING, GlobalState::ACTIVE);
3454
+ if (!ss->is_master ) {
3455
+ CHECK (replica_);
3456
+ // flip flag before clearing replica_
3457
+ SetMasterFlagOnAllThreads (true );
3458
+ // TODO we should not allow partial sync after NO-ONE. Only after Takeover.
3459
+ last_master_data_ = replica_->Stop ();
3460
+ // TODO set thread locals to nullptr
3461
+ replica_.reset ();
3462
+ StopAllClusterReplicas ();
3463
+ service_.proactor_pool ().AwaitFiberOnAll ([](auto index, auto * cntx) { tl_replica = nullptr ; });
3464
+ }
3467
3465
3468
- return builder-> SendOk ();
3469
- }
3466
+ // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
3467
+ service_. SwitchState (GlobalState::LOADING, GlobalState::ACTIVE);
3470
3468
3471
- // If any replication is in progress, stop it, cancellation should kick in immediately
3469
+ return builder->SendOk ();
3470
+ }
3472
3471
3473
- if (replica_)
3474
- last_master_data = replica_-> Stop ( );
3475
- StopAllClusterReplicas ();
3472
+ bool ServerFamily::IsDragonflyLoadingAtomic () {
3473
+ util::fb2::LockGuard lk (replicaof_mu_ );
3474
+ ServerState* ss = ServerState::tlocal ();
3476
3475
3477
- const GlobalState gstate = ServerState::tlocal ()->gstate ();
3478
- if (gstate == GlobalState::TAKEN_OVER) {
3479
- service_.SwitchState (GlobalState::TAKEN_OVER, GlobalState::LOADING);
3480
- } else if (auto new_state = service_.SwitchState (GlobalState::ACTIVE, GlobalState::LOADING);
3481
- new_state != GlobalState::LOADING) {
3482
- LOG (WARNING) << new_state << " in progress, ignored" ;
3483
- builder->SendError (" Invalid state" );
3484
- return ;
3485
- }
3476
+ return ss->is_master && ss->gstate () == GlobalState::LOADING;
3477
+ }
3486
3478
3487
- // Create a new replica and assign it
3488
- new_replica = make_shared<Replica>(replicaof_args-> host , replicaof_args-> port , &service_,
3489
- master_replid (), replicaof_args-> slot_range ) ;
3479
+ void ServerFamily::ReplicaOfInternal (CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
3480
+ ActionOnConnectionFail on_err) {
3481
+ std::optional<Replica::LastMasterSyncData> last_master_data ;
3490
3482
3491
- replica_ = new_replica;
3483
+ auto replicaof_args = ReplicaOfArgs::FromCmdArgs (args, builder);
3484
+ if (!replicaof_args.has_value ()) {
3485
+ return ;
3486
+ }
3492
3487
3493
- // TODO: disconnect pending blocked clients (pubsub, blocking commands)
3494
- SetMasterFlagOnAllThreads (false ); // Flip flag after assiging replica
3488
+ LOG (INFO) << " Initiate replication with: " << *replicaof_args;
3489
+ // This is a "weak" check. For example, if the node is already a replica,
3490
+ // it could be the case that one of the flows disconnects. The MainReplicationFiber
3491
+ // will then loop and if it can't partial sync it will enter LOADING state because of
3492
+ // full sync. Note that the fiber is not aware of the replicaof_mu_ so even
3493
+ // if that mutex is locked below before any state check we can't really enforce
3494
+ // that the old replication fiber won't try to full sync and update the state to LOADING.
3495
+ // What is more here is that we always call `replica->Stop()`. So even if we end up in the
3496
+ // scenario described, the semantics are well defined. First, cancel the old replica and
3497
+ // move on with the new one. Cancelation will be slower and ReplicaOf() will
3498
+ // induce higher latency -- but that's ok because it's an highly improbable flow with
3499
+ // well defined semantics.
3500
+ if (IsDragonflyLoadingAtomic ()) {
3501
+ builder->SendError (kLoadingErr );
3502
+ return ;
3503
+ }
3495
3504
3496
- } // release the lock, lk.unlock()
3497
- // We proceed connecting below without the lock to allow interrupting the replica immediately.
3498
- // From this point and onward, it should be highly responsive.
3505
+ // replicaof no one
3506
+ if (replicaof_args->IsReplicaOfNoOne ()) {
3507
+ return ReplicaOfNoOne (builder);
3508
+ }
3499
3509
3510
+ auto new_replica = make_shared<Replica>(replicaof_args->host , replicaof_args->port , &service_,
3511
+ master_replid (), replicaof_args->slot_range );
3500
3512
GenericError ec{};
3501
3513
switch (on_err) {
3502
3514
case ActionOnConnectionFail::kReturnOnError :
@@ -3507,30 +3519,31 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
3507
3519
break ;
3508
3520
};
3509
3521
3510
- // If the replication attempt failed, clean up global state. The replica should have stopped
3511
- // internally.
3512
- util::fb2::LockGuard lk (replicaof_mu_); // Only one REPLICAOF command can run at a time
3513
-
3514
- // If there was an error above during Start we must not start the main replication fiber.
3515
- // However, it could be the case that Start() above connected succefully and by the time
3516
- // we acquire the lock, the context got cancelled because another ReplicaOf command
3517
- // executed and acquired the replicaof_mu_ before us.
3518
- const bool cancelled = new_replica->IsContextCancelled ();
3519
- if (ec || cancelled) {
3520
- if (replica_ == new_replica) {
3521
- service_.SwitchState (GlobalState::LOADING, GlobalState::ACTIVE);
3522
- SetMasterFlagOnAllThreads (true );
3523
- replica_.reset ();
3524
- }
3525
- builder->SendError (ec ? ec.Format () : " replication cancelled" );
3526
- return ;
3522
+ if (ec || new_replica->IsContextCancelled ()) {
3523
+ return builder->SendError (ec ? ec.Format () : " replication cancelled" );
3527
3524
}
3528
- // Successfully connected now we flush
3529
- // If we are called by "Replicate", tx will be null but we do not need
3530
- // to flush anything.
3525
+
3526
+ util::fb2::LockGuard lk (replicaof_mu_);
3527
+ if (replica_)
3528
+ last_master_data = replica_->Stop ();
3529
+
3530
+ StopAllClusterReplicas ();
3531
+
3532
+ if (ServerState::tlocal ()->gstate () == GlobalState::TAKEN_OVER)
3533
+ service_.SwitchState (GlobalState::TAKEN_OVER, GlobalState::LOADING);
3534
+
3535
+ // Update thread locals. That way INFO never blocks
3536
+ replica_ = new_replica;
3537
+ service_.proactor_pool ().AwaitFiberOnAll ([new_replica](auto index, auto * context) {
3538
+ tl_replica = new_replica;
3539
+ tl_cluster_replicas.clear ();
3540
+ });
3541
+ SetMasterFlagOnAllThreads (false );
3542
+
3531
3543
if (on_err == ActionOnConnectionFail::kReturnOnError ) {
3532
- new_replica ->StartMainReplicationFiber (last_master_data);
3544
+ replica_ ->StartMainReplicationFiber (last_master_data);
3533
3545
}
3546
+
3534
3547
builder->SendOk ();
3535
3548
}
3536
3549
@@ -3608,6 +3621,9 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
3608
3621
SetMasterFlagOnAllThreads (true );
3609
3622
last_master_data_ = replica_->Stop ();
3610
3623
replica_.reset ();
3624
+
3625
+ service_.proactor_pool ().AwaitFiberOnAll ([](auto index, auto * context) { tl_replica = nullptr ; });
3626
+
3611
3627
return builder->SendOk ();
3612
3628
}
3613
3629
0 commit comments