Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions mooncake-store/include/ha_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@

namespace mooncake {

// The key to store the master view in etcd
inline const char* const MASTER_VIEW_KEY = "mooncake-store/master_view";

/*
* @brief A helper class for maintain and monitor the master view change.
* The cluster is assumed to have multiple master servers, but only
Expand Down Expand Up @@ -61,6 +58,9 @@ class MasterViewHelper {
*/
ErrorCode GetMasterView(std::string& master_address,
ViewVersionId& version);

private:
std::string master_view_key_;
};

/*
Expand Down
32 changes: 24 additions & 8 deletions mooncake-store/src/ha_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@
namespace mooncake {

ErrorCode MasterViewHelper::ConnectToEtcd(const std::string& etcd_endpoints) {
std::string custom_key;
const char* custom_prefix = std::getenv("MC_STORE_ETCD_KEY_PREFIX");
if (custom_prefix != nullptr && strlen(custom_prefix) > 0) {
custom_key = custom_prefix;
// Ensure the custom key ends with '/'
if (!custom_key.empty() && custom_key.back() != '/') {
custom_key += '/';
}
LOG(INFO) << "Using custom store etcd key prefix: mooncake-store/"
<< custom_key;
}
master_view_key_ = "mooncake-store/" + custom_key + "master_view";
LOG(INFO) << "Master view key: " << master_view_key_;

return EtcdHelper::ConnectToEtcdStoreClient(etcd_endpoints);
}

Expand All @@ -15,8 +29,9 @@ void MasterViewHelper::ElectLeader(const std::string& master_address,
// Check if there is already a leader
ViewVersionId current_version = 0;
std::string current_master;
auto ret = EtcdHelper::Get(MASTER_VIEW_KEY, strlen(MASTER_VIEW_KEY),
current_master, current_version);
auto ret =
EtcdHelper::Get(master_view_key_.c_str(), master_view_key_.size(),
current_master, current_version);
if (ret != ErrorCode::OK && ret != ErrorCode::ETCD_KEY_NOT_EXIST) {
LOG(ERROR) << "Failed to get current leader: " << ret;
std::this_thread::sleep_for(std::chrono::seconds(1));
Expand All @@ -27,8 +42,8 @@ void MasterViewHelper::ElectLeader(const std::string& master_address,
// In rare cases, the leader may be ourselves, but it does not
// matter. We will watch the key until it's deleted.
LOG(INFO) << "Waiting for leadership change...";
auto ret = EtcdHelper::WatchUntilDeleted(MASTER_VIEW_KEY,
strlen(MASTER_VIEW_KEY));
auto ret = EtcdHelper::WatchUntilDeleted(master_view_key_.c_str(),
master_view_key_.size());
if (ret != ErrorCode::OK) {
LOG(ERROR) << "Etcd error when waiting for leadership change: "
<< ret;
Expand All @@ -52,8 +67,8 @@ void MasterViewHelper::ElectLeader(const std::string& master_address,
}

ret = EtcdHelper::CreateWithLease(
MASTER_VIEW_KEY, strlen(MASTER_VIEW_KEY), master_address.c_str(),
master_address.size(), lease_id, version);
master_view_key_.c_str(), master_view_key_.size(),
master_address.c_str(), master_address.size(), lease_id, version);
if (ret == ErrorCode::ETCD_TRANSACTION_FAIL) {
LOG(INFO) << "Failed to elect self as leader: " << ret;
std::this_thread::sleep_for(std::chrono::seconds(1));
Expand All @@ -75,8 +90,9 @@ void MasterViewHelper::KeepLeader(EtcdLeaseId lease_id) {

ErrorCode MasterViewHelper::GetMasterView(std::string& master_address,
ViewVersionId& version) {
auto err_code = EtcdHelper::Get(MASTER_VIEW_KEY, strlen(MASTER_VIEW_KEY),
master_address, version);
auto err_code =
EtcdHelper::Get(master_view_key_.c_str(), master_view_key_.size(),
master_address, version);
if (err_code != ErrorCode::OK) {
if (err_code == ErrorCode::ETCD_KEY_NOT_EXIST) {
LOG(ERROR) << "No master is available";
Expand Down
3 changes: 3 additions & 0 deletions mooncake-transfer-engine/include/transfer_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,11 @@ class TransferMetadata {
Json::Value &local_json);
int receivePeerNotify(const Json::Value &peer_json,
Json::Value &local_json);
std::string getFullMetadataKey(const std::string &segment_name) const;

bool p2p_handshake_mode_{false};
std::string common_key_prefix_;
std::string rpc_meta_prefix_;
// local cache
RWSpinlock segment_lock_;
std::unordered_map<uint64_t, std::shared_ptr<SegmentDesc>>
Expand Down
61 changes: 48 additions & 13 deletions mooncake-transfer-engine/src/transfer_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@

namespace mooncake {

const static std::string kCommonKeyPrefix = "mooncake/";
const static std::string kRpcMetaPrefix = kCommonKeyPrefix + "rpc_meta/";

// mooncake/segments/[...]
static inline std::string getFullMetadataKey(const std::string &segment_name) {
auto pos = segment_name.find("/");
if (pos == segment_name.npos)
return kCommonKeyPrefix + "ram/" + segment_name;
else
return kCommonKeyPrefix + segment_name;
static inline std::string extractProtocolFromConnString(
const std::string &conn_string) {
std::size_t pos = conn_string.find("://");
if (pos != std::string::npos) {
return conn_string.substr(0, pos);
}
return "etcd";
}

struct TransferNotifyUtil {
Expand Down Expand Up @@ -77,6 +74,26 @@ struct TransferHandshakeUtil {

TransferMetadata::TransferMetadata(const std::string &conn_string) {
next_segment_id_.store(1);

std::string protocol = extractProtocolFromConnString(conn_string);
std::string custom_key;

if (protocol == "etcd") {
const char *custom_prefix = std::getenv("MC_METADATA_ETCD_KEY_PREFIX");
if (custom_prefix != nullptr && strlen(custom_prefix) > 0) {
custom_key = custom_prefix;
// Ensure the custom key ends with '/'
if (!custom_key.empty() && custom_key.back() != '/') {
custom_key += '/';
}
LOG(INFO) << "Using custom metadata etcd key prefix: mooncake/"
<< custom_key;
}
}

common_key_prefix_ = "mooncake/" + custom_key;
rpc_meta_prefix_ = common_key_prefix_ + "rpc_meta/";

handshake_plugin_ = HandShakePlugin::Create(conn_string);
if (!handshake_plugin_) {
LOG(ERROR)
Expand All @@ -97,6 +114,23 @@ TransferMetadata::TransferMetadata(const std::string &conn_string) {

TransferMetadata::~TransferMetadata() { handshake_plugin_.reset(); }

std::string TransferMetadata::getFullMetadataKey(
const std::string &segment_name) const {
if (segment_name.empty()) {
LOG(WARNING) << "Empty segment_name provided to getFullMetadataKey";
return common_key_prefix_ + "ram/";
}

auto pos = segment_name.find("/");
if (pos == segment_name.npos) {
// Simple segment name without path
return common_key_prefix_ + "ram/" + segment_name;
} else {
// Segment name already contains path
return common_key_prefix_ + segment_name;
}
}

int TransferMetadata::receivePeerNotify(const Json::Value &peer_json,
Json::Value &local_json) {
RWSpinlock::WriteGuard guard(notify_lock_);
Expand Down Expand Up @@ -623,7 +657,7 @@ int TransferMetadata::addRpcMetaEntry(const std::string &server_name,
Json::Value rpcMetaJSON;
rpcMetaJSON["ip_or_host_name"] = desc.ip_or_host_name;
rpcMetaJSON["rpc_port"] = static_cast<Json::UInt64>(desc.rpc_port);
if (!storage_plugin_->set(kRpcMetaPrefix + server_name, rpcMetaJSON)) {
if (!storage_plugin_->set(rpc_meta_prefix_ + server_name, rpcMetaJSON)) {
LOG(ERROR) << "Failed to set location of " << server_name;
return ERR_METADATA;
}
Expand All @@ -634,7 +668,7 @@ int TransferMetadata::removeRpcMetaEntry(const std::string &server_name) {
if (p2p_handshake_mode_) {
return 0;
}
if (!storage_plugin_->remove(kRpcMetaPrefix + server_name)) {
if (!storage_plugin_->remove(rpc_meta_prefix_ + server_name)) {
LOG(ERROR) << "Failed to remove location of " << server_name;
return ERR_METADATA;
}
Expand All @@ -657,7 +691,8 @@ int TransferMetadata::getRpcMetaEntry(const std::string &server_name,
desc.rpc_port = port;
} else {
Json::Value rpcMetaJSON;
if (!storage_plugin_->get(kRpcMetaPrefix + server_name, rpcMetaJSON)) {
if (!storage_plugin_->get(rpc_meta_prefix_ + server_name,
rpcMetaJSON)) {
LOG(ERROR) << "Failed to find location of " << server_name;
return ERR_METADATA;
}
Expand Down
Loading