Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions mooncake-transfer-engine/include/transfer_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,11 @@ class TransferMetadata {
Json::Value &local_json);
int receivePeerNotify(const Json::Value &peer_json,
Json::Value &local_json);
std::string getFullMetadataKey(const std::string &segment_name) const;

bool p2p_handshake_mode_{false};
std::string common_key_prefix_;
std::string rpc_meta_prefix_;
// local cache
RWSpinlock segment_lock_;
std::unordered_map<uint64_t, std::shared_ptr<SegmentDesc>>
Expand Down
61 changes: 48 additions & 13 deletions mooncake-transfer-engine/src/transfer_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@

namespace mooncake {

const static std::string kCommonKeyPrefix = "mooncake/";
const static std::string kRpcMetaPrefix = kCommonKeyPrefix + "rpc_meta/";

// mooncake/segments/[...]
static inline std::string getFullMetadataKey(const std::string &segment_name) {
auto pos = segment_name.find("/");
if (pos == segment_name.npos)
return kCommonKeyPrefix + "ram/" + segment_name;
else
return kCommonKeyPrefix + segment_name;
static inline std::string extractProtocolFromConnString(
const std::string &conn_string) {
std::size_t pos = conn_string.find("://");
if (pos != std::string::npos) {
return conn_string.substr(0, pos);
}
return "";
}

struct TransferNotifyUtil {
Expand Down Expand Up @@ -77,6 +74,26 @@ struct TransferHandshakeUtil {

TransferMetadata::TransferMetadata(const std::string &conn_string) {
next_segment_id_.store(1);

std::string protocol = extractProtocolFromConnString(conn_string);
std::string custom_key;

if (protocol == "etcd") {
const char *custom_prefix = std::getenv("MC_METADATA_KEY_PREFIX");
if (custom_prefix != nullptr && strlen(custom_prefix) > 0) {
custom_key = custom_prefix;
// Ensure the custom key ends with '/'
if (!custom_key.empty() && custom_key.back() != '/') {
custom_key += '/';
}
LOG(INFO) << "Using custom metadata key for etcd: mooncake/"
<< custom_key;
}
}

common_key_prefix_ = "mooncake/" + custom_key;
rpc_meta_prefix_ = common_key_prefix_ + "rpc_meta/";

handshake_plugin_ = HandShakePlugin::Create(conn_string);
if (!handshake_plugin_) {
LOG(ERROR)
Expand All @@ -97,6 +114,24 @@ TransferMetadata::TransferMetadata(const std::string &conn_string) {

TransferMetadata::~TransferMetadata() { handshake_plugin_.reset(); }

std::string TransferMetadata::getFullMetadataKey(
const std::string &segment_name) const {
if (segment_name.empty()) {
LOG(WARNING)
<< "Empty segment_name provided to getFullMetadataKey";
return common_key_prefix_ + "ram/";
}

auto pos = segment_name.find("/");
if (pos == segment_name.npos) {
// Simple segment name without path
return common_key_prefix_ + "ram/" + segment_name;
} else {
// Segment name already contains path
return common_key_prefix_ + segment_name;
}
}

int TransferMetadata::receivePeerNotify(const Json::Value &peer_json,
Json::Value &local_json) {
RWSpinlock::WriteGuard guard(notify_lock_);
Expand Down Expand Up @@ -623,7 +658,7 @@ int TransferMetadata::addRpcMetaEntry(const std::string &server_name,
Json::Value rpcMetaJSON;
rpcMetaJSON["ip_or_host_name"] = desc.ip_or_host_name;
rpcMetaJSON["rpc_port"] = static_cast<Json::UInt64>(desc.rpc_port);
if (!storage_plugin_->set(kRpcMetaPrefix + server_name, rpcMetaJSON)) {
if (!storage_plugin_->set(rpc_meta_prefix_ + server_name, rpcMetaJSON)) {
LOG(ERROR) << "Failed to set location of " << server_name;
return ERR_METADATA;
}
Expand All @@ -634,7 +669,7 @@ int TransferMetadata::removeRpcMetaEntry(const std::string &server_name) {
if (p2p_handshake_mode_) {
return 0;
}
if (!storage_plugin_->remove(kRpcMetaPrefix + server_name)) {
if (!storage_plugin_->remove(rpc_meta_prefix_ + server_name)) {
LOG(ERROR) << "Failed to remove location of " << server_name;
return ERR_METADATA;
}
Expand All @@ -657,7 +692,7 @@ int TransferMetadata::getRpcMetaEntry(const std::string &server_name,
desc.rpc_port = port;
} else {
Json::Value rpcMetaJSON;
if (!storage_plugin_->get(kRpcMetaPrefix + server_name, rpcMetaJSON)) {
if (!storage_plugin_->get(rpc_meta_prefix_ + server_name, rpcMetaJSON)) {
LOG(ERROR) << "Failed to find location of " << server_name;
return ERR_METADATA;
}
Expand Down
Loading