Skip to content

Commit 787e0c0

Browse files
committed
chore: almost mutex free replication
Signed-off-by: kostas <[email protected]>
1 parent 1894ba5 commit 787e0c0

File tree

3 files changed

+85
-92
lines changed

3 files changed

+85
-92
lines changed

src/server/replica.cc

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -542,13 +542,6 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
542542
JoinDflyFlows();
543543
if (sync_type == "full") {
544544
service_.RemoveLoadingState();
545-
} else if (service_.IsLoadingExclusively()) {
546-
// We need this check. We originally set the state unconditionally to LOADING
547-
// when we call ReplicaOf command. If for some reason we fail to start full sync below
548-
// or cancel the context, we still need to switch to ACTIVE state.
549-
// TODO(kostasrim) we can remove this once my proposed changes for replication move forward
550-
// as the state transitions for ReplicaOf command will be much clearer.
551-
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
552545
}
553546
state_mask_ &= ~R_SYNCING;
554547
last_journal_LSNs_.reset();

src/server/server_family.cc

Lines changed: 82 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ using detail::SaveStagesController;
242242
using http::StringResponse;
243243
using strings::HumanReadableNumBytes;
244244

245+
// Initialized by REPLICAOF
246+
thread_local std::shared_ptr<Replica> tl_replica = nullptr;
247+
// Initialized by ADDREPLICAOF
248+
thread_local std::vector<std::shared_ptr<Replica>> tl_cluster_replicas;
249+
245250
namespace {
246251

247252
// TODO these should be configurable as command line flag and at runtime via config set
@@ -1228,6 +1233,11 @@ void ServerFamily::Shutdown() {
12281233
dfly_cmd_->Shutdown();
12291234
DebugCmd::Shutdown();
12301235
});
1236+
1237+
service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) {
1238+
tl_replica = nullptr;
1239+
tl_cluster_replicas.clear();
1240+
});
12311241
}
12321242

12331243
bool ServerFamily::HasPrivilegedInterface() {
@@ -3130,12 +3140,15 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
31303140
append("psync_attempts", rinfo.psync_attempts);
31313141
append("psync_successes", rinfo.psync_successes);
31323142
};
3133-
fb2::LockGuard lk(replicaof_mu_);
3143+
// Deep copy because tl_replica might be overwritten inbetween
3144+
auto replica = tl_replica;
31343145

3135-
replication_info_cb(replica_->GetSummary());
3146+
replication_info_cb(replica->GetSummary());
31363147

3148+
// Deep copy because tl_cluster_replicas might be overwritten inbetween
3149+
auto cluster_replicas = tl_cluster_replicas;
31373150
// Special case, when multiple masters replicate to a single replica.
3138-
for (const auto& replica : cluster_replicas_) {
3151+
for (const auto& replica : cluster_replicas) {
31393152
replication_info_cb(replica->GetSummary());
31403153
}
31413154
}
@@ -3417,7 +3430,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
34173430
}
34183431
LOG(INFO) << "Add Replica " << *replicaof_args;
34193432

3420-
auto add_replica = make_unique<Replica>(replicaof_args->host, replicaof_args->port, &service_,
3433+
auto add_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
34213434
master_replid(), replicaof_args->slot_range);
34223435
GenericError ec = add_replica->Start();
34233436
if (ec) {
@@ -3426,77 +3439,58 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
34263439
}
34273440
add_replica->StartMainReplicationFiber(nullopt);
34283441
cluster_replicas_.push_back(std::move(add_replica));
3429-
cmd_cntx.rb->SendOk();
3430-
}
3431-
3432-
void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
3433-
ActionOnConnectionFail on_err) {
3434-
std::shared_ptr<Replica> new_replica;
3435-
std::optional<Replica::LastMasterSyncData> last_master_data;
3436-
{
3437-
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
3438-
3439-
// We should not execute replica of command while loading from snapshot.
3440-
ServerState* ss = ServerState::tlocal();
3441-
if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
3442-
builder->SendError(kLoadingErr);
3443-
return;
3444-
}
3445-
3446-
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
3447-
if (!replicaof_args.has_value()) {
3448-
return;
3449-
}
3450-
3451-
LOG(INFO) << "Replicating " << *replicaof_args;
3452-
3453-
// If NO ONE was supplied, just stop the current replica (if it exists)
3454-
if (replicaof_args->IsReplicaOfNoOne()) {
3455-
if (!ss->is_master) {
3456-
CHECK(replica_);
3457-
3458-
SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
3459-
last_master_data_ = replica_->Stop();
3460-
replica_.reset();
34613442

3462-
StopAllClusterReplicas();
3463-
}
3443+
service_.proactor_pool().AwaitFiberOnAll(
3444+
[this](auto index, auto* cntx) { tl_cluster_replicas = cluster_replicas_; });
34643445

3465-
// May not switch to ACTIVE if the process is, for example, shutting down at the same time.
3466-
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
3446+
cmd_cntx.rb->SendOk();
3447+
}
34673448

3468-
return builder->SendOk();
3469-
}
3449+
void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
3450+
util::fb2::LockGuard lk(replicaof_mu_);
3451+
ServerState* ss = ServerState::tlocal();
34703452

3471-
// If any replication is in progress, stop it, cancellation should kick in immediately
3453+
if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
3454+
builder->SendError(kLoadingErr);
3455+
return;
3456+
}
34723457

3473-
if (replica_)
3474-
last_master_data = replica_->Stop();
3458+
if (!ss->is_master) {
3459+
CHECK(replica_);
3460+
// flip flag before clearing replica_
3461+
SetMasterFlagOnAllThreads(true);
3462+
// TODO we should not allow partial sync after NO-ONE. Only after Takeover.
3463+
last_master_data_ = replica_->Stop();
3464+
// TODO set thread locals to nullptr
3465+
replica_.reset();
34753466
StopAllClusterReplicas();
3467+
service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) { tl_replica = nullptr; });
3468+
}
34763469

3477-
const GlobalState gstate = ServerState::tlocal()->gstate();
3478-
if (gstate == GlobalState::TAKEN_OVER) {
3479-
service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
3480-
} else if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
3481-
new_state != GlobalState::LOADING) {
3482-
LOG(WARNING) << new_state << " in progress, ignored";
3483-
builder->SendError("Invalid state");
3484-
return;
3485-
}
3470+
// May not switch to ACTIVE if the process is, for example, shutting down at the same time.
3471+
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
3472+
3473+
return builder->SendOk();
3474+
}
34863475

3487-
// Create a new replica and assign it
3488-
new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
3489-
master_replid(), replicaof_args->slot_range);
3476+
void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
3477+
ActionOnConnectionFail on_err) {
3478+
std::optional<Replica::LastMasterSyncData> last_master_data;
34903479

3491-
replica_ = new_replica;
3480+
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
3481+
if (!replicaof_args.has_value()) {
3482+
return;
3483+
}
34923484

3493-
// TODO: disconnect pending blocked clients (pubsub, blocking commands)
3494-
SetMasterFlagOnAllThreads(false); // Flip flag after assiging replica
3485+
LOG(INFO) << "Initiate replication with: " << *replicaof_args;
34953486

3496-
} // release the lock, lk.unlock()
3497-
// We proceed connecting below without the lock to allow interrupting the replica immediately.
3498-
// From this point and onward, it should be highly responsive.
3487+
// replicaof no one
3488+
if (replicaof_args->IsReplicaOfNoOne()) {
3489+
return ReplicaOfNoOne(builder);
3490+
}
34993491

3492+
auto new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
3493+
master_replid(), replicaof_args->slot_range);
35003494
GenericError ec{};
35013495
switch (on_err) {
35023496
case ActionOnConnectionFail::kReturnOnError:
@@ -3507,30 +3501,31 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
35073501
break;
35083502
};
35093503

3510-
// If the replication attempt failed, clean up global state. The replica should have stopped
3511-
// internally.
3512-
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
3513-
3514-
// If there was an error above during Start we must not start the main replication fiber.
3515-
// However, it could be the case that Start() above connected succefully and by the time
3516-
// we acquire the lock, the context got cancelled because another ReplicaOf command
3517-
// executed and acquired the replicaof_mu_ before us.
3518-
const bool cancelled = new_replica->IsContextCancelled();
3519-
if (ec || cancelled) {
3520-
if (replica_ == new_replica) {
3521-
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
3522-
SetMasterFlagOnAllThreads(true);
3523-
replica_.reset();
3524-
}
3525-
builder->SendError(ec ? ec.Format() : "replication cancelled");
3526-
return;
3504+
if (ec) {
3505+
return builder->SendError(ec.Format());
35273506
}
3528-
// Successfully connected now we flush
3529-
// If we are called by "Replicate", tx will be null but we do not need
3530-
// to flush anything.
3507+
3508+
util::fb2::LockGuard lk(replicaof_mu_);
3509+
if (replica_)
3510+
last_master_data = replica_->Stop();
3511+
3512+
StopAllClusterReplicas();
3513+
3514+
if (ServerState::tlocal()->gstate() == GlobalState::TAKEN_OVER)
3515+
service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
3516+
3517+
// Update thread locals. That way INFO never blocks
3518+
replica_ = new_replica;
3519+
service_.proactor_pool().AwaitFiberOnAll([new_replica](auto index, auto* context) {
3520+
tl_replica = new_replica;
3521+
tl_cluster_replicas.clear();
3522+
});
3523+
SetMasterFlagOnAllThreads(false);
3524+
35313525
if (on_err == ActionOnConnectionFail::kReturnOnError) {
3532-
new_replica->StartMainReplicationFiber(last_master_data);
3526+
replica_->StartMainReplicationFiber(last_master_data);
35333527
}
3528+
35343529
builder->SendOk();
35353530
}
35363531

@@ -3608,6 +3603,9 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
36083603
SetMasterFlagOnAllThreads(true);
36093604
last_master_data_ = replica_->Stop();
36103605
replica_.reset();
3606+
3607+
service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* context) { tl_replica = nullptr; });
3608+
36113609
return builder->SendOk();
36123610
}
36133611

src/server/server_family.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ class ServerFamily {
412412
// Set accepting_connections_ and update listners according to it
413413
void ChangeConnectionAccept(bool accept);
414414

415+
void ReplicaOfNoOne(SinkReplyBuilder* builder);
416+
415417
util::fb2::Fiber snapshot_schedule_fb_;
416418
util::fb2::Fiber load_fiber_;
417419

@@ -424,7 +426,7 @@ class ServerFamily {
424426

425427
mutable util::fb2::Mutex replicaof_mu_, save_mu_;
426428
std::shared_ptr<Replica> replica_ ABSL_GUARDED_BY(replicaof_mu_);
427-
std::vector<std::unique_ptr<Replica>> cluster_replicas_
429+
std::vector<std::shared_ptr<Replica>> cluster_replicas_
428430
ABSL_GUARDED_BY(replicaof_mu_); // used to replicating multiple nodes to single dragonfly
429431

430432
std::unique_ptr<ScriptMgr> script_mgr_;

0 commit comments

Comments
 (0)