diff --git a/mooncake-store/include/ha_helper.h b/mooncake-store/include/ha_helper.h index 9c3ce53ed..897ba53a5 100644 --- a/mooncake-store/include/ha_helper.h +++ b/mooncake-store/include/ha_helper.h @@ -12,9 +12,6 @@ namespace mooncake { -// The key to store the master view in etcd -inline const char* const MASTER_VIEW_KEY = "mooncake-store/master_view"; - /* * @brief A helper class for maintain and monitor the master view change. * The cluster is assumed to have multiple master servers, but only @@ -26,7 +23,7 @@ class MasterViewHelper { public: MasterViewHelper(const MasterViewHelper&) = delete; MasterViewHelper& operator=(const MasterViewHelper&) = delete; - MasterViewHelper() = default; + MasterViewHelper(); /* * @brief Connect to the etcd cluster. This function should be called at @@ -61,6 +58,9 @@ class MasterViewHelper { */ ErrorCode GetMasterView(std::string& master_address, ViewVersionId& version); + + private: + std::string master_view_key_; }; /* diff --git a/mooncake-store/src/ha_helper.cpp b/mooncake-store/src/ha_helper.cpp index 796838a83..07906772a 100644 --- a/mooncake-store/src/ha_helper.cpp +++ b/mooncake-store/src/ha_helper.cpp @@ -4,6 +4,22 @@ namespace mooncake { +MasterViewHelper::MasterViewHelper() { + std::string cluster_id; + const char* cluster_id_env = std::getenv("MC_STORE_CLUSTER_ID"); + if (cluster_id_env != nullptr && strlen(cluster_id_env) > 0) { + cluster_id = cluster_id_env; + } else { + cluster_id = "mooncake"; + } + // Ensure the cluster_id ends with '/' if not empty + if (!cluster_id.empty() && cluster_id.back() != '/') { + cluster_id += '/'; + } + master_view_key_ = "mooncake-store/" + cluster_id + "master_view"; + LOG(INFO) << "Master view key: " << master_view_key_; +} + ErrorCode MasterViewHelper::ConnectToEtcd(const std::string& etcd_endpoints) { return EtcdHelper::ConnectToEtcdStoreClient(etcd_endpoints); } @@ -15,8 +31,9 @@ void MasterViewHelper::ElectLeader(const std::string& master_address, // Check if there is already a leader ViewVersionId current_version = 0; std::string current_master; - auto ret = EtcdHelper::Get(MASTER_VIEW_KEY, strlen(MASTER_VIEW_KEY), - current_master, current_version); + auto ret = + EtcdHelper::Get(master_view_key_.c_str(), master_view_key_.size(), + current_master, current_version); if (ret != ErrorCode::OK && ret != ErrorCode::ETCD_KEY_NOT_EXIST) { LOG(ERROR) << "Failed to get current leader: " << ret; std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -27,8 +44,8 @@ void MasterViewHelper::ElectLeader(const std::string& master_address, // In rare cases, the leader may be ourselves, but it does not // matter. We will watch the key until it's deleted. LOG(INFO) << "Waiting for leadership change..."; - auto ret = EtcdHelper::WatchUntilDeleted(MASTER_VIEW_KEY, - strlen(MASTER_VIEW_KEY)); + auto ret = EtcdHelper::WatchUntilDeleted(master_view_key_.c_str(), + master_view_key_.size()); if (ret != ErrorCode::OK) { LOG(ERROR) << "Etcd error when waiting for leadership change: " << ret; @@ -52,8 +69,8 @@ void MasterViewHelper::ElectLeader(const std::string& master_address, } ret = EtcdHelper::CreateWithLease( - MASTER_VIEW_KEY, strlen(MASTER_VIEW_KEY), master_address.c_str(), - master_address.size(), lease_id, version); + master_view_key_.c_str(), master_view_key_.size(), + master_address.c_str(), master_address.size(), lease_id, version); if (ret == ErrorCode::ETCD_TRANSACTION_FAIL) { LOG(INFO) << "Failed to elect self as leader: " << ret; std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -75,8 +92,9 @@ void MasterViewHelper::KeepLeader(EtcdLeaseId lease_id) { ErrorCode MasterViewHelper::GetMasterView(std::string& master_address, ViewVersionId& version) { - auto err_code = EtcdHelper::Get(MASTER_VIEW_KEY, strlen(MASTER_VIEW_KEY), - master_address, version); + auto err_code = + EtcdHelper::Get(master_view_key_.c_str(), master_view_key_.size(), + master_address, version); if (err_code != ErrorCode::OK) { if (err_code == ErrorCode::ETCD_KEY_NOT_EXIST) { LOG(ERROR) << "No master is available"; diff --git a/mooncake-transfer-engine/include/transfer_metadata.h b/mooncake-transfer-engine/include/transfer_metadata.h index 70f15c8d4..ba35c4f17 100644 --- a/mooncake-transfer-engine/include/transfer_metadata.h +++ b/mooncake-transfer-engine/include/transfer_metadata.h @@ -187,8 +187,11 @@ class TransferMetadata { Json::Value &local_json); int receivePeerNotify(const Json::Value &peer_json, Json::Value &local_json); + std::string getFullMetadataKey(const std::string &segment_name) const; bool p2p_handshake_mode_{false}; + std::string common_key_prefix_; + std::string rpc_meta_prefix_; // local cache RWSpinlock segment_lock_; std::unordered_map> diff --git a/mooncake-transfer-engine/src/transfer_metadata.cpp b/mooncake-transfer-engine/src/transfer_metadata.cpp index 04c6964ac..6c371e01a 100644 --- a/mooncake-transfer-engine/src/transfer_metadata.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata.cpp @@ -26,16 +26,13 @@ namespace mooncake { -const static std::string kCommonKeyPrefix = "mooncake/"; -const static std::string kRpcMetaPrefix = kCommonKeyPrefix + "rpc_meta/"; - -// mooncake/segments/[...] -static inline std::string getFullMetadataKey(const std::string &segment_name) { - auto pos = segment_name.find("/"); - if (pos == segment_name.npos) - return kCommonKeyPrefix + "ram/" + segment_name; - else - return kCommonKeyPrefix + segment_name; +static inline std::string extractProtocolFromConnString( + const std::string &conn_string) { + std::size_t pos = conn_string.find("://"); + if (pos != std::string::npos) { + return conn_string.substr(0, pos); + } + return "etcd"; } struct TransferNotifyUtil { @@ -77,6 +74,23 @@ struct TransferHandshakeUtil { TransferMetadata::TransferMetadata(const std::string &conn_string) { next_segment_id_.store(1); + + std::string protocol = extractProtocolFromConnString(conn_string); + std::string custom_key; + + const char *custom_prefix = std::getenv("MC_METADATA_CLUSTER_ID"); + if (custom_prefix != nullptr && strlen(custom_prefix) > 0) { + custom_key = custom_prefix; + + if (!custom_key.empty() && custom_key.back() != '/') { + custom_key += '/'; + } + LOG(INFO) << "Using metadata cluster ID: mooncake/" << custom_key; + } + + common_key_prefix_ = "mooncake/" + custom_key; + rpc_meta_prefix_ = common_key_prefix_ + "rpc_meta/"; + handshake_plugin_ = HandShakePlugin::Create(conn_string); if (!handshake_plugin_) { LOG(ERROR) @@ -97,6 +111,23 @@ TransferMetadata::TransferMetadata(const std::string &conn_string) { TransferMetadata::~TransferMetadata() { handshake_plugin_.reset(); } +std::string TransferMetadata::getFullMetadataKey( + const std::string &segment_name) const { + if (segment_name.empty()) { + LOG(WARNING) << "Empty segment_name provided to getFullMetadataKey"; + return common_key_prefix_ + "ram/"; + } + + auto pos = segment_name.find("/"); + if (pos == segment_name.npos) { + // Simple segment name without path + return common_key_prefix_ + "ram/" + segment_name; + } else { + // Segment name already contains path + return common_key_prefix_ + segment_name; + } +} + int TransferMetadata::receivePeerNotify(const Json::Value &peer_json, Json::Value &local_json) { RWSpinlock::WriteGuard guard(notify_lock_); @@ -623,7 +654,7 @@ int TransferMetadata::addRpcMetaEntry(const std::string &server_name, Json::Value rpcMetaJSON; rpcMetaJSON["ip_or_host_name"] = desc.ip_or_host_name; rpcMetaJSON["rpc_port"] = static_cast(desc.rpc_port); - if (!storage_plugin_->set(kRpcMetaPrefix + server_name, rpcMetaJSON)) { + if (!storage_plugin_->set(rpc_meta_prefix_ + server_name, rpcMetaJSON)) { LOG(ERROR) << "Failed to set location of " << server_name; return ERR_METADATA; } @@ -634,7 +665,7 @@ int TransferMetadata::removeRpcMetaEntry(const std::string &server_name) { if (p2p_handshake_mode_) { return 0; } - if (!storage_plugin_->remove(kRpcMetaPrefix + server_name)) { + if (!storage_plugin_->remove(rpc_meta_prefix_ + server_name)) { LOG(ERROR) << "Failed to remove location of " << server_name; return ERR_METADATA; } @@ -657,7 +688,8 @@ int TransferMetadata::getRpcMetaEntry(const std::string &server_name, desc.rpc_port = port; } else { Json::Value rpcMetaJSON; - if (!storage_plugin_->get(kRpcMetaPrefix + server_name, rpcMetaJSON)) { + if (!storage_plugin_->get(rpc_meta_prefix_ + server_name, + rpcMetaJSON)) { LOG(ERROR) << "Failed to find location of " << server_name; return ERR_METADATA; }