Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,6 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
JoinDflyFlows();
if (sync_type == "full") {
service_.RemoveLoadingState();
} else if (service_.IsLoadingExclusively()) {
// We need this check. We originally set the state unconditionally to LOADING
// when we call ReplicaOf command. If for some reason we fail to start full sync below
// or cancel the context, we still need to switch to ACTIVE state.
// TODO(kostasrim) we can remove this once my proposed changes for replication move forward
// as the state transitions for ReplicaOf command will be much clearer.
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
}
state_mask_ &= ~R_SYNCING;
last_journal_LSNs_.reset();
Expand Down
182 changes: 99 additions & 83 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ using detail::SaveStagesController;
using http::StringResponse;
using strings::HumanReadableNumBytes;

// Initialized by REPLICAOF
thread_local std::shared_ptr<Replica> tl_replica = nullptr;
// Initialized by ADDREPLICAOF
thread_local std::vector<std::shared_ptr<Replica>> tl_cluster_replicas;

namespace {

// TODO these should be configurable as command line flag and at runtime via config set
Expand Down Expand Up @@ -1228,6 +1233,11 @@ void ServerFamily::Shutdown() {
dfly_cmd_->Shutdown();
DebugCmd::Shutdown();
});

service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) {
tl_replica = nullptr;
tl_cluster_replicas.clear();
});
}

bool ServerFamily::HasPrivilegedInterface() {
Expand Down Expand Up @@ -3130,12 +3140,15 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
append("psync_attempts", rinfo.psync_attempts);
append("psync_successes", rinfo.psync_successes);
};
fb2::LockGuard lk(replicaof_mu_);
// Deep copy because tl_replica might be overwritten inbetween
auto replica = tl_replica;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bread and butter no1 of this PR. Bye bye blocking info command because of the mutex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO worth adding a test


replication_info_cb(replica_->GetSummary());
replication_info_cb(replica->GetSummary());

// Deep copy because tl_cluster_replicas might be overwritten inbetween
auto cluster_replicas = tl_cluster_replicas;
// Special case, when multiple masters replicate to a single replica.
for (const auto& replica : cluster_replicas_) {
for (const auto& replica : cluster_replicas) {
replication_info_cb(replica->GetSummary());
}
}
Expand Down Expand Up @@ -3417,7 +3430,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
}
LOG(INFO) << "Add Replica " << *replicaof_args;

auto add_replica = make_unique<Replica>(replicaof_args->host, replicaof_args->port, &service_,
auto add_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
GenericError ec = add_replica->Start();
if (ec) {
Expand All @@ -3426,77 +3439,76 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
}
add_replica->StartMainReplicationFiber(nullopt);
cluster_replicas_.push_back(std::move(add_replica));
cmd_cntx.rb->SendOk();
}

void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ActionOnConnectionFail on_err) {
std::shared_ptr<Replica> new_replica;
std::optional<Replica::LastMasterSyncData> last_master_data;
{
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
service_.proactor_pool().AwaitFiberOnAll(
[this](auto index, auto* cntx)
ABSL_NO_THREAD_SAFETY_ANALYSIS { tl_cluster_replicas = cluster_replicas_; });

// We should not execute replica of command while loading from snapshot.
ServerState* ss = ServerState::tlocal();
if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
builder->SendError(kLoadingErr);
return;
}

auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
if (!replicaof_args.has_value()) {
return;
}

LOG(INFO) << "Replicating " << *replicaof_args;

// If NO ONE was supplied, just stop the current replica (if it exists)
if (replicaof_args->IsReplicaOfNoOne()) {
if (!ss->is_master) {
CHECK(replica_);

SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
last_master_data_ = replica_->Stop();
replica_.reset();
cmd_cntx.rb->SendOk();
}

StopAllClusterReplicas();
}
void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
util::fb2::LockGuard lk(replicaof_mu_);
ServerState* ss = ServerState::tlocal();

// May not switch to ACTIVE if the process is, for example, shutting down at the same time.
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
if (!ss->is_master) {
CHECK(replica_);
// flip flag before clearing replica_
SetMasterFlagOnAllThreads(true);
// TODO we should not allow partial sync after NO-ONE. Only after Takeover.
last_master_data_ = replica_->Stop();
// TODO set thread locals to nullptr
replica_.reset();
StopAllClusterReplicas();
service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* cntx) { tl_replica = nullptr; });
}

return builder->SendOk();
}
// May not switch to ACTIVE if the process is, for example, shutting down at the same time.
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);

// If any replication is in progress, stop it, cancellation should kick in immediately
return builder->SendOk();
}

if (replica_)
last_master_data = replica_->Stop();
StopAllClusterReplicas();
bool ServerFamily::IsDragonflyLoadingAtomic() {
util::fb2::LockGuard lk(replicaof_mu_);
ServerState* ss = ServerState::tlocal();

const GlobalState gstate = ServerState::tlocal()->gstate();
if (gstate == GlobalState::TAKEN_OVER) {
service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
} else if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
new_state != GlobalState::LOADING) {
LOG(WARNING) << new_state << " in progress, ignored";
builder->SendError("Invalid state");
return;
}
return ss->is_master && ss->gstate() == GlobalState::LOADING;
}

// Create a new replica and assign it
new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ActionOnConnectionFail on_err) {
std::optional<Replica::LastMasterSyncData> last_master_data;

replica_ = new_replica;
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
if (!replicaof_args.has_value()) {
return;
}

// TODO: disconnect pending blocked clients (pubsub, blocking commands)
SetMasterFlagOnAllThreads(false); // Flip flag after assiging replica
LOG(INFO) << "Initiate replication with: " << *replicaof_args;
// This is a "weak" check. For example, if the node is already a replica,
// it could be the case that one of the flows disconnects. The MainReplicationFiber
// will then loop and if it can't partial sync it will enter LOADING state because of
// full sync. Note that the fiber is not aware of the replicaof_mu_ so even
// if that mutex is locked below before any state check we can't really enforce
// that the old replication fiber won't try to full sync and update the state to LOADING.
// What is more here is that we always call `replica->Stop()`. So even if we end up in the
// scenario described, the semantics are well defined. First, cancel the old replica and
// move on with the new one. Cancelation will be slower and ReplicaOf() will
// induce higher latency -- but that's ok because it's an highly improbable flow with
// well defined semantics.
if (IsDragonflyLoadingAtomic()) {
builder->SendError(kLoadingErr);
return;
}

} // release the lock, lk.unlock()
// We proceed connecting below without the lock to allow interrupting the replica immediately.
// From this point and onward, it should be highly responsive.
// replicaof no one
if (replicaof_args->IsReplicaOfNoOne()) {
return ReplicaOfNoOne(builder);
}

auto new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
GenericError ec{};
switch (on_err) {
case ActionOnConnectionFail::kReturnOnError:
Expand All @@ -3507,30 +3519,31 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
break;
};

// If the replication attempt failed, clean up global state. The replica should have stopped
// internally.
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time

// If there was an error above during Start we must not start the main replication fiber.
// However, it could be the case that Start() above connected succefully and by the time
// we acquire the lock, the context got cancelled because another ReplicaOf command
// executed and acquired the replicaof_mu_ before us.
const bool cancelled = new_replica->IsContextCancelled();
if (ec || cancelled) {
if (replica_ == new_replica) {
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
SetMasterFlagOnAllThreads(true);
replica_.reset();
}
builder->SendError(ec ? ec.Format() : "replication cancelled");
return;
if (ec || new_replica->IsContextCancelled()) {
return builder->SendError(ec ? ec.Format() : "replication cancelled");
}
// Successfully connected now we flush
// If we are called by "Replicate", tx will be null but we do not need
// to flush anything.

util::fb2::LockGuard lk(replicaof_mu_);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bread and butter no2. No more loading state prematurely. We only get into loading_state if we are doing full sync. Otherwise, no state change at all.

if (replica_)
last_master_data = replica_->Stop();

StopAllClusterReplicas();

if (ServerState::tlocal()->gstate() == GlobalState::TAKEN_OVER)
service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);

// Update thread locals. That way INFO never blocks
replica_ = new_replica;
service_.proactor_pool().AwaitFiberOnAll([new_replica](auto index, auto* context) {
tl_replica = new_replica;
tl_cluster_replicas.clear();
});
SetMasterFlagOnAllThreads(false);

if (on_err == ActionOnConnectionFail::kReturnOnError) {
new_replica->StartMainReplicationFiber(last_master_data);
replica_->StartMainReplicationFiber(last_master_data);
}

builder->SendOk();
}

Expand Down Expand Up @@ -3608,6 +3621,9 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
SetMasterFlagOnAllThreads(true);
last_master_data_ = replica_->Stop();
replica_.reset();

service_.proactor_pool().AwaitFiberOnAll([](auto index, auto* context) { tl_replica = nullptr; });

return builder->SendOk();
}

Expand Down
5 changes: 4 additions & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,9 @@ class ServerFamily {
// Set accepting_connections_ and update listners according to it
void ChangeConnectionAccept(bool accept);

void ReplicaOfNoOne(SinkReplyBuilder* builder);
bool IsDragonflyLoadingAtomic();

util::fb2::Fiber snapshot_schedule_fb_;
util::fb2::Fiber load_fiber_;

Expand All @@ -424,7 +427,7 @@ class ServerFamily {

mutable util::fb2::Mutex replicaof_mu_, save_mu_;
std::shared_ptr<Replica> replica_ ABSL_GUARDED_BY(replicaof_mu_);
std::vector<std::unique_ptr<Replica>> cluster_replicas_
std::vector<std::shared_ptr<Replica>> cluster_replicas_
ABSL_GUARDED_BY(replicaof_mu_); // used to replicating multiple nodes to single dragonfly

std::unique_ptr<ScriptMgr> script_mgr_;
Expand Down
11 changes: 5 additions & 6 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,6 @@ async def test_cancel_replication_immediately(df_factory, df_seeder_factory: Dfl
"""
Issue 100 replication commands. This checks that the replication state
machine can handle cancellation well.
We assert that at least one command was cancelled.
After we finish the 'fuzzing' part, replicate the first master and check that
all the data is correct.
"""
COMMANDS_TO_ISSUE = 100

Expand Down Expand Up @@ -484,7 +481,7 @@ async def replicate():
num_successes += await result

logging.info(f"succeses: {num_successes}")
assert COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
assert COMMANDS_TO_ISSUE == num_successes
Copy link
Contributor Author

@kostasrim kostasrim Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new algorithm does not use two phase locking so the following is no longer a possibility:

  1. client 1 -> REPLICAOF -> Locks the mutex -> updates replica_ to new_replica -> releases the mutex -> calls replica_->Start()
  2. client 2 -> REPLICAOF -> same as (1) but first calls replica_->Stop() -> releases the mutex
  3. client 1 -> REPLICAOF command grabs the second lock to the mutex, observes that the context got cancelled because of step (2) and boom returns "replication cancelled"

This can not happen anymore because we lock only once and atomically update everything including stopping the previous replica. So by the time (2) grabs the lock in the example above, the previous REPLICAOF command is not in some intermediate state. To observe that indeed we cancelled, we should read the logs and see ("Stopping replication") COMMANDS_TO_ISSUE - 1 times + 1 (because of the Shutdown() at the end)

Bonus points is that I suspect we might be able to also solve #4685 but I will need to follow up with that


await wait_available_async(c_replica)
capture = await seeder.capture()
Expand All @@ -493,6 +490,9 @@ async def replicate():

ping_job.cancel()

replica.stop()
lines = replica.find_in_logs("Stopping replication")


"""
Test flushall command. Set data to master send flashall and set more data.
Expand Down Expand Up @@ -2967,8 +2967,7 @@ async def replicate_inside_multi():
num_successes += await result

logging.info(f"succeses: {num_successes}")
assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
assert num_successes > 0, "At least one REPLICAOF must success"
assert MULTI_COMMANDS_TO_ISSUE == num_successes


async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFactory):
Expand Down
Loading