diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index dd6ea922f789..34cb61f56148 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -161,6 +161,7 @@ void OutgoingMigration::Finish(const GenericError& error) { } bool should_cancel_flows = false; + absl::Cleanup on_exit([this]() { CloseSocket(); }); { util::fb2::LockGuard lk(state_mu_); switch (state_) { @@ -313,6 +314,8 @@ void OutgoingMigration::SyncFb() { break; } + CloseSocket(); + VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString(); } diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index d20f73856677..cbb46f8e018c 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -110,6 +110,7 @@ ProtocolClient::ProtocolClient(string host, uint16_t port) { MaybeInitSslCtx(); #endif } + ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::move(context)) { #ifdef DFLY_USE_SSL MaybeInitSslCtx(); @@ -119,13 +120,6 @@ ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::mov ProtocolClient::~ProtocolClient() { exec_st_.JoinErrorHandler(); - // FIXME: We should close the socket explictly outside of the destructor. This currently - // breaks test_cancel_replication_immediately. - if (sock_) { - std::error_code ec; - sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); }); - LOG_IF(ERROR, ec) << "Error closing socket " << ec; - } #ifdef DFLY_USE_SSL if (ssl_ctx_) { SSL_CTX_free(ssl_ctx_); @@ -163,6 +157,7 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time ExecutionState* cntx) { ProactorBase* mythread = ProactorBase::me(); CHECK(mythread); + socket_thread_ = ProactorBase::me(); { unique_lock lk(sock_mu_); // The context closes sock_. So if the context error handler has already @@ -235,6 +230,9 @@ void ProtocolClient::CloseSocket() { auto ec = sock_->Shutdown(SHUT_RDWR); LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec; } + auto ec = sock_->Close(); // Quietly close. + + LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message(); }); } } @@ -385,11 +383,11 @@ void ProtocolClient::ResetParser(RedisParser::Mode mode) { } uint64_t ProtocolClient::LastIoTime() const { - return last_io_time_; + return last_io_time_.load(std::memory_order_relaxed); } void ProtocolClient::TouchIoTime() { - last_io_time_ = Proactor()->GetMonotonicTimeNs(); + last_io_time_.store(Proactor()->GetMonotonicTimeNs(), std::memory_order_relaxed); } } // namespace dfly diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index 7e7ddda036b7..e21696caeffa 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -107,7 +107,8 @@ class ProtocolClient { } auto* Proactor() const { - return sock_->proactor(); + DCHECK(socket_thread_); + return socket_thread_; } util::FiberSocketBase* Sock() const { @@ -132,7 +133,7 @@ class ProtocolClient { std::string last_cmd_; std::string last_resp_; - uint64_t last_io_time_ = 0; // in ns, monotonic clock. + std::atomic last_io_time_ = 0; // in ns, monotonic clock. #ifdef DFLY_USE_SSL @@ -142,6 +143,7 @@ class ProtocolClient { #else void* ssl_ctx_{nullptr}; #endif + util::fb2::ProactorBase* socket_thread_; }; } // namespace dfly diff --git a/src/server/replica.cc b/src/server/replica.cc index 34d1f4e04049..3e34645171e0 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -64,7 +64,7 @@ ABSL_DECLARE_FLAG(uint16_t, announce_port); ABSL_FLAG( int, replica_priority, 100, "Published by info command for sentinel to pick replica based on score during a failover"); -ABSL_FLAG(bool, experimental_replicaof_v2, true, +ABSL_FLAG(bool, experimental_replicaof_v2, false, "Use ReplicaOfV2 algorithm for initiating replication"); namespace dfly { @@ -152,6 +152,8 @@ void Replica::StartMainReplicationFiber(std::optional last_m void Replica::EnableReplication() { VLOG(1) << "Enabling replication"; + socket_thread_ = ProactorBase::me(); + state_mask_ = R_ENABLED; // set replica state to enabled sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this, nullopt); // call replication fiber } @@ -170,9 +172,17 @@ std::optional Replica::Stop() { sync_fb_.JoinIfNeeded(); DVLOG(1) << "MainReplicationFb stopped " << this; acks_fb_.JoinIfNeeded(); - for (auto& flow : shard_flows_) { - flow.reset(); - } + + proactor_->Await([this]() { + // Destructor is blocking, so other fibers can observe partial state + // of flows during clean up. To avoid this, we move them and clear the + // member before the preemption point + auto shard_flows = std::move(shard_flows_); + shard_flows_.clear(); + for (auto& flow : shard_flows) { + flow.reset(); + } + }); if (last_journal_LSNs_.has_value()) { return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()}; @@ -501,18 +511,12 @@ error_code Replica::InitiatePSync() { return error_code{}; } -// Initialize and start sub-replica for each flow. -error_code Replica::InitiateDflySync(std::optional last_master_sync_data) { - auto start_time = absl::Now(); - - // Initialize MultiShardExecution. - multi_shard_exe_.reset(new MultiShardExecution()); - - // Initialize shard flows. +void Replica::InitializeShardFlows() { shard_flows_.resize(master_context_.num_flows); DCHECK(!shard_flows_.empty()); - for (unsigned i = 0; i < shard_flows_.size(); ++i) { - // Transfer LSN state for partial sync + thread_flow_map_ = Partition(shard_flows_.size()); + + for (size_t i = 0; i < shard_flows_.size(); ++i) { uint64_t partial_sync_lsn = 0; if (shard_flows_[i]) { partial_sync_lsn = shard_flows_[i]->JournalExecutedCount(); @@ -523,7 +527,19 @@ error_code Replica::InitiateDflySync(std::optional last_mast shard_flows_[i]->SetRecordsExecuted(partial_sync_lsn); } } - thread_flow_map_ = Partition(shard_flows_.size()); +} + +// Initialize and start sub-replica for each flow. +error_code Replica::InitiateDflySync(std::optional last_master_sync_data) { + auto start_time = absl::Now(); + + // Initialize MultiShardExecution. + multi_shard_exe_.reset(new MultiShardExecution()); + + // Initialize shard flows. The update to the shard_flows_ should be done by this thread. + // Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset() + // below. + InitializeShardFlows(); // Blocked on until all flows got full sync cut. BlockingCounter sync_block{unsigned(shard_flows_.size())}; @@ -754,11 +770,6 @@ error_code Replica::ConsumeDflyStream() { }; RETURN_ON_ERR(exec_st_.SwitchErrorHandler(std::move(err_handler))); - size_t total_flows_to_finish_partial = 0; - for (const auto& flow : thread_flow_map_) { - total_flows_to_finish_partial += flow.size(); - } - LOG(INFO) << "Transitioned into stable sync"; // Transition flows into stable sync. { @@ -1210,11 +1221,12 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d auto Replica::GetSummary() const -> Summary { auto f = [this]() { + DCHECK(this); auto last_io_time = LastIoTime(); - // Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here - // it's unlikely to cause a real bug. - for (const auto& flow : shard_flows_) { // Get last io time from all sub flows. + for (const auto& flow : shard_flows_) { + DCHECK(Proactor() == ProactorBase::me()); + DCHECK(flow); last_io_time = std::max(last_io_time, flow->LastIoTime()); } @@ -1246,25 +1258,14 @@ auto Replica::GetSummary() const -> Summary { return res; }; - if (Sock()) - return Proactor()->AwaitBrief(f); - - /** - * when this branch happens: there is a very short grace period - * where Sock() is not initialized, yet the server can - * receive ROLE/INFO commands. That period happens when launching - * an instance with '--replicaof' and then immediately - * sending a command. - * - * In that instance, we have to run f() on the current fiber. - */ - return f(); + return Proactor()->AwaitBrief(f); } std::vector Replica::GetReplicaOffset() const { std::vector flow_rec_count; flow_rec_count.resize(shard_flows_.size()); for (const auto& flow : shard_flows_) { + DCHECK(flow.get()); uint32_t flow_id = flow->FlowId(); uint64_t rec_count = flow->JournalExecutedCount(); DCHECK_LT(flow_id, shard_flows_.size()); diff --git a/src/server/replica.h b/src/server/replica.h index 7cc2c7a0e0e2..85846ecf8e41 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -154,6 +154,8 @@ class Replica : ProtocolClient { size_t GetRecCountExecutedPerShard(const std::vector& indexes) const; private: + void InitializeShardFlows(); + util::fb2::ProactorBase* proactor_ = nullptr; Service& service_; MasterContext master_context_; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 581050019032..c1641d84ea73 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -243,6 +243,11 @@ using detail::SaveStagesController; using http::StringResponse; using strings::HumanReadableNumBytes; +// Initialized by REPLICAOF +thread_local std::shared_ptr tl_replica = nullptr; +// Initialized by ADDREPLICAOF +thread_local std::vector> tl_cluster_replicas; + namespace { // TODO these should be configurable as command line flag and at runtime via config set @@ -1049,6 +1054,7 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) { ValidateClientTlsFlags(); dfly_cmd_ = make_unique(this); legacy_format_metrics_ = GetFlag(FLAGS_keep_legacy_memory_metrics); + use_replica_of_v2_ = GetFlag(FLAGS_experimental_replicaof_v2); } ServerFamily::~ServerFamily() { @@ -1231,6 +1237,7 @@ void ServerFamily::Shutdown() { replica_->Stop(); } StopAllClusterReplicas(); + UpdateReplicationThreadLocals(nullptr); dfly_cmd_->Shutdown(); DebugCmd::Shutdown(); @@ -2004,6 +2011,17 @@ vector ServerFamily::GetNonPriviligedListeners() const { } optional ServerFamily::GetReplicaSummary() const { + /// Without lock + if (use_replica_of_v2_) { + if (tl_replica == nullptr) { + return nullopt; + } + auto replica = tl_replica; + return replica->GetSummary(); + } + + // With lock + // TODO deprecate this in favor of v2 util::fb2::LockGuard lk(replicaof_mu_); if (replica_ == nullptr) { return nullopt; @@ -3126,16 +3144,24 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio }; auto add_repl_info = [&] { + std::shared_ptr replica; + std::vector> cluster_replicas; bool is_master = true; // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_, // ensuring eventual consistency of is_master. When determining if the server is a replica and // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is // insufficient in this scenario. // Please note that we we do not use Metrics object here. - { + if (use_replica_of_v2_) { + // Deep copy because tl_replica might be overwritten inbetween + replica = tl_replica; + cluster_replicas = tl_cluster_replicas; + is_master = !replica; + } else { fb2::LockGuard lk(replicaof_mu_); is_master = !replica_; } + if (is_master) { vector replicas_info = dfly_cmd_->GetReplicasRoleInfo(); append("role", "master"); @@ -3169,13 +3195,22 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio append("psync_attempts", rinfo.psync_attempts); append("psync_successes", rinfo.psync_successes); }; - fb2::LockGuard lk(replicaof_mu_); - replication_info_cb(replica_->GetSummary()); - - // Special case, when multiple masters replicate to a single replica. - for (const auto& replica : cluster_replicas_) { + if (use_replica_of_v2_) { replication_info_cb(replica->GetSummary()); + + // Special case, when multiple masters replicate to a single replica. + for (const auto& replica : cluster_replicas) { + replication_info_cb(replica->GetSummary()); + } + } else { + fb2::LockGuard lk(replicaof_mu_); + replication_info_cb(replica_->GetSummary()); + + // Special case, when multiple masters replicate to a single replica. + for (const auto& replica : cluster_replicas_) { + replication_info_cb(replica->GetSummary()); + } } } }; @@ -3461,7 +3496,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) } LOG(INFO) << "Add Replica " << *replicaof_args; - auto add_replica = make_unique(replicaof_args->host, replicaof_args->port, &service_, + auto add_replica = make_shared(replicaof_args->host, replicaof_args->port, &service_, master_replid(), replicaof_args->slot_range); GenericError ec = add_replica->Start(); if (ec) { @@ -3470,6 +3505,9 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) } add_replica->StartMainReplicationFiber(nullopt); cluster_replicas_.push_back(std::move(add_replica)); + service_.proactor_pool().AwaitFiberOnAll( + [this](auto index, auto* cntx) + ABSL_NO_THREAD_SAFETY_ANALYSIS { tl_cluster_replicas = cluster_replicas_; }); cmd_cntx.rb->SendOk(); } @@ -3589,8 +3627,7 @@ void ServerFamily::StopAllClusterReplicas() { } void ServerFamily::ReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) { - const bool use_replica_of_v2 = absl::GetFlag(FLAGS_experimental_replicaof_v2); - if (use_replica_of_v2) { + if (use_replica_of_v2_) { ReplicaOfInternalV2(args, cmd_cntx.tx, cmd_cntx.rb, ActionOnConnectionFail::kReturnOnError); return; } @@ -3607,8 +3644,7 @@ void ServerFamily::Replicate(string_view host, string_view port) { CmdArgList args_list = absl::MakeSpan(args_vec); io::NullSink sink; facade::RedisReplyBuilder rb(&sink); - const bool use_replica_of_v2 = absl::GetFlag(FLAGS_experimental_replicaof_v2); - if (use_replica_of_v2) { + if (use_replica_of_v2_) { ReplicaOfInternalV2(args_list, nullptr, &rb, ActionOnConnectionFail::kContinueReplication); return; } @@ -3628,6 +3664,7 @@ void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) { last_master_data_ = replica_->Stop(); replica_.reset(); StopAllClusterReplicas(); + UpdateReplicationThreadLocals(nullptr); } // May not switch to ACTIVE if the process is, for example, shutting down at the same time. @@ -3699,8 +3736,8 @@ void ServerFamily::ReplicaOfInternalV2(CmdArgList args, Transaction* tx, SinkRep if (ServerState::tlocal()->gstate() == GlobalState::TAKEN_OVER) service_.SwitchState(GlobalState::TAKEN_OVER, GlobalState::LOADING); - // TODO Update thread locals. That way INFO never blocks replica_ = new_replica; + UpdateReplicationThreadLocals(new_replica); SetMasterFlagOnAllThreads(false); if (on_error == ActionOnConnectionFail::kReturnOnError) { @@ -3767,6 +3804,8 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx) last_master_data_ = replica_->Stop(); replica_.reset(); + UpdateReplicationThreadLocals(nullptr); + SetMasterFlagOnAllThreads(true); return builder->SendOk(); } @@ -3866,6 +3905,12 @@ void ServerFamily::ReplConf(CmdArgList args, const CommandContext& cmd_cntx) { } void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) { + // New version without mutex lock + if (use_replica_of_v2_) { + return RoleV2(args, cmd_cntx); + } + + // Old algorithm with mutex lock auto* rb = static_cast(cmd_cntx.rb); util::fb2::LockGuard lk(replicaof_mu_); // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_, @@ -3909,6 +3954,54 @@ void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) { } } +void ServerFamily::RoleV2(CmdArgList args, const CommandContext& cmd_cntx) { + auto* rb = static_cast(cmd_cntx.rb); + // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_, + // ensuring eventual consistency of is_master. When determining if the server is a replica and + // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is + // insufficient in this scenario. + if (!tl_replica) { + rb->StartArray(2); + rb->SendBulkString("master"); + auto vec = dfly_cmd_->GetReplicasRoleInfo(); + rb->StartArray(vec.size()); + for (auto& data : vec) { + rb->StartArray(3); + rb->SendBulkString(data.address); + rb->SendBulkString(absl::StrCat(data.listening_port)); + rb->SendBulkString(data.state); + } + + } else { + // Deep copy because tl_replica might be overwritten inbetween + auto replica = tl_replica; + auto cluster_replicas = tl_cluster_replicas; + + rb->StartArray(4 + cluster_replicas.size() * 3); + rb->SendBulkString(GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica"); + + auto send_replica_info = [rb](Replica::Summary rinfo) { + rb->SendBulkString(rinfo.host); + rb->SendBulkString(absl::StrCat(rinfo.port)); + if (rinfo.full_sync_done) { + rb->SendBulkString(GetFlag(FLAGS_info_replication_valkey_compatible) ? "online" + : "stable_sync"); + } else if (rinfo.full_sync_in_progress) { + rb->SendBulkString("full_sync"); + } else if (rinfo.master_link_established) { + rb->SendBulkString("preparation"); + } else { + rb->SendBulkString("connecting"); + } + }; + + send_replica_info(replica->GetSummary()); + for (const auto& repl : cluster_replicas) { + send_replica_info(repl->GetSummary()); + } + } +} + void ServerFamily::Script(CmdArgList args, const CommandContext& cmd_cntx) { script_mgr_->Run(std::move(args), cmd_cntx.tx, cmd_cntx.rb, cmd_cntx.conn_cntx); } @@ -4096,6 +4189,13 @@ void ServerFamily::ClientPauseCmd(CmdArgList args, SinkReplyBuilder* builder, } } +void ServerFamily::UpdateReplicationThreadLocals(std::shared_ptr repl) { + service_.proactor_pool().AwaitFiberOnAll([repl](auto index, auto* context) { + tl_replica = repl; + tl_cluster_replicas.clear(); + }); +} + #define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x)) namespace acl { diff --git a/src/server/server_family.h b/src/server/server_family.h index e8894a19702e..482b3ca1ae01 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -355,6 +355,7 @@ class ServerFamily { ABSL_LOCKS_EXCLUDED(replicaof_mu_); void ReplConf(CmdArgList args, const CommandContext& cmd_cntx); void Role(CmdArgList args, const CommandContext& cmd_cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_); + void RoleV2(CmdArgList args, const CommandContext& cmd_cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_); void Save(CmdArgList args, const CommandContext& cmd_cntx); void BgSave(CmdArgList args, const CommandContext& cmd_cntx); void Script(CmdArgList args, const CommandContext& cmd_cntx); @@ -378,6 +379,8 @@ class ServerFamily { void ReplicaOfInternalV2(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ActionOnConnectionFail on_error) ABSL_LOCKS_EXCLUDED(replicaof_mu_); + void UpdateReplicationThreadLocals(std::shared_ptr repl); + struct LoadOptions { std::string snapshot_id; uint32_t shard_count = 0; // Shard count of the snapshot being loaded. @@ -429,7 +432,7 @@ class ServerFamily { mutable util::fb2::Mutex replicaof_mu_, save_mu_; std::shared_ptr replica_ ABSL_GUARDED_BY(replicaof_mu_); - std::vector> cluster_replicas_ + std::vector> cluster_replicas_ ABSL_GUARDED_BY(replicaof_mu_); // used to replicating multiple nodes to single dragonfly std::unique_ptr script_mgr_; @@ -468,6 +471,7 @@ class ServerFamily { LoadingStats loading_stats_ ABSL_GUARDED_BY(loading_stats_mu_); bool legacy_format_metrics_ = true; + bool use_replica_of_v2_ = false; }; // Reusable CLIENT PAUSE implementation that blocks while polling is_pause_in_progress