diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index dd6ea922f789..05972685561d 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -166,6 +166,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
   switch (state_) {
     case MigrationState::C_FATAL:
     case MigrationState::C_FINISHED:
+      CloseSocket();
       return;  // Already finished, nothing else to do

     case MigrationState::C_CONNECTING:
@@ -192,6 +193,9 @@ void OutgoingMigration::Finish(const GenericError& error) {
     });
     exec_st_.JoinErrorHandler();
   }
+
+  // Close socket for clean disconnect.
+  CloseSocket();
 }

 MigrationState OutgoingMigration::GetState() const {
@@ -314,6 +318,7 @@ void OutgoingMigration::SyncFb() {
   }

   VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
+  CloseSocket();
 }

 bool OutgoingMigration::FinalizeMigration(long attempt) {
diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc
index d20f73856677..07e5192073d9 100644
--- a/src/server/protocol_client.cc
+++ b/src/server/protocol_client.cc
@@ -109,23 +109,22 @@ ProtocolClient::ProtocolClient(string host, uint16_t port) {
 #ifdef DFLY_USE_SSL
   MaybeInitSslCtx();
 #endif
+
+  // We initialize the proactor thread here such that it never races with Sock().
+  // ProtocolClient is never migrated to a different thread, so this is safe.
+  socket_thread_ = ProactorBase::me();
 }
+
 ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::move(context)) {
 #ifdef DFLY_USE_SSL
   MaybeInitSslCtx();
 #endif
+  socket_thread_ = ProactorBase::me();
 }

 ProtocolClient::~ProtocolClient() {
   exec_st_.JoinErrorHandler();

-  // FIXME: We should close the socket explictly outside of the destructor. This currently
-  // breaks test_cancel_replication_immediately.
-  if (sock_) {
-    std::error_code ec;
-    sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
-    LOG_IF(ERROR, ec) << "Error closing socket " << ec;
-  }
 #ifdef DFLY_USE_SSL
   if (ssl_ctx_) {
     SSL_CTX_free(ssl_ctx_);
@@ -162,6 +161,7 @@ error_code ProtocolClient::ResolveHostDns() {
 error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
                                           ExecutionState* cntx) {
   ProactorBase* mythread = ProactorBase::me();
+  DCHECK(mythread == socket_thread_);
   CHECK(mythread);
   {
     unique_lock lk(sock_mu_);
@@ -235,6 +235,9 @@ void ProtocolClient::CloseSocket() {
         auto ec = sock_->Shutdown(SHUT_RDWR);
         LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
       }
+      error_code ec = sock_->Close();  // Quietly close.
+
+      LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
     });
   }
 }
diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h
index 7e7ddda036b7..70829d688ef1 100644
--- a/src/server/protocol_client.h
+++ b/src/server/protocol_client.h
@@ -107,7 +107,7 @@ class ProtocolClient {
   }

   auto* Proactor() const {
-    return sock_->proactor();
+    return socket_thread_;
   }

   util::FiberSocketBase* Sock() const {
@@ -142,6 +142,7 @@ class ProtocolClient {
 #else
   void* ssl_ctx_{nullptr};
 #endif
+  util::fb2::ProactorBase* socket_thread_;
 };

 }  // namespace dfly
diff --git a/src/server/replica.cc b/src/server/replica.cc
index 1b41a19b01fa..56be8af11f54 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -167,9 +167,13 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
   sync_fb_.JoinIfNeeded();
   DVLOG(1) << "MainReplicationFb stopped " << this;
   acks_fb_.JoinIfNeeded();
-  for (auto& flow : shard_flows_) {
-    flow.reset();
-  }
+
+  proactor_->Await([this]() {
+    for (auto& flow : shard_flows_) {
+      flow.reset();
+    }
+    shard_flows_.clear();
+  });

   if (last_journal_LSNs_.has_value()) {
     return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()};
@@ -505,14 +509,23 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
   // Initialize MultiShardExecution.
   multi_shard_exe_.reset(new MultiShardExecution());

-  // Initialize shard flows.
-  shard_flows_.resize(master_context_.num_flows);
-  DCHECK(!shard_flows_.empty());
-  for (unsigned i = 0; i < shard_flows_.size(); ++i) {
-    shard_flows_[i].reset(
-        new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
-  }
-  thread_flow_map_ = Partition(shard_flows_.size());
+  // Initialize shard flows. The update to shard_flows_ must be done on this thread;
+  // otherwise there is a race condition between GetSummary() and the shard_flows_[i].reset()
+  // below.
+  decltype(shard_flows_) shard_flows_copy;
+  shard_flows_copy.resize(master_context_.num_flows);
+  DCHECK(!shard_flows_copy.empty());
+  thread_flow_map_ = Partition(shard_flows_copy.size());
+  const size_t pool_sz = shard_set->pool()->size();
+
+  shard_set->pool()->AwaitFiberOnAll([pool_sz, this, &shard_flows_copy](auto index, auto* ctx) {
+    for (unsigned i = index; i < shard_flows_copy.size(); i += pool_sz) {
+      shard_flows_copy[i].reset(
+          new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
+    }
+  });
+  // Now update shard_flows_ on the proactor thread.
+  shard_flows_ = std::move(shard_flows_copy);

   // Blocked on until all flows got full sync cut.
   BlockingCounter sync_block{unsigned(shard_flows_.size())};
@@ -542,13 +555,6 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
   JoinDflyFlows();
   if (sync_type == "full") {
     service_.RemoveLoadingState();
-  } else if (service_.IsLoadingExclusively()) {
-    // We need this check. We originally set the state unconditionally to LOADING
-    // when we call ReplicaOf command. If for some reason we fail to start full sync below
-    // or cancel the context, we still need to switch to ACTIVE state.
-    // TODO(kostasrim) we can remove this once my proposed changes for replication move forward
-    // as the state transitions for ReplicaOf command will be much clearer.
-    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
   }
   state_mask_ &= ~R_SYNCING;
   last_journal_LSNs_.reset();
@@ -1178,6 +1184,7 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d

 auto Replica::GetSummary() const -> Summary {
   auto f = [this]() {
+    DCHECK(Proactor() == ProactorBase::me());
     auto last_io_time = LastIoTime();

     // Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
@@ -1214,25 +1221,14 @@ auto Replica::GetSummary() const -> Summary {
     return res;
   };

-  if (Sock())
-    return Proactor()->AwaitBrief(f);
-
-  /**
-   * when this branch happens: there is a very short grace period
-   * where Sock() is not initialized, yet the server can
-   * receive ROLE/INFO commands. That period happens when launching
-   * an instance with '--replicaof' and then immediately
-   * sending a command.
-   *
-   * In that instance, we have to run f() on the current fiber.
-   */
-  return f();
+  return Proactor()->AwaitBrief(f);
 }

 std::vector<uint64_t> Replica::GetReplicaOffset() const {
   std::vector<uint64_t> flow_rec_count;
   flow_rec_count.resize(shard_flows_.size());
   for (const auto& flow : shard_flows_) {
+    DCHECK(flow.get());
     uint32_t flow_id = flow->FlowId();
     uint64_t rec_count = flow->JournalExecutedCount();
     DCHECK_LT(flow_id, shard_flows_.size());
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 7f00361e2345..02d37a7705c3 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -242,6 +242,11 @@ using detail::SaveStagesController;
 using http::StringResponse;
 using strings::HumanReadableNumBytes;

+// Initialized by REPLICAOF
+thread_local std::shared_ptr<Replica> tl_replica = nullptr;
+// Initialized by ADDREPLICAOF
+thread_local std::vector<std::shared_ptr<Replica>> tl_cluster_replicas;
+
 namespace {

 // TODO these should be configurable as command line flag and at runtime via config set
@@ -1224,6 +1229,7 @@ void ServerFamily::Shutdown() {
       replica_->Stop();
     }
     StopAllClusterReplicas();
+    UpdateReplicationThreadLocals(nullptr);

     dfly_cmd_->Shutdown();
     DebugCmd::Shutdown();
@@ -1963,11 +1969,11 @@ vector<facade::Listener*> ServerFamily::GetNonPriviligedListeners() const {
 }

 optional<Replica::Summary> ServerFamily::GetReplicaSummary() const {
-  util::fb2::LockGuard lk(replicaof_mu_);
-  if (replica_ == nullptr) {
+  if (tl_replica == nullptr) {
     return nullopt;
   } else {
-    return replica_->GetSummary();
+    auto replica = tl_replica;
+    return replica->GetSummary();
   }
 }

@@ -3088,15 +3094,12 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio

   auto add_repl_info = [&] {
     bool is_master = true;
-    // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
-    // ensuring eventual consistency of is_master. When determining if the server is a replica and
-    // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
-    // insufficient in this scenario.
-    // Please note that we we do not use Metrics object here.
-    {
-      fb2::LockGuard lk(replicaof_mu_);
-      is_master = !replica_;
-    }
+
+    // Copy the pointers because tl_replica might be overwritten in the meantime.
+    auto replica = tl_replica;
+    auto cluster_replicas = tl_cluster_replicas;
+    is_master = !replica;
+
     if (is_master) {
       vector<ReplicaRoleInfo> replicas_info = dfly_cmd_->GetReplicasRoleInfo();
       append("role", "master");
@@ -3130,12 +3133,11 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
       append("psync_attempts", rinfo.psync_attempts);
       append("psync_successes", rinfo.psync_successes);
     };
-    fb2::LockGuard lk(replicaof_mu_);
-    replication_info_cb(replica_->GetSummary());
+    replication_info_cb(replica->GetSummary());

     // Special case, when multiple masters replicate to a single replica.
-    for (const auto& replica : cluster_replicas_) {
+    for (const auto& replica : cluster_replicas) {
       replication_info_cb(replica->GetSummary());
     }
   }

@@ -3417,7 +3419,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
   }

   LOG(INFO) << "Add Replica " << *replicaof_args;
-  auto add_replica = make_unique<Replica>(replicaof_args->host, replicaof_args->port, &service_,
+  auto add_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
                                           master_replid(), replicaof_args->slot_range);
   GenericError ec = add_replica->Start();
   if (ec) {
@@ -3426,77 +3428,75 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
   }
   add_replica->StartMainReplicationFiber(nullopt);
   cluster_replicas_.push_back(std::move(add_replica));
-  cmd_cntx.rb->SendOk();
-}
-
-void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
-                                     ActionOnConnectionFail on_err) {
-  std::shared_ptr<Replica> new_replica;
-  std::optional<Replica::LastMasterSyncData> last_master_data;
-  {
-    util::fb2::LockGuard lk(replicaof_mu_);  // Only one REPLICAOF command can run at a time
-    // We should not execute replica of command while loading from snapshot.
-    ServerState* ss = ServerState::tlocal();
-    if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
-      builder->SendError(kLoadingErr);
-      return;
-    }
+  service_.proactor_pool().AwaitFiberOnAll(
+      [this](auto index, auto* cntx)
+          ABSL_NO_THREAD_SAFETY_ANALYSIS { tl_cluster_replicas = cluster_replicas_; });

-    auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
-    if (!replicaof_args.has_value()) {
-      return;
-    }
-
-    LOG(INFO) << "Replicating " << *replicaof_args;
-
-    // If NO ONE was supplied, just stop the current replica (if it exists)
-    if (replicaof_args->IsReplicaOfNoOne()) {
-      if (!ss->is_master) {
-        CHECK(replica_);
-
-        SetMasterFlagOnAllThreads(true);  // Flip flag before clearing replica
-        last_master_data_ = replica_->Stop();
-        replica_.reset();
+  cmd_cntx.rb->SendOk();
+}

-        StopAllClusterReplicas();
-      }
+void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
+  util::fb2::LockGuard lk(replicaof_mu_);
+  ServerState* ss = ServerState::tlocal();

-      // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
-      service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
+  if (!ss->is_master) {
+    CHECK(replica_);
+    // Flip the flag before clearing replica_.
+    SetMasterFlagOnAllThreads(true);
+    // TODO we should not allow partial sync after NO-ONE. Only after Takeover.
+    last_master_data_ = replica_->Stop();
+    replica_.reset();
+    StopAllClusterReplicas();
+    UpdateReplicationThreadLocals(nullptr);
+  }

-      return builder->SendOk();
-    }
+  // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
+  service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);

-    // If any replication is in progress, stop it, cancellation should kick in immediately
+  return builder->SendOk();
+}

-    if (replica_)
-      last_master_data = replica_->Stop();
-    StopAllClusterReplicas();
+bool ServerFamily::IsDragonflyLoadingAtomic() {
+  util::fb2::LockGuard lk(replicaof_mu_);
+  ServerState* ss = ServerState::tlocal();

-    const GlobalState gstate = ServerState::tlocal()->gstate();
-    if (gstate == GlobalState::TAKEN_OVER) {
-      service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
-    } else if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
-               new_state != GlobalState::LOADING) {
-      LOG(WARNING) << new_state << " in progress, ignored";
-      builder->SendError("Invalid state");
-      return;
-    }
+  return ss->is_master && ss->gstate() == GlobalState::LOADING;
+}

-    // Create a new replica and assign it
-    new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
-                                       master_replid(), replicaof_args->slot_range);
+void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
+                                     ActionOnConnectionFail on_err) {
+  std::optional<Replica::LastMasterSyncData> last_master_data;

-    replica_ = new_replica;
+  auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
+  if (!replicaof_args.has_value()) {
+    return;
+  }

-    // TODO: disconnect pending blocked clients (pubsub, blocking commands)
-    SetMasterFlagOnAllThreads(false);  // Flip flag after assiging replica
+  LOG(INFO) << "Initiate replication with: " << *replicaof_args;
+  // This is a "weak" check. For example, if the node is already a replica,
+  // one of the flows could disconnect. The MainReplicationFiber will then loop,
+  // and if it can't partial sync it will enter the LOADING state because of full sync.
+  // Note that the fiber is not aware of replicaof_mu_, so even if that mutex is
+  // locked below before any state check, we can't really enforce that the old
+  // replication fiber won't try to full sync and update the state to LOADING.
+  // What is more, we always call `replica->Stop()`. So even if we end up in the
+  // scenario described, the semantics are well defined: first cancel the old replica,
+  // then move on with the new one. Cancellation will be slower and ReplicaOf() will
+  // incur higher latency -- but that's ok because it's a highly improbable flow with
+  // well defined semantics.
+  if (IsDragonflyLoadingAtomic()) {
+    builder->SendError(kLoadingErr);
+    return;
+  }

-  }  // release the lock, lk.unlock()
-  // We proceed connecting below without the lock to allow interrupting the replica immediately.
-  // From this point and onward, it should be highly responsive.
+  // REPLICAOF NO ONE
+  if (replicaof_args->IsReplicaOfNoOne()) {
+    return ReplicaOfNoOne(builder);
+  }
+  auto new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
+                                          master_replid(), replicaof_args->slot_range);
   GenericError ec{};
   switch (on_err) {
     case ActionOnConnectionFail::kReturnOnError:
@@ -3507,30 +3507,33 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
       break;
   };

-  // If the replication attempt failed, clean up global state. The replica should have stopped
-  // internally.
-  util::fb2::LockGuard lk(replicaof_mu_);  // Only one REPLICAOF command can run at a time
-
-  // If there was an error above during Start we must not start the main replication fiber.
-  // However, it could be the case that Start() above connected succefully and by the time
-  // we acquire the lock, the context got cancelled because another ReplicaOf command
-  // executed and acquired the replicaof_mu_ before us.
-  const bool cancelled = new_replica->IsContextCancelled();
-  if (ec || cancelled) {
-    if (replica_ == new_replica) {
-      service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
-      SetMasterFlagOnAllThreads(true);
-      replica_.reset();
-    }
-    builder->SendError(ec ? ec.Format() : "replication cancelled");
-    return;
+  if (ec || new_replica->IsContextCancelled()) {
+    return builder->SendError(ec ? ec.Format() : "replication cancelled");
   }
-  // Successfully connected now we flush
-  // If we are called by "Replicate", tx will be null but we do not need
-  // to flush anything.
+
+  // Critical section:
+  // 1. Stop the old replica_ if it exists.
+  // 2. Update all the pointers to the new replica and update the master flag.
+  // 3. Start the main replication fiber.
+  // 4. Send OK.
+  util::fb2::LockGuard lk(replicaof_mu_);
+  if (replica_)
+    last_master_data = replica_->Stop();
+
+  StopAllClusterReplicas();
+
+  if (ServerState::tlocal()->gstate() == GlobalState::TAKEN_OVER)
+    service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING);
+
+  // Update the thread locals so that INFO never blocks.
+  replica_ = new_replica;
+  UpdateReplicationThreadLocals(new_replica);
+  SetMasterFlagOnAllThreads(false);
+
   if (on_err == ActionOnConnectionFail::kReturnOnError) {
-    new_replica->StartMainReplicationFiber(last_master_data);
+    replica_->StartMainReplicationFiber(last_master_data);
   }
+  builder->SendOk();
 }

@@ -3608,6 +3611,9 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
   SetMasterFlagOnAllThreads(true);
   last_master_data_ = replica_->Stop();
   replica_.reset();
+
+  UpdateReplicationThreadLocals(nullptr);
+
   return builder->SendOk();
 }

@@ -3707,12 +3713,11 @@ void ServerFamily::ReplConf(CmdArgList args, const CommandContext& cmd_cntx) {

 void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) {
   auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
-  util::fb2::LockGuard lk(replicaof_mu_);
   // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
   // ensuring eventual consistency of is_master. When determining if the server is a replica and
   // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
   // insufficient in this scenario.
-  if (!replica_) {
+  if (!tl_replica) {
     rb->StartArray(2);
     rb->SendBulkString("master");
     auto vec = dfly_cmd_->GetReplicasRoleInfo();
@@ -3725,7 +3730,11 @@ void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) {
     }

   } else {
-    rb->StartArray(4 + cluster_replicas_.size() * 3);
+    // Copy the pointers because tl_replica might be overwritten in the meantime.
+    auto replica = tl_replica;
+    auto cluster_replicas = tl_cluster_replicas;
+
+    rb->StartArray(4 + cluster_replicas.size() * 3);
     rb->SendBulkString(GetFlag(FLAGS_info_replication_valkey_compatible) ?
"slave" : "replica"); auto send_replica_info = [rb](Replica::Summary rinfo) { @@ -3742,9 +3751,9 @@ void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) { rb->SendBulkString("connecting"); } }; - send_replica_info(replica_->GetSummary()); - for (const auto& replica : cluster_replicas_) { - send_replica_info(replica->GetSummary()); + send_replica_info(replica->GetSummary()); + for (const auto& repl : cluster_replicas) { + send_replica_info(repl->GetSummary()); } } } @@ -3900,6 +3909,13 @@ void ServerFamily::ClientPauseCmd(CmdArgList args, SinkReplyBuilder* builder, } } +void ServerFamily::UpdateReplicationThreadLocals(std::shared_ptr repl) { + service_.proactor_pool().AwaitFiberOnAll([repl](auto index, auto* context) { + tl_replica = repl; + tl_cluster_replicas.clear(); + }); +} + #define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x)) namespace acl { diff --git a/src/server/server_family.h b/src/server/server_family.h index d04a15f66884..db861901e55f 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -412,6 +412,10 @@ class ServerFamily { // Set accepting_connections_ and update listners according to it void ChangeConnectionAccept(bool accept); + void ReplicaOfNoOne(SinkReplyBuilder* builder); + bool IsDragonflyLoadingAtomic(); + void UpdateReplicationThreadLocals(std::shared_ptr repl); + util::fb2::Fiber snapshot_schedule_fb_; util::fb2::Fiber load_fiber_; @@ -424,7 +428,7 @@ class ServerFamily { mutable util::fb2::Mutex replicaof_mu_, save_mu_; std::shared_ptr replica_ ABSL_GUARDED_BY(replicaof_mu_); - std::vector> cluster_replicas_ + std::vector> cluster_replicas_ ABSL_GUARDED_BY(replicaof_mu_); // used to replicating multiple nodes to single dragonfly std::unique_ptr script_mgr_; diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 8a3f0aaa66ad..e98c944e8d26 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -448,9 +448,6 @@ async def test_cancel_replication_immediately(df_factory, df_seeder_factory: Dfl """ Issue 100 replication commands. This checks that the replication state machine can handle cancellation well. - We assert that at least one command was cancelled. - After we finish the 'fuzzing' part, replicate the first master and check that - all the data is correct. """ COMMANDS_TO_ISSUE = 100 @@ -469,12 +466,8 @@ async def ping_status(): await asyncio.sleep(0.05) async def replicate(): - try: - await c_replica.execute_command(f"REPLICAOF localhost {master.port}") - return True - except redis.exceptions.ResponseError as e: - assert e.args[0] == "replication cancelled" - return False + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + return True ping_job = asyncio.create_task(ping_status()) replication_commands = [asyncio.create_task(replicate()) for _ in range(COMMANDS_TO_ISSUE)] @@ -484,7 +477,7 @@ async def replicate(): num_successes += await result logging.info(f"succeses: {num_successes}") - assert COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled" + assert COMMANDS_TO_ISSUE == num_successes await wait_available_async(c_replica) capture = await seeder.capture() @@ -493,6 +486,12 @@ async def replicate(): ping_job.cancel() + replica.stop() + lines = replica.find_in_logs("Stopping replication") + # Cancelled 99 times by REPLICAOF command and once by Shutdown() because + # we stopped the instance + assert len(lines) == COMMANDS_TO_ISSUE + """ Test flushall command. 
 Set data to master send flashall and set more data.
@@ -2967,8 +2966,7 @@ async def replicate_inside_multi():
         num_successes += await result

     logging.info(f"succeses: {num_successes}")
-    assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
-    assert num_successes > 0, "At least one REPLICAOF must success"
+    assert MULTI_COMMANDS_TO_ISSUE == num_successes


 async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFactory):
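
Editor's note (not part of the patch): the central idea of the change is to publish the current Replica pointer into a thread_local slot on every proactor thread via AwaitFiberOnAll, so read-only paths such as INFO, ROLE and GetReplicaSummary() copy a shared_ptr instead of taking replicaof_mu_. Below is a minimal, self-contained sketch of that pattern only; ReplicaInfo, AwaitOnAllWorkers, UpdateReplicationThreadLocals and RoleLine are illustrative stand-ins, not Dragonfly's actual API, and the "pool" here is a single thread.

#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <string>

struct ReplicaInfo {  // stand-in for dfly::Replica / Replica::Summary
  std::string host;
  uint16_t port;
};

// One slot per worker thread; readers only ever touch their own slot.
thread_local std::shared_ptr<ReplicaInfo> tl_replica;

// Stand-in for proactor_pool().AwaitFiberOnAll(): run a task on every worker.
// The sketch is single-threaded, so "every worker" is just the caller.
void AwaitOnAllWorkers(const std::function<void()>& task) {
  task();
}

// Writer side: REPLICAOF / takeover / shutdown publish the new pointer everywhere.
void UpdateReplicationThreadLocals(std::shared_ptr<ReplicaInfo> repl) {
  AwaitOnAllWorkers([repl] { tl_replica = repl; });
}

// Reader side: copy the shared_ptr first so the object stays alive even if a
// concurrent REPLICAOF overwrites the thread-local slot while we use it.
std::string RoleLine() {
  auto replica = tl_replica;
  if (!replica)
    return "role:master";
  return "role:replica " + replica->host + ":" + std::to_string(replica->port);
}

int main() {
  std::cout << RoleLine() << "\n";  // role:master
  UpdateReplicationThreadLocals(std::make_shared<ReplicaInfo>(ReplicaInfo{"localhost", 6379}));
  std::cout << RoleLine() << "\n";  // role:replica localhost:6379
  UpdateReplicationThreadLocals(nullptr);  // REPLICAOF NO ONE / Shutdown() path
  std::cout << RoleLine() << "\n";  // role:master
}

The design trade-off mirrors the patch: writers still serialize on a mutex (replicaof_mu_ in the real code) and pay the cost of broadcasting to every thread, while the hot read paths become lock-free pointer copies.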