diff --git a/mooncake-store/include/master_client.h b/mooncake-store/include/master_client.h index b576eeef4..279da9597 100644 --- a/mooncake-store/include/master_client.h +++ b/mooncake-store/include/master_client.h @@ -181,7 +181,7 @@ class MasterClient { * containing view version and client status */ [[nodiscard]] tl::expected Ping( - const UUID& client_id); + const UUID& client_id, size_t qp_count); private: /** diff --git a/mooncake-store/include/master_metric_manager.h b/mooncake-store/include/master_metric_manager.h index 0e9d7399d..5ce0c2738 100644 --- a/mooncake-store/include/master_metric_manager.h +++ b/mooncake-store/include/master_metric_manager.h @@ -40,7 +40,10 @@ class MasterMetricManager { // Cluster Metrics void inc_active_clients(int64_t val = 1); void dec_active_clients(int64_t val = 1); + void inc_cluster_total_qp_num(int64_t val = 1); + void dec_cluster_total_qp_num(int64_t val = 1); int64_t get_active_clients(); + int64_t get_cluster_total_qp_num(); // Operation Statistics (Counters) void inc_put_start_requests(int64_t val = 1); @@ -178,6 +181,7 @@ class MasterMetricManager { // Cluster Metrics ylt::metric::gauge_t active_clients_; + ylt::metric::gauge_t cluster_total_qp_num_; // Operation Statistics ylt::metric::counter_t put_start_requests_; diff --git a/mooncake-store/include/master_service.h b/mooncake-store/include/master_service.h index fb9ae8317..f1e2f1e6f 100644 --- a/mooncake-store/include/master_service.h +++ b/mooncake-store/include/master_service.h @@ -235,7 +235,8 @@ class MasterService { * @return ErrorCode::OK on success, ErrorCode::INTERNAL_ERROR if the client * ping queue is full */ - auto Ping(const UUID& client_id) -> tl::expected; + auto Ping(const UUID& client_id, size_t qp_count) + -> tl::expected; /** * @brief Get the master service cluster ID to use as subdirectory name @@ -435,6 +436,8 @@ class MasterService { mutable std::shared_mutex client_mutex_; std::unordered_set> ok_client_; // client with ok status + std::unordered_map> + client_qp_counts_; // QP count per client void ClientMonitorFunc(); std::thread client_monitor_thread_; std::atomic client_monitor_running_{false}; diff --git a/mooncake-store/include/rpc_service.h b/mooncake-store/include/rpc_service.h index 77cb9995f..38dd687df 100644 --- a/mooncake-store/include/rpc_service.h +++ b/mooncake-store/include/rpc_service.h @@ -80,7 +80,8 @@ class WrappedMasterService { tl::expected GetFsdir(); - tl::expected Ping(const UUID& client_id); + tl::expected Ping(const UUID& client_id, + size_t qp_count); private: MasterService master_service_; diff --git a/mooncake-store/include/types.h b/mooncake-store/include/types.h index df9816cf1..91f4e6e53 100644 --- a/mooncake-store/include/types.h +++ b/mooncake-store/include/types.h @@ -489,19 +489,24 @@ inline std::ostream& operator<<(std::ostream& os, struct PingResponse { ViewVersionId view_version_id; ClientStatus client_status; + uint64_t total_qp_num{0}; PingResponse() = default; - PingResponse(ViewVersionId view_version, ClientStatus status) - : view_version_id(view_version), client_status(status) {} + PingResponse(ViewVersionId view_version, ClientStatus status, + uint64_t qp_number = 0) + : view_version_id(view_version), + client_status(status), + total_qp_num(qp_number) {} friend std::ostream& operator<<(std::ostream& os, const PingResponse& response) noexcept { return os << "PingResponse: { view_version_id: " << response.view_version_id - << ", client_status: " << response.client_status << " }"; + << ", client_status: " << response.client_status + << ", total_qp_num: " << response.total_qp_num << " }"; } }; -YLT_REFL(PingResponse, view_version_id, client_status); +YLT_REFL(PingResponse, view_version_id, client_status, total_qp_num); enum class BufferAllocatorType { CACHELIB = 0, // CachelibBufferAllocator diff --git a/mooncake-store/src/client.cpp b/mooncake-store/src/client.cpp index b190d48cc..1086e0fdc 100644 --- a/mooncake-store/src/client.cpp +++ b/mooncake-store/src/client.cpp @@ -1289,7 +1289,9 @@ void Client::PingThreadFunc() { } // Ping master - auto ping_result = master_client_.Ping(client_id_); + size_t qp_num_per_client = transfer_engine_.getTotalQpNum(); + auto ping_result = + master_client_.Ping(client_id_, (const size_t&)qp_num_per_client); if (ping_result) { // Reset ping failure count ping_fail_count = 0; diff --git a/mooncake-store/src/master_client.cpp b/mooncake-store/src/master_client.cpp index 3287f71b0..0692e118a 100644 --- a/mooncake-store/src/master_client.cpp +++ b/mooncake-store/src/master_client.cpp @@ -402,13 +402,13 @@ tl::expected MasterClient::UnmountSegment( return result; } -tl::expected MasterClient::Ping( - const UUID& client_id) { +tl::expected MasterClient::Ping(const UUID& client_id, + size_t qp_count) { ScopedVLogTimer timer(1, "MasterClient::Ping"); timer.LogRequest("client_id=", client_id); - auto result = - invoke_rpc<&WrappedMasterService::Ping, PingResponse>(client_id); + auto result = invoke_rpc<&WrappedMasterService::Ping, PingResponse>( + client_id, qp_count); timer.LogResponseExpected(result); return result; } diff --git a/mooncake-store/src/master_metric_manager.cpp b/mooncake-store/src/master_metric_manager.cpp index 670b8e5b2..b478a7cb1 100644 --- a/mooncake-store/src/master_metric_manager.cpp +++ b/mooncake-store/src/master_metric_manager.cpp @@ -34,6 +34,8 @@ MasterMetricManager::MasterMetricManager() // Initialize cluster metrics active_clients_("master_active_clients", "Total number of active clients"), + cluster_total_qp_num_("master_cluster_total_qp_num", + "Total number of QP across all clients"), // Initialize Request Counters put_start_requests_("master_put_start_requests_total", @@ -241,6 +243,18 @@ int64_t MasterMetricManager::get_active_clients() { return active_clients_.value(); } +void MasterMetricManager::inc_cluster_total_qp_num(int64_t val) { + cluster_total_qp_num_.inc(val); +} + +void MasterMetricManager::dec_cluster_total_qp_num(int64_t val) { + cluster_total_qp_num_.dec(val); +} + +int64_t MasterMetricManager::get_cluster_total_qp_num() { + return cluster_total_qp_num_.value(); +} + // Operation Statistics (Counters) void MasterMetricManager::inc_exist_key_requests(int64_t val) { exist_key_requests_.inc(val); @@ -616,6 +630,7 @@ std::string MasterMetricManager::serialize_metrics() { serialize_metric(total_capacity_); serialize_metric(key_count_); serialize_metric(soft_pin_key_count_); + serialize_metric(cluster_total_qp_num_); if (enable_ha_) { serialize_metric(active_clients_); } @@ -680,6 +695,7 @@ std::string MasterMetricManager::get_summary_string() { int64_t keys = key_count_.value(); int64_t soft_pin_keys = soft_pin_key_count_.value(); int64_t active_clients = active_clients_.value(); + int64_t cluster_total_qp_num = cluster_total_qp_num_.value(); // Request counters int64_t exist_keys = exist_key_requests_.value(); @@ -754,6 +770,7 @@ std::string MasterMetricManager::get_summary_string() { << ((double)allocated / (double)capacity * 100.0) << "%)"; } ss << " | Keys: " << keys << " (soft-pinned: " << soft_pin_keys << ")"; + ss << " | QP: " << cluster_total_qp_num; if (enable_ha_) { ss << " | Clients: " << active_clients; } diff --git a/mooncake-store/src/master_service.cpp b/mooncake-store/src/master_service.cpp index 17e3c8add..cef096109 100644 --- a/mooncake-store/src/master_service.cpp +++ b/mooncake-store/src/master_service.cpp @@ -567,7 +567,7 @@ size_t MasterService::GetKeyCount() const { return total; } -auto MasterService::Ping(const UUID& client_id) +auto MasterService::Ping(const UUID& client_id, const size_t qp_count) -> tl::expected { if (!enable_ha_) { LOG(ERROR) << "Ping is only available in HA mode"; @@ -589,7 +589,7 @@ auto MasterService::Ping(const UUID& client_id) << ", error=client_ping_queue_full"; return tl::make_unexpected(ErrorCode::INTERNAL_ERROR); } - return PingResponse(view_version_, client_status); + return PingResponse(view_version_, client_status, 0); } tl::expected MasterService::GetFsdir() const { diff --git a/mooncake-store/src/rpc_service.cpp b/mooncake-store/src/rpc_service.cpp index a65a8ded8..5a3a1179d 100644 --- a/mooncake-store/src/rpc_service.cpp +++ b/mooncake-store/src/rpc_service.cpp @@ -491,13 +491,19 @@ tl::expected WrappedMasterService::GetFsdir() { } tl::expected WrappedMasterService::Ping( - const UUID& client_id) { + const UUID& client_id, size_t qp_count) { ScopedVLogTimer timer(1, "Ping"); timer.LogRequest("client_id=", client_id); MasterMetricManager::instance().inc_ping_requests(); - auto result = master_service_.Ping(client_id); + auto result = master_service_.Ping(client_id, 0); + + if (result.has_value()) { + // Increment cluster total QP number based on client's reported QP count + MasterMetricManager::instance().inc_cluster_total_qp_num( + result.value().total_qp_num); + } timer.LogResponseExpected(result); return result; @@ -543,4 +549,4 @@ void RegisterRpcService( &wrapped_master_service); } -} // namespace mooncake \ No newline at end of file +} // namespace mooncake diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h index 831d72436..16848c2ca 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -216,6 +216,9 @@ class TransferEngine { return local_topology_; } + // Get the total number of QPs from RDMA transport + size_t getTotalQpNum() const; + private: struct MemoryRegion { void *addr; diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/endpoint_store.h b/mooncake-transfer-engine/include/transport/rdma_transport/endpoint_store.h index 478fe3b04..314d52fe9 100644 --- a/mooncake-transfer-engine/include/transport/rdma_transport/endpoint_store.h +++ b/mooncake-transfer-engine/include/transport/rdma_transport/endpoint_store.h @@ -49,7 +49,7 @@ class EndpointStore { virtual int disconnectQPs() = 0; // Get the total number of QPs across all endpoints - virtual size_t getTotalQPNumber() = 0; + virtual size_t getTotalQpNum() = 0; }; // FIFO @@ -68,7 +68,7 @@ class FIFOEndpointStore : public EndpointStore { int destroyQPs() override; int disconnectQPs() override; - size_t getTotalQPNumber() override; + size_t getTotalQpNum() override; private: RWSpinlock endpoint_map_lock_; @@ -99,7 +99,7 @@ class SIEVEEndpointStore : public EndpointStore { int destroyQPs() override; int disconnectQPs() override; - size_t getTotalQPNumber() override; + size_t getTotalQpNum() override; private: RWSpinlock endpoint_map_lock_; diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h index 0cc84a2fa..3813cd859 100644 --- a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h +++ b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h @@ -91,7 +91,7 @@ class RdmaContext { int disconnectAllEndpoints(); // Get the total number of QPs across all endpoints in this context - size_t getTotalQPNumber() const; + size_t getTotalQpNum() const; public: // Device name, such as `mlx5_3` diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h index 4a411e3b2..ca65cf093 100644 --- a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h +++ b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h @@ -113,6 +113,9 @@ class RdmaTransport : public Transport { std::string_view hint, int &buffer_id, int &device_id, int retry_cnt = 0); + // Get the total number of QPs across all RDMA contexts + size_t getTotalQpNum() const; + private: std::vector> context_list_; std::shared_ptr local_topology_; @@ -126,4 +129,4 @@ using BatchID = Transport::BatchID; } // namespace mooncake -#endif // RDMA_TRANSPORT_H_ \ No newline at end of file +#endif // RDMA_TRANSPORT_H_ diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index fb28cd85d..8da2daa3e 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -23,8 +23,10 @@ #include #include +#include "common.h" #include "transfer_metadata_plugin.h" #include "transport/transport.h" +#include "transport/rdma_transport/rdma_transport.h" namespace mooncake { @@ -435,6 +437,22 @@ int TransferEngine::unregisterLocalMemoryBatch( return 0; } +size_t TransferEngine::getTotalQpNum() const { + Transport *rdma_transport = multi_transports_->getTransport("rdma"); + if (!rdma_transport) { + return 0; + } + + // Cast to RdmaTransport to access the actual QP counting functionality + RdmaTransport *rdma = dynamic_cast(rdma_transport); + if (!rdma) { + LOG(ERROR) << "Failed to cast RDMA transport to RdmaTransport type"; + return 0; + } + + return rdma->getTotalQpNum(); +} + #ifdef WITH_METRICS // Helper function to convert string to lowercase for case-insensitive // comparison diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/endpoint_store.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/endpoint_store.cpp index 2da73d01e..2c87d47e5 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/endpoint_store.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/endpoint_store.cpp @@ -114,7 +114,7 @@ int FIFOEndpointStore::disconnectQPs() { return 0; } -size_t FIFOEndpointStore::getTotalQPNumber() { +size_t FIFOEndpointStore::getTotalQpNum() { RWSpinlock::ReadGuard guard(endpoint_map_lock_); size_t total_qps = 0; for (const auto &kv : endpoint_map_) { @@ -238,7 +238,7 @@ int SIEVEEndpointStore::disconnectQPs() { size_t SIEVEEndpointStore::getSize() { return endpoint_map_.size(); } -size_t SIEVEEndpointStore::getTotalQPNumber() { +size_t SIEVEEndpointStore::getTotalQpNum() { RWSpinlock::ReadGuard guard(endpoint_map_lock_); size_t total_qps = 0; diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp index 41f20b270..bc2412605 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp @@ -328,8 +328,8 @@ int RdmaContext::deleteEndpoint(const std::string &peer_nic_path) { return endpoint_store_->deleteEndpoint(peer_nic_path); } -size_t RdmaContext::getTotalQPNumber() const { - return endpoint_store_->getTotalQPNumber(); +size_t RdmaContext::getTotalQpNum() const { + return endpoint_store_->getTotalQpNum(); } std::string RdmaContext::nicPath() const { diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp index faf80f14b..f56b37619 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp @@ -505,4 +505,14 @@ int RdmaTransport::selectDevice(SegmentDesc *desc, uint64_t offset, return selectDevice(desc, offset, length, "", buffer_id, device_id, retry_count); } + +size_t RdmaTransport::getTotalQpNum() const { + size_t total_qps = 0; + for (const auto &context : context_list_) { + if (context && context->active()) { + total_qps += context->getTotalQpNum(); + } + } + return total_qps; +} } // namespace mooncake