9 changes: 2 additions & 7 deletions src/server/replica.cc
@@ -63,6 +63,8 @@ ABSL_DECLARE_FLAG(uint16_t, announce_port);
ABSL_FLAG(
int, replica_priority, 100,
"Published by info command for sentinel to pick replica based on score during a failover");
ABSL_FLAG(bool, experimental_replicaof_v2, true,
"Use ReplicaOfV2 algorithm for initiating replication");

namespace dfly {

@@ -542,13 +544,6 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
JoinDflyFlows();
if (sync_type == "full") {
service_.RemoveLoadingState();
} else if (service_.IsLoadingExclusively()) {
// We need this check. We originally set the state unconditionally to LOADING
// when we call ReplicaOf command. If for some reason we fail to start full sync below
// or cancel the context, we still need to switch to ACTIVE state.
// TODO(kostasrim) we can remove this once my proposed changes for replication move forward
// as the state transitions for ReplicaOf command will be much clearer.
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
}
state_mask_ &= ~R_SYNCING;
last_journal_LSNs_.reset();
106 changes: 106 additions & 0 deletions src/server/server_family.cc
@@ -154,6 +154,7 @@ ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
ABSL_DECLARE_FLAG(int, replica_priority);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
ABSL_DECLARE_FLAG(bool, experimental_replicaof_v2);

bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
#define RETURN_ON_ERROR(cond, m) \
@@ -3555,6 +3556,11 @@ void ServerFamily::StopAllClusterReplicas() {
}

void ServerFamily::ReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) {
const bool use_replica_of_v2 = absl::GetFlag(FLAGS_experimental_replicaof_v2);
if (use_replica_of_v2) {
ReplicaOfInternalV2(args, cmd_cntx.tx, cmd_cntx.rb, ActionOnConnectionFail::kReturnOnError);
return;
}
ReplicaOfInternal(args, cmd_cntx.tx, cmd_cntx.rb, ActionOnConnectionFail::kReturnOnError);
@romange (Collaborator) commented on Sep 7, 2025:
It seems that the changes are grouped together nicely. Let's introduce the flag experimental_replicaof_v2, set to true, so we can fall back to the old variant.

}
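For context, a minimal standalone sketch of the flag-gated fallback discussed above, assuming only the Abseil flags API: the flag name and default are taken from the replica.cc hunk in this PR, while ReplicaOfV1/ReplicaOfV2 and main() are hypothetical stand-ins for the real dispatch in ServerFamily::ReplicaOf.

#include <iostream>

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"

// Same flag definition as in the replica.cc hunk above: defaults to the new path.
ABSL_FLAG(bool, experimental_replicaof_v2, true,
          "Use ReplicaOfV2 algorithm for initiating replication");

// Hypothetical stand-ins for the two code paths gated by the flag.
void ReplicaOfV1() { std::cout << "old ReplicaOfInternal path\n"; }
void ReplicaOfV2() { std::cout << "new ReplicaOfInternalV2 path\n"; }

int main(int argc, char** argv) {
  absl::ParseCommandLine(argc, argv);
  // Fallback: start the process with --experimental_replicaof_v2=false to take
  // the old variant; the default keeps the new one.
  if (absl::GetFlag(FLAGS_experimental_replicaof_v2)) {
    ReplicaOfV2();
  } else {
    ReplicaOfV1();
  }
  return 0;
}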

@@ -3568,9 +3574,109 @@ void ServerFamily::Replicate(string_view host, string_view port) {
CmdArgList args_list = absl::MakeSpan(args_vec);
io::NullSink sink;
facade::RedisReplyBuilder rb(&sink);
const bool use_replica_of_v2 = absl::GetFlag(FLAGS_experimental_replicaof_v2);
if (use_replica_of_v2) {
ReplicaOfInternalV2(args_list, nullptr, &rb, ActionOnConnectionFail::kContinueReplication);
return;
}
ReplicaOfInternal(args_list, nullptr, &rb, ActionOnConnectionFail::kContinueReplication);
}

void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
util::fb2::LockGuard lk(replicaof_mu_);
ServerState* ss = ServerState::tlocal();

if (!ss->is_master) {
CHECK(replica_);

// flip flag before clearing replica_
Review comment (Collaborator):
style: please keep an empty line before comment lines. here and everywhere.
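For illustration only, a hypothetical fragment (not part of the diff) showing the same two statements from this hunk with the requested blank line before the standalone comment:

CHECK(replica_);

// flip flag before clearing replica_
SetMasterFlagOnAllThreads(true);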

SetMasterFlagOnAllThreads(true);

last_master_data_ = replica_->Stop();
replica_.reset();
StopAllClusterReplicas();
}

// May not switch to ACTIVE if the process is, for example, shutting down at the same time.
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);

return builder->SendOk();
}

void ServerFamily::ReplicaOfInternalV2(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ActionOnConnectionFail on_error)
ABSL_LOCKS_EXCLUDED(replicaof_mu_) {
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
if (!replicaof_args.has_value()) {
return;
}

LOG(INFO) << "Initiate replication with: " << *replicaof_args;
// This is a "weak" check. For example, if the node is already a replica,
// it could be the case that one of the flows disconnects. The MainReplicationFiber
// will then loop and if it can't partial sync it will enter LOADING state because of
// full sync. Note that the fiber is not aware of the replicaof_mu_ so even
// if that mutex is locked below before any state check we can't really enforce
// that the old replication fiber won't try to full sync and update the state to LOADING.
// What's more, we always call `replica->Stop()` here. So even if we end up in the
// scenario described, the semantics are well defined. First, cancel the old replica and
// move on with the new one. Cancellation will be slower and ReplicaOf() will
// induce higher latency -- but that's ok because it's a highly improbable flow with
// well-defined semantics.
ServerState* ss = ServerState::tlocal();

if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
builder->SendError(kLoadingErr);
return;
}

// replicaof no one
if (replicaof_args->IsReplicaOfNoOne()) {
return ReplicaOfNoOne(builder);
}

auto new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
GenericError ec;
switch (on_error) {
case ActionOnConnectionFail::kReturnOnError:
ec = new_replica->Start();
break;
case ActionOnConnectionFail::kContinueReplication:
new_replica->EnableReplication();
break;
};

if (ec || new_replica->IsContextCancelled()) {
return builder->SendError(ec ? ec.Format() : "replication cancelled");
}

// Critical section.
// 1. Stop the old replica_ if it exists
// 2. Update all the pointers to the new replica and update master flag
// 3. Start the main replication fiber
// 4. Send OK
util::fb2::LockGuard lk(replicaof_mu_);
std::optional<Replica::LastMasterSyncData> last_master_data;
if (replica_)
last_master_data = replica_->Stop();

StopAllClusterReplicas();

if (ServerState::tlocal()->gstate() == GlobalState::TAKEN_OVER)
service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);

// TODO Update thread locals. That way INFO never blocks
replica_ = new_replica;
SetMasterFlagOnAllThreads(false);

if (on_error == ActionOnConnectionFail::kReturnOnError) {
replica_->StartMainReplicationFiber(last_master_data);
}

builder->SendOk();
}

// REPLTAKEOVER <seconds> [SAVE]
// SAVE is used only by tests.
void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx) {
5 changes: 5 additions & 0 deletions src/server/server_family.h
@@ -373,6 +373,11 @@ class ServerFamily {
void ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ActionOnConnectionFail on_error) ABSL_LOCKS_EXCLUDED(replicaof_mu_);

void ReplicaOfNoOne(SinkReplyBuilder* builder) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
// REPLICAOF implementation without two phase locking.
void ReplicaOfInternalV2(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ActionOnConnectionFail on_error) ABSL_LOCKS_EXCLUDED(replicaof_mu_);

struct LoadOptions {
std::string snapshot_id;
uint32_t shard_count = 0; // Shard count of the snapshot being loaded.
22 changes: 10 additions & 12 deletions tests/dragonfly/replication_test.py
@@ -448,9 +448,6 @@ async def test_cancel_replication_immediately(df_factory, df_seeder_factory: Dfl
"""
Issue 100 replication commands. This checks that the replication state
machine can handle cancellation well.
We assert that at least one command was cancelled.
After we finish the 'fuzzing' part, replicate the first master and check that
all the data is correct.
"""
COMMANDS_TO_ISSUE = 100

@@ -469,12 +466,8 @@ async def ping_status():
await asyncio.sleep(0.05)

async def replicate():
try:
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
return True
except redis.exceptions.ResponseError as e:
assert e.args[0] == "replication cancelled"
return False
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
return True

ping_job = asyncio.create_task(ping_status())
replication_commands = [asyncio.create_task(replicate()) for _ in range(COMMANDS_TO_ISSUE)]
@@ -484,7 +477,7 @@ async def replicate():
num_successes += await result

logging.info(f"successes: {num_successes}")
assert COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
assert COMMANDS_TO_ISSUE == num_successes

await wait_available_async(c_replica)
capture = await seeder.capture()
@@ -493,6 +486,12 @@

ping_job.cancel()

replica.stop()
lines = replica.find_in_logs("Stopping replication")
# Cancelled 99 times by REPLICAOF command and once by Shutdown() because
# we stopped the instance
assert len(lines) == COMMANDS_TO_ISSUE


"""
Test flushall command. Set data to master, send flushall, and set more data.
@@ -2974,8 +2973,7 @@ async def replicate_inside_multi():
num_successes += await result

logging.info(f"successes: {num_successes}")
assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
assert num_successes > 0, "At least one REPLICAOF must success"
assert MULTI_COMMANDS_TO_ISSUE == num_successes


async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFactory):