Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions mooncake-store/include/ha_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@

namespace mooncake {

// The key to store the master view in etcd
inline const char* const MASTER_VIEW_KEY = "mooncake-store/master_view";

/*
* @brief A helper class for maintaining and monitoring master view changes.
* The cluster is assumed to have multiple master servers, but only
Expand All @@ -26,7 +23,7 @@ class MasterViewHelper {
public:
MasterViewHelper(const MasterViewHelper&) = delete;
MasterViewHelper& operator=(const MasterViewHelper&) = delete;
MasterViewHelper() = default;
MasterViewHelper();

/*
* @brief Connect to the etcd cluster. This function should be called at
Expand Down Expand Up @@ -61,6 +58,9 @@ class MasterViewHelper {
*/
ErrorCode GetMasterView(std::string& master_address,
ViewVersionId& version);

private:
std::string master_view_key_;
};

/*
Expand Down
34 changes: 26 additions & 8 deletions mooncake-store/src/ha_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,22 @@

namespace mooncake {

// Builds the etcd key under which the master view is stored, in the form
// "mooncake-store/<cluster_id>/master_view". The cluster id is taken from the
// MC_STORE_CLUSTER_ID environment variable and falls back to "mooncake" when
// the variable is unset or empty.
MasterViewHelper::MasterViewHelper() {
    const char* env_value = std::getenv("MC_STORE_CLUSTER_ID");
    const bool env_usable = env_value != nullptr && env_value[0] != '\0';
    std::string cluster_id = env_usable ? env_value : "mooncake";

    // Append a trailing '/' unless the id already ends with one, so the id
    // forms its own path component inside the key.
    const bool needs_separator =
        !cluster_id.empty() && cluster_id.back() != '/';
    if (needs_separator) {
        cluster_id.push_back('/');
    }

    master_view_key_ = "mooncake-store/" + cluster_id + "master_view";
    LOG(INFO) << "Master view key: " << master_view_key_;
}

// Forwards the endpoint string to EtcdHelper::ConnectToEtcdStoreClient and
// returns whatever status that call reports.
ErrorCode MasterViewHelper::ConnectToEtcd(const std::string& etcd_endpoints) {
    auto status = EtcdHelper::ConnectToEtcdStoreClient(etcd_endpoints);
    return status;
}
Expand All @@ -15,8 +31,9 @@ void MasterViewHelper::ElectLeader(const std::string& master_address,
// Check if there is already a leader
ViewVersionId current_version = 0;
std::string current_master;
auto ret = EtcdHelper::Get(MASTER_VIEW_KEY, strlen(MASTER_VIEW_KEY),
current_master, current_version);
auto ret =
EtcdHelper::Get(master_view_key_.c_str(), master_view_key_.size(),
current_master, current_version);
if (ret != ErrorCode::OK && ret != ErrorCode::ETCD_KEY_NOT_EXIST) {
LOG(ERROR) << "Failed to get current leader: " << ret;
std::this_thread::sleep_for(std::chrono::seconds(1));
Expand All @@ -27,8 +44,8 @@ void MasterViewHelper::ElectLeader(const std::string& master_address,
// In rare cases, the leader may be ourselves, but it does not
// matter. We will watch the key until it's deleted.
LOG(INFO) << "Waiting for leadership change...";
auto ret = EtcdHelper::WatchUntilDeleted(MASTER_VIEW_KEY,
strlen(MASTER_VIEW_KEY));
auto ret = EtcdHelper::WatchUntilDeleted(master_view_key_.c_str(),
master_view_key_.size());
if (ret != ErrorCode::OK) {
LOG(ERROR) << "Etcd error when waiting for leadership change: "
<< ret;
Expand All @@ -52,8 +69,8 @@ void MasterViewHelper::ElectLeader(const std::string& master_address,
}

ret = EtcdHelper::CreateWithLease(
MASTER_VIEW_KEY, strlen(MASTER_VIEW_KEY), master_address.c_str(),
master_address.size(), lease_id, version);
master_view_key_.c_str(), master_view_key_.size(),
master_address.c_str(), master_address.size(), lease_id, version);
if (ret == ErrorCode::ETCD_TRANSACTION_FAIL) {
LOG(INFO) << "Failed to elect self as leader: " << ret;
std::this_thread::sleep_for(std::chrono::seconds(1));
Expand All @@ -75,8 +92,9 @@ void MasterViewHelper::KeepLeader(EtcdLeaseId lease_id) {

ErrorCode MasterViewHelper::GetMasterView(std::string& master_address,
ViewVersionId& version) {
auto err_code = EtcdHelper::Get(MASTER_VIEW_KEY, strlen(MASTER_VIEW_KEY),
master_address, version);
auto err_code =
EtcdHelper::Get(master_view_key_.c_str(), master_view_key_.size(),
master_address, version);
if (err_code != ErrorCode::OK) {
if (err_code == ErrorCode::ETCD_KEY_NOT_EXIST) {
LOG(ERROR) << "No master is available";
Expand Down
3 changes: 3 additions & 0 deletions mooncake-transfer-engine/include/transfer_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,11 @@ class TransferMetadata {
Json::Value &local_json);
int receivePeerNotify(const Json::Value &peer_json,
Json::Value &local_json);
std::string getFullMetadataKey(const std::string &segment_name) const;

bool p2p_handshake_mode_{false};
std::string common_key_prefix_;
std::string rpc_meta_prefix_;
// local cache
RWSpinlock segment_lock_;
std::unordered_map<uint64_t, std::shared_ptr<SegmentDesc>>
Expand Down
58 changes: 45 additions & 13 deletions mooncake-transfer-engine/src/transfer_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@

namespace mooncake {

const static std::string kCommonKeyPrefix = "mooncake/";
const static std::string kRpcMetaPrefix = kCommonKeyPrefix + "rpc_meta/";

// mooncake/segments/[...]
static inline std::string getFullMetadataKey(const std::string &segment_name) {
auto pos = segment_name.find("/");
if (pos == segment_name.npos)
return kCommonKeyPrefix + "ram/" + segment_name;
else
return kCommonKeyPrefix + segment_name;
// Returns the scheme part of a connection string, i.e. the text that precedes
// the first "://". A connection string without "://" yields "etcd".
static inline std::string extractProtocolFromConnString(
    const std::string &conn_string) {
    const auto scheme_end = conn_string.find("://");
    const bool has_scheme = scheme_end != std::string::npos;
    return has_scheme ? conn_string.substr(0, scheme_end)
                      : std::string("etcd");
}

struct TransferNotifyUtil {
Expand Down Expand Up @@ -77,6 +74,23 @@ struct TransferHandshakeUtil {

TransferMetadata::TransferMetadata(const std::string &conn_string) {
next_segment_id_.store(1);

std::string protocol = extractProtocolFromConnString(conn_string);
std::string custom_key;

const char *custom_prefix = std::getenv("MC_METADATA_CLUSTER_ID");
if (custom_prefix != nullptr && strlen(custom_prefix) > 0) {
custom_key = custom_prefix;

if (!custom_key.empty() && custom_key.back() != '/') {
custom_key += '/';
}
LOG(INFO) << "Using metadata cluster ID: mooncake/" << custom_key;
}

common_key_prefix_ = "mooncake/" + custom_key;
rpc_meta_prefix_ = common_key_prefix_ + "rpc_meta/";

handshake_plugin_ = HandShakePlugin::Create(conn_string);
if (!handshake_plugin_) {
LOG(ERROR)
Expand All @@ -97,6 +111,23 @@ TransferMetadata::TransferMetadata(const std::string &conn_string) {

// Explicitly resets handshake_plugin_ when the metadata object is destroyed.
TransferMetadata::~TransferMetadata() {
    handshake_plugin_.reset();
}

// Maps a segment name to its full metadata key under common_key_prefix_.
//  - A name containing '/' is treated as already carrying its path and is
//    appended to the prefix unchanged.
//  - A name without '/' is placed under the "ram/" sub-path.
//  - An empty name logs a warning and yields the bare "<prefix>ram/" key.
std::string TransferMetadata::getFullMetadataKey(
    const std::string &segment_name) const {
    const bool has_path = segment_name.find("/") != std::string::npos;
    if (has_path) {
        return common_key_prefix_ + segment_name;
    }

    if (segment_name.empty()) {
        LOG(WARNING) << "Empty segment_name provided to getFullMetadataKey";
    }

    // For an empty name this evaluates to common_key_prefix_ + "ram/".
    return common_key_prefix_ + "ram/" + segment_name;
}

int TransferMetadata::receivePeerNotify(const Json::Value &peer_json,
Json::Value &local_json) {
RWSpinlock::WriteGuard guard(notify_lock_);
Expand Down Expand Up @@ -623,7 +654,7 @@ int TransferMetadata::addRpcMetaEntry(const std::string &server_name,
Json::Value rpcMetaJSON;
rpcMetaJSON["ip_or_host_name"] = desc.ip_or_host_name;
rpcMetaJSON["rpc_port"] = static_cast<Json::UInt64>(desc.rpc_port);
if (!storage_plugin_->set(kRpcMetaPrefix + server_name, rpcMetaJSON)) {
if (!storage_plugin_->set(rpc_meta_prefix_ + server_name, rpcMetaJSON)) {
LOG(ERROR) << "Failed to set location of " << server_name;
return ERR_METADATA;
}
Expand All @@ -634,7 +665,7 @@ int TransferMetadata::removeRpcMetaEntry(const std::string &server_name) {
if (p2p_handshake_mode_) {
return 0;
}
if (!storage_plugin_->remove(kRpcMetaPrefix + server_name)) {
if (!storage_plugin_->remove(rpc_meta_prefix_ + server_name)) {
LOG(ERROR) << "Failed to remove location of " << server_name;
return ERR_METADATA;
}
Expand All @@ -657,7 +688,8 @@ int TransferMetadata::getRpcMetaEntry(const std::string &server_name,
desc.rpc_port = port;
} else {
Json::Value rpcMetaJSON;
if (!storage_plugin_->get(kRpcMetaPrefix + server_name, rpcMetaJSON)) {
if (!storage_plugin_->get(rpc_meta_prefix_ + server_name,
rpcMetaJSON)) {
LOG(ERROR) << "Failed to find location of " << server_name;
return ERR_METADATA;
}
Expand Down
Loading