
Commit c9acc94

chore: new ReplicaOf algorithm (#5774)

Signed-off-by: kostas <[email protected]>
1 parent 2d0dd3c

File tree: 4 files changed, +123 -19 lines

  src/server/replica.cc
  src/server/server_family.cc
  src/server/server_family.h
  tests/dragonfly/replication_test.py

src/server/replica.cc (2 additions, 7 deletions)

@@ -64,6 +64,8 @@ ABSL_DECLARE_FLAG(uint16_t, announce_port);
 ABSL_FLAG(
     int, replica_priority, 100,
     "Published by info command for sentinel to pick replica based on score during a failover");
+ABSL_FLAG(bool, experimental_replicaof_v2, true,
+          "Use ReplicaOfV2 algorithm for initiating replication");
 
 namespace dfly {
 
@@ -551,13 +553,6 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
   JoinDflyFlows();
   if (sync_type == "full") {
     service_.RemoveLoadingState();
-  } else if (service_.IsLoadingExclusively()) {
-    // We need this check. We originally set the state unconditionally to LOADING
-    // when we call ReplicaOf command. If for some reason we fail to start full sync below
-    // or cancel the context, we still need to switch to ACTIVE state.
-    // TODO(kostasrim) we can remove this once my proposed changes for replication move forward
-    // as the state transitions for ReplicaOf command will be much clearer.
-    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
   }
   state_mask_ &= ~R_SYNCING;
   last_journal_LSNs_.reset();
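
The flag is defined here in replica.cc and consumed from server_family.cc below. As a reference, a minimal, self-contained sketch of that absl flags pattern follows; UseReplicaOfV2() is a hypothetical helper, not part of the commit, and since the flag defaults to true the V2 path stays on unless the flag is disabled at startup (e.g. via the usual absl --experimental_replicaof_v2=false syntax, assuming standard flag parsing).

// Sketch of the absl flags pattern used by this commit: the flag is defined
// once (as in replica.cc) and other translation units declare it and read it
// with absl::GetFlag (as in server_family.cc). UseReplicaOfV2() is a
// hypothetical helper for illustration only.
#include "absl/flags/flag.h"

ABSL_FLAG(bool, experimental_replicaof_v2, true,
          "Use ReplicaOfV2 algorithm for initiating replication");

// In another .cc file one would write:
//   #include "absl/flags/declare.h"
//   ABSL_DECLARE_FLAG(bool, experimental_replicaof_v2);
bool UseReplicaOfV2() {
  return absl::GetFlag(FLAGS_experimental_replicaof_v2);
}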

src/server/server_family.cc (106 additions, 0 deletions)

@@ -154,6 +154,7 @@ ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
 ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
 ABSL_DECLARE_FLAG(int, replica_priority);
 ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
+ABSL_DECLARE_FLAG(bool, experimental_replicaof_v2);
 
 bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
 #define RETURN_ON_ERROR(cond, m) \
@@ -3588,6 +3589,11 @@ void ServerFamily::StopAllClusterReplicas() {
 }
 
 void ServerFamily::ReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) {
+  const bool use_replica_of_v2 = absl::GetFlag(FLAGS_experimental_replicaof_v2);
+  if (use_replica_of_v2) {
+    ReplicaOfInternalV2(args, cmd_cntx.tx, cmd_cntx.rb, ActionOnConnectionFail::kReturnOnError);
+    return;
+  }
   ReplicaOfInternal(args, cmd_cntx.tx, cmd_cntx.rb, ActionOnConnectionFail::kReturnOnError);
 }
 
@@ -3601,9 +3607,109 @@ void ServerFamily::Replicate(string_view host, string_view port) {
   CmdArgList args_list = absl::MakeSpan(args_vec);
   io::NullSink sink;
   facade::RedisReplyBuilder rb(&sink);
+  const bool use_replica_of_v2 = absl::GetFlag(FLAGS_experimental_replicaof_v2);
+  if (use_replica_of_v2) {
+    ReplicaOfInternalV2(args_list, nullptr, &rb, ActionOnConnectionFail::kContinueReplication);
+    return;
+  }
   ReplicaOfInternal(args_list, nullptr, &rb, ActionOnConnectionFail::kContinueReplication);
 }
 
+void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
+  util::fb2::LockGuard lk(replicaof_mu_);
+  ServerState* ss = ServerState::tlocal();
+
+  if (!ss->is_master) {
+    CHECK(replica_);
+
+    // Flip the flag before clearing replica_.
+    SetMasterFlagOnAllThreads(true);
+
+    last_master_data_ = replica_->Stop();
+    replica_.reset();
+    StopAllClusterReplicas();
+  }
+
+  // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
+  service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
+
+  return builder->SendOk();
+}
+
+void ServerFamily::ReplicaOfInternalV2(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
+                                       ActionOnConnectionFail on_error)
+    ABSL_LOCKS_EXCLUDED(replicaof_mu_) {
+  auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
+  if (!replicaof_args.has_value()) {
+    return;
+  }
+
+  LOG(INFO) << "Initiate replication with: " << *replicaof_args;
+  // This is a "weak" check. For example, if the node is already a replica,
+  // it could be the case that one of the flows disconnects. The MainReplicationFiber
+  // will then loop and, if it can't partial sync, it will enter LOADING state because of
+  // full sync. Note that the fiber is not aware of replicaof_mu_, so even if that mutex
+  // is locked below before any state check, we can't really enforce that the old
+  // replication fiber won't try to full sync and update the state to LOADING.
+  // What is more, we always call `replica->Stop()`. So even if we end up in the
+  // scenario described, the semantics are well defined. First, cancel the old replica and
+  // move on with the new one. Cancellation will be slower and ReplicaOf() will
+  // induce higher latency -- but that's ok because it's a highly improbable flow with
+  // well defined semantics.
+  ServerState* ss = ServerState::tlocal();
+
+  if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
+    builder->SendError(kLoadingErr);
+    return;
+  }
+
+  // REPLICAOF NO ONE
+  if (replicaof_args->IsReplicaOfNoOne()) {
+    return ReplicaOfNoOne(builder);
+  }
+
+  auto new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
+                                          master_replid(), replicaof_args->slot_range);
+  GenericError ec;
+  switch (on_error) {
+    case ActionOnConnectionFail::kReturnOnError:
+      ec = new_replica->Start();
+      break;
+    case ActionOnConnectionFail::kContinueReplication:
+      new_replica->EnableReplication();
+      break;
+  };
+
+  if (ec || new_replica->IsContextCancelled()) {
+    return builder->SendError(ec ? ec.Format() : "replication cancelled");
+  }
+
+  // Critical section:
+  // 1. Stop the old replica_ if it exists.
+  // 2. Update all the pointers to the new replica and update the master flag.
+  // 3. Start the main replication fiber.
+  // 4. Send OK.
+  util::fb2::LockGuard lk(replicaof_mu_);
+  std::optional<Replica::LastMasterSyncData> last_master_data;
+  if (replica_)
+    last_master_data = replica_->Stop();
+
+  StopAllClusterReplicas();
+
+  if (ServerState::tlocal()->gstate() == GlobalState::TAKEN_OVER)
+    service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
+
+  // TODO: update thread locals so that INFO never blocks.
+  replica_ = new_replica;
+  SetMasterFlagOnAllThreads(false);
+
+  if (on_error == ActionOnConnectionFail::kReturnOnError) {
+    replica_->StartMainReplicationFiber(last_master_data);
+  }
+
+  builder->SendOk();
+}
+
 // REPLTAKEOVER <seconds> [SAVE]
 // SAVE is used only by tests.
 void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx) {
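
The header (see server_family.h below) describes ReplicaOfInternalV2 as the REPLICAOF implementation without two-phase locking: argument parsing and the fallible connection setup of the new Replica happen before replicaof_mu_ is taken, and every mutation of shared state (stopping the old replica, swapping the pointer, flipping the master flag, starting the fiber) happens inside a single critical section. The following self-contained sketch illustrates that general pattern; Connection and ConnectionManager are illustrative names, not Dragonfly types.

// Generic illustration of the "prepare outside the lock, publish inside one
// critical section" pattern that ReplicaOfInternalV2 follows above.
#include <memory>
#include <mutex>
#include <string>
#include <utility>

struct Connection {
  std::string host;
  bool Connect() { return !host.empty(); }  // stand-in for real, fallible I/O
  void Stop() {}                            // stand-in for cancellation
};

class ConnectionManager {
 public:
  // Returns false if the replacement could not be set up; in that case the
  // old connection is left untouched.
  bool ReplaceConnection(std::string host) {
    auto fresh = std::make_unique<Connection>();
    fresh->host = std::move(host);
    if (!fresh->Connect())  // slow/fallible work, no lock held
      return false;

    // Single critical section: stop the old connection and publish the new one.
    std::lock_guard<std::mutex> lk(mu_);
    if (current_) current_->Stop();
    current_ = std::move(fresh);
    return true;
  }

 private:
  std::mutex mu_;
  std::unique_ptr<Connection> current_;
};

A second REPLICAOF-style call simply builds its own replacement object and swaps it in the same way; whichever connection was installed previously is stopped under the lock, mirroring how the old replica_ is stopped before the new one is published.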

src/server/server_family.h (5 additions, 0 deletions)

@@ -373,6 +373,11 @@ class ServerFamily {
   void ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
                          ActionOnConnectionFail on_error) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
 
+  void ReplicaOfNoOne(SinkReplyBuilder* builder) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
+  // REPLICAOF implementation without two phase locking.
+  void ReplicaOfInternalV2(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
+                           ActionOnConnectionFail on_error) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
+
   struct LoadOptions {
     std::string snapshot_id;
     uint32_t shard_count = 0;  // Shard count of the snapshot being loaded.
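
Both new declarations carry ABSL_LOCKS_EXCLUDED(replicaof_mu_): callers must not already hold the mutex because the functions acquire it themselves. A minimal, self-contained example of how that annotation is typically used with clang's thread-safety analysis follows; the Example class and its absl::Mutex are illustrative only (Dragonfly's own mutex and lock-guard types differ).

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

// Illustrative only: the contract expressed by ABSL_LOCKS_EXCLUDED.
class Example {
 public:
  // Callers must NOT hold mu_; Reconfigure() acquires it internally.
  void Reconfigure(int v) ABSL_LOCKS_EXCLUDED(mu_) {
    absl::MutexLock lock(&mu_);
    value_ = v;
  }

 private:
  absl::Mutex mu_;
  int value_ ABSL_GUARDED_BY(mu_) = 0;
};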

tests/dragonfly/replication_test.py (10 additions, 12 deletions)

@@ -454,9 +454,6 @@ async def test_cancel_replication_immediately(df_factory, df_seeder_factory: Dfl
     """
     Issue 100 replication commands. This checks that the replication state
     machine can handle cancellation well.
-    We assert that at least one command was cancelled.
-    After we finish the 'fuzzing' part, replicate the first master and check that
-    all the data is correct.
     """
     COMMANDS_TO_ISSUE = 100
 
@@ -475,12 +472,8 @@ async def ping_status():
             await asyncio.sleep(0.05)
 
     async def replicate():
-        try:
-            await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
-            return True
-        except redis.exceptions.ResponseError as e:
-            assert e.args[0] == "replication cancelled"
-            return False
+        await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
+        return True
 
     ping_job = asyncio.create_task(ping_status())
     replication_commands = [asyncio.create_task(replicate()) for _ in range(COMMANDS_TO_ISSUE)]
@@ -490,7 +483,7 @@ async def replicate():
         num_successes += await result
 
     logging.info(f"succeses: {num_successes}")
-    assert COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
+    assert COMMANDS_TO_ISSUE == num_successes
 
     await wait_available_async(c_replica)
     capture = await seeder.capture()
@@ -499,6 +492,12 @@ async def replicate():
 
     ping_job.cancel()
 
+    replica.stop()
+    lines = replica.find_in_logs("Stopping replication")
+    # Cancelled 99 times by REPLICAOF command and once by Shutdown() because
+    # we stopped the instance
+    assert len(lines) == COMMANDS_TO_ISSUE
+
 
 """
 Test flushall command. Set data to master send flashall and set more data.
@@ -2981,8 +2980,7 @@ async def replicate_inside_multi():
         num_successes += await result
 
     logging.info(f"succeses: {num_successes}")
-    assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
-    assert num_successes > 0, "At least one REPLICAOF must success"
+    assert MULTI_COMMANDS_TO_ISSUE == num_successes
 
 
 async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFactory):
