diff --git a/src/server/replica.cc b/src/server/replica.cc
index 1b41a19b01fa..2a7ab69944e5 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -63,6 +63,8 @@ ABSL_DECLARE_FLAG(uint16_t, announce_port);
 ABSL_FLAG(
     int, replica_priority, 100,
     "Published by info command for sentinel to pick replica based on score during a failover");
+ABSL_FLAG(bool, experimental_replicaof_v2, true,
+          "Use ReplicaOfV2 algorithm for initiating replication");
 
 namespace dfly {
 
@@ -542,13 +544,6 @@ error_code Replica::InitiateDflySync(std::optional last_mast
   JoinDflyFlows();
   if (sync_type == "full") {
     service_.RemoveLoadingState();
-  } else if (service_.IsLoadingExclusively()) {
-    // We need this check. We originally set the state unconditionally to LOADING
-    // when we call ReplicaOf command. If for some reason we fail to start full sync below
-    // or cancel the context, we still need to switch to ACTIVE state.
-    // TODO(kostasrim) we can remove this once my proposed changes for replication move forward
-    // as the state transitions for ReplicaOf command will be much clearer.
-    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
   }
   state_mask_ &= ~R_SYNCING;
   last_journal_LSNs_.reset();
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 8d8ad84e4bbf..33b2594282d2 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -154,6 +154,7 @@ ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
 ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
 ABSL_DECLARE_FLAG(int, replica_priority);
 ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
+ABSL_DECLARE_FLAG(bool, experimental_replicaof_v2);
 
 bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
 #define RETURN_ON_ERROR(cond, m) \
@@ -3555,6 +3556,11 @@ void ServerFamily::StopAllClusterReplicas() {
 }
 
 void ServerFamily::ReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) {
+  const bool use_replica_of_v2 = absl::GetFlag(FLAGS_experimental_replicaof_v2);
+  if (use_replica_of_v2) {
+    ReplicaOfInternalV2(args, cmd_cntx.tx, cmd_cntx.rb, ActionOnConnectionFail::kReturnOnError);
+    return;
+  }
   ReplicaOfInternal(args, cmd_cntx.tx, cmd_cntx.rb, ActionOnConnectionFail::kReturnOnError);
 }
 
@@ -3568,9 +3574,109 @@ void ServerFamily::Replicate(string_view host, string_view port) {
   CmdArgList args_list = absl::MakeSpan(args_vec);
 
   io::NullSink sink;
   facade::RedisReplyBuilder rb(&sink);
+  const bool use_replica_of_v2 = absl::GetFlag(FLAGS_experimental_replicaof_v2);
+  if (use_replica_of_v2) {
+    ReplicaOfInternalV2(args_list, nullptr, &rb, ActionOnConnectionFail::kContinueReplication);
+    return;
+  }
   ReplicaOfInternal(args_list, nullptr, &rb, ActionOnConnectionFail::kContinueReplication);
 }
 
+void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
+  util::fb2::LockGuard lk(replicaof_mu_);
+  ServerState* ss = ServerState::tlocal();
+
+  if (!ss->is_master) {
+    CHECK(replica_);
+
+    // flip flag before clearing replica_
+    SetMasterFlagOnAllThreads(true);
+
+    last_master_data_ = replica_->Stop();
+    replica_.reset();
+    StopAllClusterReplicas();
+  }
+
+  // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
+  service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
+
+  return builder->SendOk();
+}
+
+void ServerFamily::ReplicaOfInternalV2(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
+                                       ActionOnConnectionFail on_error)
+    ABSL_LOCKS_EXCLUDED(replicaof_mu_) {
+  auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
+  if (!replicaof_args.has_value()) {
+    return;
+  }
+
+  LOG(INFO) << "Initiate replication with: " << *replicaof_args;
+  // This is a "weak" check. For example, if the node is already a replica,
+  // it could be the case that one of the flows disconnects. The MainReplicationFiber
+  // will then loop, and if it can't partial sync it will enter LOADING state because of
+  // full sync. Note that the fiber is not aware of replicaof_mu_, so even if that mutex
+  // is locked below before any state check, we can't really enforce that the old
+  // replication fiber won't try to full sync and update the state to LOADING.
+  // What is more, we always call `replica->Stop()`. So even if we end up in the
+  // scenario described, the semantics are well defined: first cancel the old replica,
+  // then move on with the new one. Cancellation will be slower and ReplicaOf() will
+  // induce higher latency -- but that's ok because it's a highly improbable flow with
+  // well defined semantics.
+  ServerState* ss = ServerState::tlocal();
+
+  if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
+    builder->SendError(kLoadingErr);
+    return;
+  }
+
+  // replicaof no one
+  if (replicaof_args->IsReplicaOfNoOne()) {
+    return ReplicaOfNoOne(builder);
+  }
+
+  auto new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
+                                          master_replid(), replicaof_args->slot_range);
+  GenericError ec;
+  switch (on_error) {
+    case ActionOnConnectionFail::kReturnOnError:
+      ec = new_replica->Start();
+      break;
+    case ActionOnConnectionFail::kContinueReplication:
+      new_replica->EnableReplication();
+      break;
+  };
+
+  if (ec || new_replica->IsContextCancelled()) {
+    return builder->SendError(ec ? ec.Format() : "replication cancelled");
+  }
+
+  // Critical section.
+  // 1. Stop the old replica_ if it exists
+  // 2. Update all the pointers to the new replica and update the master flag
+  // 3. Start the main replication fiber
+  // 4. Send OK
+  util::fb2::LockGuard lk(replicaof_mu_);
+  std::optional<Replica::LastMasterSyncData> last_master_data;
+  if (replica_)
+    last_master_data = replica_->Stop();
+
+  StopAllClusterReplicas();
+
+  if (ServerState::tlocal()->gstate() == GlobalState::TAKEN_OVER)
+    service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
+
+  // TODO: Update thread locals. That way INFO never blocks.
+  replica_ = new_replica;
+  SetMasterFlagOnAllThreads(false);
+
+  if (on_error == ActionOnConnectionFail::kReturnOnError) {
+    replica_->StartMainReplicationFiber(last_master_data);
+  }
+
+  builder->SendOk();
+}
+
 // REPLTAKEOVER [SAVE]
 // SAVE is used only by tests.
 void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx) {
diff --git a/src/server/server_family.h b/src/server/server_family.h
index d04a15f66884..6e9c6e1bc584 100644
--- a/src/server/server_family.h
+++ b/src/server/server_family.h
@@ -373,6 +373,11 @@ class ServerFamily {
   void ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
                          ActionOnConnectionFail on_error) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
 
+  void ReplicaOfNoOne(SinkReplyBuilder* builder) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
+  // REPLICAOF implementation without two-phase locking.
+  void ReplicaOfInternalV2(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
+                           ActionOnConnectionFail on_error) ABSL_LOCKS_EXCLUDED(replicaof_mu_);
+
   struct LoadOptions {
     std::string snapshot_id;
     uint32_t shard_count = 0;  // Shard count of the snapshot being loaded.
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index d48f9d189ad5..a8506a0598fd 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -448,9 +448,6 @@ async def test_cancel_replication_immediately(df_factory, df_seeder_factory: Dfl
     """
     Issue 100 replication commands. This checks that the replication state
     machine can handle cancellation well.
-    We assert that at least one command was cancelled.
-    After we finish the 'fuzzing' part, replicate the first master and check that
-    all the data is correct.
     """
 
     COMMANDS_TO_ISSUE = 100
@@ -469,12 +466,8 @@ async def ping_status():
             await asyncio.sleep(0.05)
 
     async def replicate():
-        try:
-            await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
-            return True
-        except redis.exceptions.ResponseError as e:
-            assert e.args[0] == "replication cancelled"
-            return False
+        await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
+        return True
 
     ping_job = asyncio.create_task(ping_status())
     replication_commands = [asyncio.create_task(replicate()) for _ in range(COMMANDS_TO_ISSUE)]
@@ -484,7 +477,7 @@ async def replicate():
         num_successes += await result
 
     logging.info(f"succeses: {num_successes}")
-    assert COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
+    assert COMMANDS_TO_ISSUE == num_successes
 
     await wait_available_async(c_replica)
     capture = await seeder.capture()
@@ -493,6 +486,12 @@ async def replicate():
 
     ping_job.cancel()
 
+    replica.stop()
+    lines = replica.find_in_logs("Stopping replication")
+    # Cancelled 99 times by REPLICAOF command and once by Shutdown() because
+    # we stopped the instance
+    assert len(lines) == COMMANDS_TO_ISSUE
+
 
 """
 Test flushall command. Set data to master send flashall and set more data.
@@ -2974,8 +2973,7 @@ async def replicate_inside_multi():
         num_successes += await result
 
     logging.info(f"succeses: {num_successes}")
-    assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
-    assert num_successes > 0, "At least one REPLICAOF must success"
+    assert MULTI_COMMANDS_TO_ISSUE == num_successes
 
 
 async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFactory):