Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3131,13 +3131,23 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
CHECK(replica_);

SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
// We are responsive after we set the flag and as long the GLOBAL STATE is not loading
// ThisFiber::SleepFor(std::chrono::seconds(10))

// This is Fine! We can keep the lock for as much as we like. If we are not in FULL SYNC
// stage (which means that the global state is LOADING) then dragonfly can execute incoming
// requests/commands without an issue after the call to SetMasterFlagsOnAllThreads() above.
// What is more, is as long as we have thread locals on each proactor, we can be highly
// responsive for flows like INFO REPLICATION
last_master_data_ = replica_->Stop();
replica_.reset();

StopAllClusterReplicas();
}

// May not switch to ACTIVE if the process is, for example, shutting down at the same time.
// Why we need this ?! If we are in LOADING stage because of the replica being in FULL SYNC
// it should clean up that state....
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);

return builder->SendOk();
Expand All @@ -3156,6 +3166,12 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
builder->SendError("Invalid state");
return;
}
// We are *NOT* responsive after we set the flag here AND UNTIL we finish FULL SYNC
// However, we might get cancelled by the time we lock again the mutex below
// or because `new_replica->Start()` failed.
// See also Service::VerifyCommandState()
// -> allowed_by_state = dfly_cntx.journal_emulated || (cid->opt_mask() & CO::LOADING);
// ThisFiber::SleepFor(std::chrono::seconds(10))

// Create a new replica and assign it
new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
Expand All @@ -3173,6 +3189,8 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
GenericError ec{};
switch (on_err) {
case ActionOnConnectionFail::kReturnOnError:
// Why?! We just Greet and set the ConnectionContext member variables via
// REPLCONF
ec = new_replica->Start();
break;
case ActionOnConnectionFail::kContinueReplication:
Expand Down Expand Up @@ -3269,6 +3287,10 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
return builder->SendError("Full sync not done");
}

// This takes a *lot* of time as it requires some work on the master side as well.
// We keep the lock here so commands like INFO REPLICATION are blocked.
// This should not be the case when we have thread locals for the Replica as it will
// allow other flows to make progress while we are TAKINGOVER and HOLDING the mutex.
std::error_code ec = replica_->TakeOver(ArgS(args, 0), save_flag);
if (ec)
return builder->SendError("Couldn't execute takeover");
Expand Down
Loading