diff --git a/conanfile.py b/conanfile.py index 046b3bfec..8159bc871 100644 --- a/conanfile.py +++ b/conanfile.py @@ -10,7 +10,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "3.0.14" + version = "3.0.15" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeStore" diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index a9c0f8bf2..a621303dd 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -13,7 +13,8 @@ namespace homeobject { ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADER, UNKNOWN_PEER, UNSUPPORTED_OP, CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST, SHUTTING_DOWN, ROLL_BACK, CANCELLED, QUORUM_NOT_MET); -ENUM(PGReplaceMemberTaskStatus, uint16_t, COMPLETED = 0, IN_PROGRESS, NOT_LEADER, TASK_ID_MISMATCH, TASK_NOT_FOUND, UNKNOWN); +ENUM(PGReplaceMemberTaskStatus, uint16_t, COMPLETED = 0, IN_PROGRESS, NOT_LEADER, TASK_ID_MISMATCH, TASK_NOT_FOUND, + UNKNOWN); // https://github.corp.ebay.com/SDS/nuobject_proto/blob/main/src/proto/pg.proto#L52 ENUM(PGStateMask, uint32_t, HEALTHY = 0, DISK_DOWN = 0x1, SCRUBBING = 0x2, BASELINE_RESYNC = 0x4, INCONSISTENT = 0x8, REPAIR = 0x10, GC_IN_PROGRESS = 0x20, RESYNCING = 0x40); @@ -40,6 +41,12 @@ struct PGMember { using MemberSet = std::set< PGMember >; +struct replace_member_task { + std::string task_id; // Unique task id for this replace member operation + uuid_t replica_out; // The replica which is going to be replaced + uuid_t replica_in; // The replica which is going to be added in place of replica_out +}; + struct PGInfo { explicit PGInfo(pg_id_t _id) : id(_id) {} pg_id_t id; @@ -55,14 +62,10 @@ struct PGInfo { // check if the PGInfo has same id, size and members with the rhs PGInfo. 
bool is_equivalent_to(PGInfo const& rhs) const { - if (id != rhs.id || size != rhs.size || members.size() != rhs.members.size()) { - return false; - } + if (id != rhs.id || size != rhs.size || members.size() != rhs.members.size()) { return false; } for (auto const& m : members) { auto it = rhs.members.find(m); - if (it == rhs.members.end() || it->priority != m.priority) { - return false; - } + if (it == rhs.members.end() || it->priority != m.priority) { return false; } } return true; } @@ -72,13 +75,13 @@ struct PGInfo { uint32_t i = 0ul; for (auto const& m : members) { if (i++ > 0) { members_str += ", "; } - members_str += fmt::format("member-{}: id={}, name={}, priority={}", - i, boost::uuids::to_string(m.id), m.name, m.priority); + members_str += fmt::format("member-{}: id={}, name={}, priority={}", i, boost::uuids::to_string(m.id), + m.name, m.priority); } return fmt::format("PGInfo: id={}, replica_set_uuid={}, size={}, chunk_size={}, " "expected_member_num={}, members={}", - id, boost::uuids::to_string(replica_set_uuid), size, chunk_size, - expected_member_num, members_str); + id, boost::uuids::to_string(replica_set_uuid), size, chunk_size, expected_member_num, + members_str); } }; @@ -91,25 +94,19 @@ struct peer_info { }; struct pg_state { - std::atomic state{0}; + std::atomic< uint64_t > state{0}; explicit pg_state(uint64_t s) : state{s} {} - void set_state(PGStateMask mask) { - state.fetch_or(static_cast(mask), std::memory_order_relaxed); - } + void set_state(PGStateMask mask) { state.fetch_or(static_cast< uint64_t >(mask), std::memory_order_relaxed); } - void clear_state(PGStateMask mask) { - state.fetch_and(~static_cast(mask), std::memory_order_relaxed); - } + void clear_state(PGStateMask mask) { state.fetch_and(~static_cast< uint64_t >(mask), std::memory_order_relaxed); } bool is_state_set(PGStateMask mask) const { - return (state.load(std::memory_order_relaxed) & static_cast(mask)) != 0; + return (state.load(std::memory_order_relaxed) & static_cast< uint64_t >(mask)) != 0; } - uint64_t get() const { - return state.load(std::memory_order_relaxed); - } + uint64_t get() const { return state.load(std::memory_order_relaxed); } }; struct PGStats { @@ -171,12 +168,13 @@ struct PGReplaceMemberStatus { class PGManager : public Manager< PGError > { public: virtual NullAsyncResult create_pg(PGInfo&& pg_info, trace_id_t tid = 0) = 0; - virtual NullAsyncResult replace_member(pg_id_t id, std::string& task_id, peer_id_t const& old_member, PGMember const& new_member, - u_int32_t commit_quorum = 0, trace_id_t tid = 0) = 0; - virtual PGReplaceMemberStatus get_replace_member_status(pg_id_t id, std::string& task_id, const PGMember& old_member, - const PGMember& new_member, - const std::vector< PGMember >& others, - uint64_t trace_id = 0) const = 0; + virtual NullAsyncResult replace_member(pg_id_t id, std::string& task_id, peer_id_t const& old_member, + PGMember const& new_member, u_int32_t commit_quorum = 0, + trace_id_t tid = 0) = 0; + virtual PGReplaceMemberStatus get_replace_member_status(pg_id_t id, std::string& task_id, + const PGMember& old_member, const PGMember& new_member, + const std::vector< PGMember >& others, + uint64_t trace_id = 0) const = 0; /** * Retrieves the statistics for a specific PG (Placement Group) identified by its ID. @@ -201,6 +199,68 @@ class PGManager : public Manager< PGError > { * @param pg_id The ID of the PG. */ virtual void destroy_pg(pg_id_t pg_id) = 0; + + /** + * @brief Single member exits a PG (Placement Group) identified by its ID. 
+ * @param group_id The group ID of the pg. + * @param peer_id The peer ID of the member exiting the PG. + * @param trace_id The trace identifier for logging and tracking purposes. + */ + virtual NullResult exit_pg(uuid_t group_id, peer_id_t peer_id, uint64_t trace_id) = 0; + + /** + * @brief Toggle the learner flag for a specified member. + * + * This function changes the state of the learner flag for a given member in the PG. + * It is typically used to revert the learner flag back to false when rolling back pgmove. + * + * @param pg_id The ID of the PG where the member resides. + * @param member_id The ID of the member whose learner flag is to be toggled. + * @param is_learner The new state of the learner flag (true to set as learner, false to unset). + * @param commit_quorum The quorum required for committing the change. + * @param trace_id The trace ID for tracking the operation. + * @return NullAsyncResult indicating the result of the operation. + */ + virtual NullAsyncResult flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner, + uint32_t commit_quorum, trace_id_t trace_id) = 0; + + /** + * @brief Remove a member from the PG. + * + * This function removes a specified member from the PG, typically used to rollback the pgmove operation. + * + * @param pg_id The ID of the PG from which the member is to be removed. + * @param member_id The ID of the member to be removed. + * @param commit_quorum The quorum required for committing the removal. + * @param trace_id The trace ID for tracking the operation. + * @return NullAsyncResult indicating the result of the operation. + */ + virtual NullAsyncResult remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum, + trace_id_t trace_id) = 0; + + /** + * @brief Clean up the replace member task in the PG. + * + * This function cleans up the replace member task, typically used to rollback the pgmove operation. + * + * @param pg_id The ID of the PG where the task is to be cleaned. + * @param task_id The ID of the task to be cleaned. + * @param commit_quorum The quorum required for committing the task cleanup. + * @param trace_id The trace ID for tracking the operation. + * @return NullAsyncResult indicating the result of the operation. + */ + virtual NullAsyncResult clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum, + trace_id_t trace_id) = 0; + + /** + * @brief List all replace member tasks happening on this homeobject instance. + * + * This function retrieves a list of all ongoing tasks on this homeobject instance. + * + * @param trace_id The trace ID for tracking the operation. + * @return Result containing a vector of replace member tasks. 
+ */ + virtual Result< std::vector< replace_member_task > > list_all_replace_member_tasks(trace_id_t trace_id) = 0; }; } // namespace homeobject diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index 79c558c08..c443d2523 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -113,6 +113,20 @@ class HomeObjectImpl : public HomeObject, virtual void _destroy_pg(pg_id_t pg_id) = 0; + virtual PGManager::NullResult _exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) = 0; + + virtual PGManager::NullAsyncResult _flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner, + uint32_t commit_quorum, trace_id_t trace_id) = 0; + + virtual PGManager::NullAsyncResult _remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum, + trace_id_t trace_id) = 0; + + virtual PGManager::NullAsyncResult _clean_replace_member_task(pg_id_t pg_id, std::string& task_id, + uint32_t commit_quorum, trace_id_t trace_id) = 0; + + virtual PGManager::Result< std::vector< replace_member_task > > + _list_all_replace_member_tasks(trace_id_t trace_id) = 0; + protected: std::mutex _repl_lock; peer_id_t _our_id; @@ -164,6 +178,14 @@ class HomeObjectImpl : public HomeObject, bool get_stats(pg_id_t id, PGStats& stats) const final; void get_pg_ids(std::vector< pg_id_t >& pg_ids) const final; void destroy_pg(pg_id_t pg_id) final; + PGManager::NullResult exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) final; + PGManager::NullAsyncResult flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner, + uint32_t commit_quorum, trace_id_t trace_id) final; + PGManager::NullAsyncResult remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum, + trace_id_t trace_id) final; + PGManager::NullAsyncResult clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum, + trace_id_t trace_id) final; + PGManager::Result< std::vector< replace_member_task > > list_all_replace_member_tasks(trace_id_t trace_id) final; /// ShardManager ShardManager::AsyncResult< ShardInfo > get_shard(shard_id_t id, trace_id_t tid) const final; diff --git a/src/lib/homestore_backend/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt index 8add0aa26..441492a6b 100644 --- a/src/lib/homestore_backend/CMakeLists.txt +++ b/src/lib/homestore_backend/CMakeLists.txt @@ -127,6 +127,17 @@ add_test(NAME HomestoreResyncTestWithLeaderRestart --override_config homestore_config.consensus.replace_member_sync_check_interval_ms=1000 --override_config homestore_config.consensus.laggy_threshold=2000 --gtest_filter=HomeObjectFixture.RestartLeader*) +#add_test(NAME HomestoreReplaceMemberRollbackTest +# COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./ +# --override_config homestore_config.consensus.snapshot_freq_distance=13 +# --override_config homestore_config.consensus.num_reserved_log_items=13 +# --override_config homestore_config.resource_limits.raft_logstore_reserve_threshold=13 +# --override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000 +# --override_config homestore_config.generic.repl_dev_cleanup_interval_sec=5 +# --override_config homestore_config.consensus.max_grpc_message_size=138412032 +# --override_config homestore_config.consensus.replace_member_sync_check_interval_ms=60000 +# --override_config homestore_config.consensus.laggy_threshold=1 +# --gtest_filter=HomeObjectFixture.RollbackReplaceMember) # GC tests add_test(NAME FetchDataWithOriginatorGC 
diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 080fa10a2..dc77db198 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -76,6 +76,16 @@ class HSHomeObject : public HomeObjectImpl { HomeObjectStats _get_stats() const override; void _destroy_pg(pg_id_t pg_id) override; + PGManager::NullResult _exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) override; + + PGManager::NullAsyncResult _flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner, + uint32_t commit_quorum, trace_id_t trace_id) override; + PGManager::NullAsyncResult _remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum, + trace_id_t trace_id) override; + PGManager::NullAsyncResult _clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum, + trace_id_t trace_id) override; + PGManager::Result< std::vector< replace_member_task > > + _list_all_replace_member_tasks(trace_id_t trace_id) override; // Mapping from index table uuid to pg id. std::shared_mutex index_lock_; @@ -367,6 +377,11 @@ class HSHomeObject : public HomeObjectImpl { * Returns all shards */ std::vector< Shard > get_chunk_shards(homestore::chunk_num_t v_chunk_id) const; + + /** + * Update membership in pg's superblock. + */ + void update_membership(const MemberSet& members); }; struct HS_Shard : public Shard { @@ -778,6 +793,8 @@ class HSHomeObject : public HomeObjectImpl { const homestore::replica_member_info& member_out, const homestore::replica_member_info& member_in, trace_id_t tid); + void on_remove_member(homestore::group_id_t group_id, const peer_id_t& member, trace_id_t tid = 0); + /** * @brief Cleans up and recycles resources for the PG identified by the given pg_id on the current node. 
* diff --git a/src/lib/homestore_backend/hs_http_manager.cpp b/src/lib/homestore_backend/hs_http_manager.cpp index 75b15b834..13b7d70e0 100644 --- a/src/lib/homestore_backend/hs_http_manager.cpp +++ b/src/lib/homestore_backend/hs_http_manager.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include "hs_http_manager.hpp" #include "hs_homeobject.hpp" @@ -37,6 +38,17 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) { Pistache::Rest::Routes::bind(&HttpManager::reconcile_leader, this)}, {Pistache::Http::Method::Post, "/api/v1/yield_leadership_to_follower", Pistache::Rest::Routes::bind(&HttpManager::yield_leadership_to_follower, this)}, + {Pistache::Http::Method::Get, "/api/v1/pg_quorum", + Pistache::Rest::Routes::bind(&HttpManager::get_pg_quorum, this)}, + {Pistache::Http::Method::Post, "/api/v1/flip_learner", + Pistache::Rest::Routes::bind(&HttpManager::flip_learner_flag, this)}, + {Pistache::Http::Method::Delete, "/api/v1/member", + Pistache::Rest::Routes::bind(&HttpManager::remove_member, this)}, + {Pistache::Http::Method::Delete, "/api/v1/pg_replacemember_task", + Pistache::Rest::Routes::bind(&HttpManager::clean_replace_member_task, this)}, + {Pistache::Http::Method::Get, "/api/v1/pg_replacemember_tasks", + Pistache::Rest::Routes::bind(&HttpManager::list_pg_replace_member_task, this)}, + {Pistache::Http::Method::Delete, "/api/v1/pg", Pistache::Rest::Routes::bind(&HttpManager::exit_pg, this)}, #ifdef _PRERELEASE {Pistache::Http::Method::Post, "/api/v1/crashSystem", Pistache::Rest::Routes::bind(&HttpManager::crash_system, this)}, @@ -239,6 +251,168 @@ void HttpManager::dump_shard(const Pistache::Rest::Request& request, Pistache::H response.send(Pistache::Http::Code::Ok, j.dump()); } +void HttpManager::flip_learner_flag(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + try { + auto body = request.body(); + auto j = nlohmann::json::parse(body); + + std::string pg_id_str = j.at("pg_id").get< std::string >(); + pg_id_t pg_id = std::stoull(pg_id_str); + std::string member_id_str = j.at("member_id").get< std::string >(); + peer_id_t member_id = boost::uuids::string_generator()(member_id_str); + std::string learner = j.at("learner").get< std::string >(); + std::string commit_quorum_str = j.at("commit_quorum").get< std::string >(); + uint32_t commit_quorum = std::stoul(commit_quorum_str); + auto tid = generateRandomTraceId(); + LOGINFO("Flipping learner flag, pg_id={}, member_id={}, learner={}, commit_quorum={}, tid={}", pg_id, + boost::uuids::to_string(member_id), learner, commit_quorum, tid); + auto result = ho_.flip_learner_flag(pg_id, member_id, learner == "true", commit_quorum, tid).get(); + if (!result) { + LOGI("PG flip learner flag failed, err={}", result.error()); + response.send(Pistache::Http::Code::Internal_Server_Error, + fmt::format("Failed to flip learner flag, err={}", result.error())); + return; + } + response.send(Pistache::Http::Code::Ok); + } catch (const std::exception& e) { + response.send(Pistache::Http::Code::Bad_Request, std::string("Invalid JSON: ") + e.what()); + } +} + +void HttpManager::remove_member(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + try { + auto body = request.body(); + auto j = nlohmann::json::parse(body); + + std::string pg_id_str = j.at("pg_id").get< std::string >(); + pg_id_t pg_id = std::stoull(pg_id_str); + std::string member_id_str = j.at("member_id").get< std::string >(); + peer_id_t member_id = boost::uuids::string_generator()(member_id_str); + std::string 
commit_quorum_str = j.at("commit_quorum").get< std::string >(); + uint32_t commit_quorum = std::stoul(commit_quorum_str); + auto tid = generateRandomTraceId(); + LOGINFO("Remove member, pg_id={}, member_id={}, commit_quorum={}, tid={}", pg_id, + boost::uuids::to_string(member_id), commit_quorum, tid); + auto result = ho_.remove_member(pg_id, member_id, commit_quorum, tid).get(); + if (!result) { + // Some times remove member may fail with RETRY_REQUEST if the target member is not responding, + // in this case return 503 so that the caller can retry later. + auto code = result.error() == PGError::RETRY_REQUEST ? Pistache::Http::Code::Service_Unavailable + : Pistache::Http::Code::Internal_Server_Error; + response.send(code, fmt::format("Failed to remove member, err={}", result.error())); + return; + } + response.send(Pistache::Http::Code::Ok); + } catch (const std::exception& e) { + response.send(Pistache::Http::Code::Bad_Request, std::string("Invalid JSON: ") + e.what()); + } +} + +void HttpManager::clean_replace_member_task(const Pistache::Rest::Request& request, + Pistache::Http::ResponseWriter response) { + try { + auto body = request.body(); + auto j = nlohmann::json::parse(body); + + std::string pg_id_str = j.at("pg_id").get< std::string >(); + pg_id_t pg_id = std::stoull(pg_id_str); + std::string task_id = j.at("task_id").get< std::string >(); + std::string commit_quorum_str = j.at("commit_quorum").get< std::string >(); + uint32_t commit_quorum = std::stoul(commit_quorum_str); + auto tid = generateRandomTraceId(); + LOGINFO("Clean replace member task, pg_id={}, task_id={}, commit_quorum={}, tid={}", pg_id, task_id, + commit_quorum, tid); + auto result = ho_.clean_replace_member_task(pg_id, task_id, commit_quorum, tid).get(); + if (!result) { + response.send(Pistache::Http::Code::Internal_Server_Error, + fmt::format("Failed to clean replace member task, err={}", result.error())); + return; + } + response.send(Pistache::Http::Code::Ok); + } catch (const std::exception& e) { + response.send(Pistache::Http::Code::Bad_Request, std::string("Invalid JSON: ") + e.what()); + } +} +void HttpManager::list_pg_replace_member_task(const Pistache::Rest::Request& request, + Pistache::Http::ResponseWriter response) { + auto tid = generateRandomTraceId(); + auto ret = ho_.list_all_replace_member_tasks(tid); + if (ret.hasError()) { + response.send(Pistache::Http::Code::Internal_Server_Error, + fmt::format("Failed to list replace member task, err={}", ret.error())); + return; + } + LOGINFO("list pg replace member tasks, count={}, tid={}", ret.value().size(), tid); + nlohmann::json j = nlohmann::json::array(); + for (const auto& task : ret.value()) { + nlohmann::json task_j; + task_j["task_id"] = task.task_id; + task_j["replica_out"] = to_string(task.replica_out); + task_j["replica_in"] = to_string(task.replica_in); + j.push_back(task_j); + } + response.send(Pistache::Http::Code::Ok, j.dump(2)); +} + +// This API is used to get the PG quorum status, typically used by CM to fix its view of the PG status after a pg move +// failure. 
+void HttpManager::get_pg_quorum(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + auto pg_id_str{request.query().get("pg_id")}; + if (pg_id_str == std::nullopt) { + response.send(Pistache::Http::Code::Bad_Request, "Missing pg_id query parameter"); + return; + } + pg_id_t pg_id = std::stoull(pg_id_str.value()); + PGStats stats; + if (ho_.get_stats(pg_id, stats)) { + nlohmann::json j; + j["pg_id"] = pg_id; + j["replica_set_uuid"] = boost::uuids::to_string(stats.replica_set_uuid); + j["leader"] = boost::uuids::to_string(stats.leader_id); + j["members"] = nlohmann::json::array(); + for (auto peer : stats.members) { + nlohmann::json member_j; + member_j["id"] = boost::uuids::to_string(peer.id); + member_j["name"] = peer.name; + member_j["can_vote"] = peer.can_vote; + member_j["last_commit_lsn"] = peer.last_commit_lsn; + member_j["last_succ_resp_us"] = peer.last_succ_resp_us; + j["members"].push_back(member_j); + } + response.send(Pistache::Http::Code::Ok, j.dump(2)); + } else { + response.send(Pistache::Http::Code::Internal_Server_Error, fmt::format("Failed to get pg quorum")); + } +} + +void HttpManager::exit_pg(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + auto group_id_str{request.query().get("group_id")}; + auto peer_id_str{request.query().get("replica_id")}; + auto tid = generateRandomTraceId(); + if (group_id_str == std::nullopt || peer_id_str == std::nullopt) { + response.send(Pistache::Http::Code::Bad_Request, "Missing group_id or replica_id query parameter"); + return; + } + uuid_t group_id; + uuid_t peer_id; + try { + group_id = boost::uuids::string_generator()(group_id_str.value()); + peer_id = boost::uuids::string_generator()(peer_id_str.value()); + } catch (const std::runtime_error& e) { + response.send(Pistache::Http::Code::Bad_Request, "Invalid group_id or replica_id query parameter"); + return; + } + LOGINFO("Exit pg request received for group_id={}, peer_id={}, tid={}", group_id_str.value(), peer_id_str.value(), + tid); + auto ret = ho_.exit_pg(group_id, peer_id, tid); + if (ret.hasError()) { + response.send(Pistache::Http::Code::Internal_Server_Error, + fmt::format("Failed to list replace member task, err={}", ret.error())); + return; + } + response.send(Pistache::Http::Code::Ok, "Exit pg request submitted"); +} + #ifdef _PRERELEASE void HttpManager::crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { std::string crash_type; diff --git a/src/lib/homestore_backend/hs_http_manager.hpp b/src/lib/homestore_backend/hs_http_manager.hpp index 62d45f1ad..f0ef2567f 100644 --- a/src/lib/homestore_backend/hs_http_manager.hpp +++ b/src/lib/homestore_backend/hs_http_manager.hpp @@ -33,6 +33,12 @@ class HttpManager { void dump_chunk(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void dump_shard(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); void get_shard(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void flip_learner_flag(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void remove_member(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void clean_replace_member_task(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void list_pg_replace_member_task(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void get_pg_quorum(const 
Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void exit_pg(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); #ifdef _PRERELEASE void crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 340bf3210..e38dc146e 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -7,6 +7,8 @@ #include "hs_homeobject.hpp" #include "replication_state_machine.hpp" +#include + using namespace homestore; namespace homeobject { PGError toPgError(ReplServiceError const& e) { @@ -340,7 +342,7 @@ replica_member_info HSHomeObject::to_replica_member_info(const PGMember& pg_memb void HSHomeObject::on_pg_start_replace_member(group_id_t group_id, const std::string& task_id, const replica_member_info& member_out, const replica_member_info& member_in, trace_id_t tid) { - auto lg = std::shared_lock(_pg_lock); + std::unique_lock lck(_pg_lock); for (const auto& iter : _pg_map) { auto& pg = iter.second; if (pg_repl_dev(*pg).group_id() == group_id) { @@ -377,7 +379,7 @@ void HSHomeObject::on_pg_start_replace_member(group_id_t group_id, const std::st void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std::string& task_id, const replica_member_info& member_out, const replica_member_info& member_in, trace_id_t tid) { - auto lg = std::shared_lock(_pg_lock); + std::unique_lock lck(_pg_lock); for (const auto& iter : _pg_map) { auto& pg = iter.second; if (pg_repl_dev(*pg).group_id() == group_id) { @@ -385,21 +387,7 @@ void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std: auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get()); pg->pg_info_.members.erase(PGMember(member_out.id)); pg->pg_info_.members.emplace(std::move(to_pg_member(member_in))); - - uint32_t i{0}; - pg_members* sb_members = hs_pg->pg_sb_->get_pg_members_mutable(); - for (auto const& m : pg->pg_info_.members) { - sb_members[i].id = m.id; - DEBUG_ASSERT(m.name.size() <= PGMember::max_name_len, "member name exceeds max len, name={}", m.name); - auto name_len = std::min(m.name.size(), PGMember::max_name_len); - std::strncpy(sb_members[i].name, m.name.c_str(), name_len); - sb_members[i].name[name_len] = '\0'; - sb_members[i].priority = m.priority; - ++i; - } - hs_pg->pg_sb_->num_dynamic_members = pg->pg_info_.members.size(); - // Update the latest membership info to pg superblk. 
- hs_pg->pg_sb_.write(); + hs_pg->update_membership(pg->pg_info_.members); LOGI("PG complete replace member done member_out={} member_in={}, member_nums={}, trace_id={}", boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), pg->pg_info_.members.size(), tid); @@ -410,6 +398,27 @@ void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std: boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid); } +void HSHomeObject::on_remove_member(homestore::group_id_t group_id, const peer_id_t& member, trace_id_t tid) { + LOGINFO("PG remove member, member={}, trace_id={}", boost::uuids::to_string(member), tid); + std::unique_lock lck(_pg_lock); + for (const auto& iter : _pg_map) { + auto& pg = iter.second; + auto& repl_dev = pg_repl_dev(*pg); + if (repl_dev.group_id() == group_id) { + auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get()); + if (pg->pg_info_.members.erase(PGMember(member)) > 0) { + hs_pg->update_membership(pg->pg_info_.members); + LOGI("PG remove member done member={} member_nums={}, trace_id={}", boost::uuids::to_string(member), + pg->pg_info_.members.size(), tid); + } else { + LOGI("PG remove member done, member doesn't exist, member={} member_nums={}, trace_id={}", + boost::uuids::to_string(member), pg->pg_info_.members.size(), tid); + } + return; + } + } +} + PGReplaceMemberStatus HSHomeObject::_get_replace_member_status(pg_id_t id, std::string& task_id, const PGMember& old_member, const PGMember& new_member, const std::vector< PGMember >& others, @@ -432,6 +441,137 @@ PGReplaceMemberStatus HSHomeObject::_get_replace_member_status(pg_id_t id, std:: return ret; } +PGManager::NullAsyncResult HSHomeObject::_flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner, + uint32_t commit_quorum, trace_id_t tid) { + if (is_shutting_down()) { + LOGI("service is being shut down, trace_id={}", tid); + return folly::makeUnexpected(PGError::SHUTTING_DOWN); + } + incr_pending_request_num(); + + auto hs_pg = get_hs_pg(pg_id); + if (hs_pg == nullptr) { + decr_pending_request_num(); + return folly::makeUnexpected(PGError::UNKNOWN_PG); + } + + auto& repl_dev = pg_repl_dev(*hs_pg); + if (!repl_dev.is_leader() && commit_quorum == 0) { + // Only leader can replace a member + decr_pending_request_num(); + return folly::makeUnexpected(PGError::NOT_LEADER); + } + auto group_id = repl_dev.group_id(); + + LOGI("PG flip learner flag to {}, pg_id={} member={} trace_id={}", is_learner, pg_id, + boost::uuids::to_string(member_id), tid); + + replica_member_info replica; + replica.id = member_id; + + return hs_repl_service() + .flip_learner_flag(group_id, replica, is_learner, commit_quorum, true, tid) + .via(executor_) + .thenValue([this](auto&& v) mutable -> PGManager::NullAsyncResult { + decr_pending_request_num(); + if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } + LOGI("PG flip learner flag done"); + return folly::Unit(); + }); +} + +PGManager::NullAsyncResult HSHomeObject::_remove_member(pg_id_t pg_id, peer_id_t const& member_id, + uint32_t commit_quorum, trace_id_t tid) { + if (is_shutting_down()) { + LOGI("service is being shut down, trace_id={}", tid); + return folly::makeUnexpected(PGError::SHUTTING_DOWN); + } + incr_pending_request_num(); + + auto hs_pg = get_hs_pg(pg_id); + if (hs_pg == nullptr) { + decr_pending_request_num(); + return folly::makeUnexpected(PGError::UNKNOWN_PG); + } + + auto& repl_dev = pg_repl_dev(*hs_pg); + if (!repl_dev.is_leader() && commit_quorum == 0) { + // 
Only leader can replace a member + decr_pending_request_num(); + return folly::makeUnexpected(PGError::NOT_LEADER); + } + auto group_id = repl_dev.group_id(); + + LOGI("PG remove member, pg_id={}, member={} trace_id={}", pg_id, boost::uuids::to_string(member_id), tid); + return hs_repl_service() + .remove_member(group_id, member_id, commit_quorum, tid) + .via(executor_) + .thenValue([this](auto&& v) mutable -> PGManager::NullAsyncResult { + decr_pending_request_num(); + if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } + return folly::Unit(); + }); +} + +PGManager::NullAsyncResult HSHomeObject::_clean_replace_member_task(pg_id_t pg_id, std::string& task_id, + uint32_t commit_quorum, trace_id_t tid) { + if (is_shutting_down()) { + LOGI("service is being shut down, trace_id={}", tid); + return folly::makeUnexpected(PGError::SHUTTING_DOWN); + } + incr_pending_request_num(); + + auto hs_pg = get_hs_pg(pg_id); + if (hs_pg == nullptr) { + decr_pending_request_num(); + return folly::makeUnexpected(PGError::UNKNOWN_PG); + } + + auto& repl_dev = pg_repl_dev(*hs_pg); + if (!repl_dev.is_leader() && commit_quorum == 0) { + // Only leader can replace a member + decr_pending_request_num(); + return folly::makeUnexpected(PGError::NOT_LEADER); + } + auto group_id = repl_dev.group_id(); + + LOGI("PG clean replace member task, pg={}, task={} trace_id={}", pg_id, task_id, tid); + return hs_repl_service() + .clean_replace_member_task(group_id, task_id, commit_quorum, tid) + .via(executor_) + .thenValue([this](auto&& v) mutable -> PGManager::NullAsyncResult { + decr_pending_request_num(); + if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); } + return folly::Unit(); + }); +} + +PGManager::Result< std::vector< replace_member_task > > HSHomeObject::_list_all_replace_member_tasks(trace_id_t tid) { + if (is_shutting_down()) { + LOGI("service is being shut down, trace_id={}", tid); + return folly::makeUnexpected(PGError::SHUTTING_DOWN); + } + incr_pending_request_num(); + auto ret = hs_repl_service().list_replace_member_tasks(); + if (ret.hasError()) { + LOGE("Failed to list replace member tasks, error={}", ret.error()); + decr_pending_request_num(); + return folly::makeUnexpected(toPgError(ret.error())); + } + + // Convert homestore::replace_member_task to homeobject::replace_member_task + std::vector< replace_member_task > result; + for (const auto& hs_task : ret.value()) { + replace_member_task task; + task.task_id = hs_task.task_id; + task.replica_out = hs_task.replica_out; + task.replica_in = hs_task.replica_in; + result.push_back(std::move(task)); + } + decr_pending_request_num(); + return result; +} + std::optional< pg_id_t > HSHomeObject::get_pg_id_with_group_id(group_id_t group_id) const { auto lg = std::shared_lock(_pg_lock); auto iter = std::find_if(_pg_map.begin(), _pg_map.end(), [group_id](const auto& entry) { @@ -472,6 +612,51 @@ bool HSHomeObject::pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine return true; } +// exit_pg is called when a pg/repl_dev is leaked, either during member leaving or a member failed to join the cluster. +// This function will try to destroy both pg and repl_dev. Since pg and repl_dev may be inconsistent, so ignore not +// found error to make it idempotent and retriable. Although pg_destroy will be called in the on_destroy cb when leaving +// the group, we still need to call pg_destroy here to make sure pg is cleaned in case of repl_dev destroy +// failed(http://github.com/eBay/HomeStore/issues/823). 
+PGManager::NullResult HSHomeObject::_exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t tid) { + if (group_id == boost::uuids::nil_uuid()) { + LOGI("group_id is nil, nothing to exit, trace_id={}", tid); + return folly::makeUnexpected(PGError::INVALID_ARG); + } + pg_id_t pg_id{0}; + { + auto lg = std::shared_lock(_pg_lock); + auto iter = std::find_if(_pg_map.begin(), _pg_map.end(), [group_id](const auto& entry) { + return pg_repl_dev(*entry.second).group_id() == group_id; + }); + if (iter != _pg_map.end()) { + pg_id = iter->first; + } else { + // There is a known case during adding member: the new member may think itself already in group but actually + // not, so the pg is not created yet. + LOGI("no pg found, group_id={}, trace_id={}", group_id, tid); + } + } + if (pg_id != 0 && !pg_destroy(pg_id)) { + // don't need to pause state machine here, this api is called during member leaving or the member is not in the + // cluster actually. + LOGE("failed to destroy pg={}, group_id={}, trace_id={}", pg_id, group_id, tid); + return folly::makeUnexpected(PGError::UNKNOWN); + } + LOGI("pg is cleaned, going to destroy repl_dev, group_id={}, trace_id={}", group_id, tid); + // TODO pass peer_id into destroy_repl_dev for peer validation + // destroy_repl_dev will leave raft group + auto ret = hs_repl_service().destroy_repl_dev(group_id); + if (ret == ReplServiceError::SERVER_NOT_FOUND) { + LOGW("repl dev not found, ignore, group_id={}, trace_id={}", group_id, tid); + return folly::Unit(); + } + if (ret != ReplServiceError::OK) { + LOGE("Failed to destroy repl dev for group_id={}, error={}, trace_id={}", group_id, ret, tid); + return folly::makeUnexpected(toPgError(ret)); + } + return folly::Unit(); +} + bool HSHomeObject::pause_pg_state_machine(pg_id_t pg_id) { LOGI("Pause pg state machine, pg={}", pg_id); auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id)); @@ -806,6 +991,24 @@ std::vector< Shard > HSHomeObject::HS_PG::get_chunk_shards(chunk_num_t v_chunk_i return ret; } +void HSHomeObject::HS_PG::update_membership(const MemberSet& members) { + uint32_t i{0}; + pg_members* sb_members = pg_sb_->get_pg_members_mutable(); + for (auto const& m : members) { + sb_members[i].id = m.id; + DEBUG_ASSERT(m.name.size() <= PGMember::max_name_len, "member name exceeds max len, name={}", m.name); + auto name_len = std::min(m.name.size(), PGMember::max_name_len); + std::strncpy(sb_members[i].name, m.name.c_str(), name_len); + sb_members[i].name[name_len] = '\0'; + sb_members[i].priority = m.priority; + ++i; + } + pg_sb_->num_dynamic_members = members.size(); + // Update the latest membership info to pg superblk. 
+ pg_sb_.write(); + LOGI("PG membership updated, member_nums={}", pg_sb_->num_dynamic_members); +} + // NOTE: caller should hold the _pg_lock const HSHomeObject::HS_PG* HSHomeObject::_get_hs_pg_unlocked(pg_id_t pg_id) const { auto iter = _pg_map.find(pg_id); diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 94f0f5e24..880a62af9 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -286,7 +286,7 @@ void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) } void ReplicationStateMachine::on_remove_member(const homestore::replica_id_t& member, trace_id_t tid) { - // TODO: Implement this + home_object_->on_remove_member(repl_dev()->group_id(), member, tid); } homestore::AsyncReplResult<> diff --git a/src/lib/homestore_backend/tests/CMakeLists.txt b/src/lib/homestore_backend/tests/CMakeLists.txt index b43d4709b..a40812ab3 100644 --- a/src/lib/homestore_backend/tests/CMakeLists.txt +++ b/src/lib/homestore_backend/tests/CMakeLists.txt @@ -19,7 +19,7 @@ target_sources(homestore_tests_misc PRIVATE test_homestore_backend.cpp homeobj_m target_link_libraries(homestore_tests_misc homeobject_homestore ${COMMON_TEST_DEPS}) add_library(homestore_tests_dynamic OBJECT) -target_sources(homestore_tests_dynamic PRIVATE test_homestore_backend_dynamic.cpp) +target_sources(homestore_tests_dynamic PRIVATE test_homestore_backend_dynamic.cpp homeobj_fixture_http.hpp) target_link_libraries(homestore_tests_dynamic homeobject_homestore ${COMMON_TEST_DEPS}) add_executable(test_heap_chunk_selector) diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index 33acfde1c..b1404793e 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -589,7 +589,8 @@ class HomeObjectFixture : public ::testing::Test { auto in_member = PGMember(in_member_id, "in_member"); auto out = hs_pg->pg_info_.members.find(out_member); auto in = hs_pg->pg_info_.members.find(in_member); - RELEASE_ASSERT(hs_pg->pg_info_.members.size() == 4, "Invalid pg member size"); + RELEASE_ASSERT(hs_pg->pg_info_.members.size() == 4, "Invalid pg member size={}", + hs_pg->pg_info_.members.size()); if (in == hs_pg->pg_info_.members.end()) { LOGERROR("in_member not found, in_member={}", boost::uuids::to_string(in_member_id)); return false; @@ -660,6 +661,39 @@ class HomeObjectFixture : public ::testing::Test { return true; } + bool verify_rollback_replace_member_result(pg_id_t pg_id, std::string& task_id, peer_id_t out_member_id, + peer_id_t in_member_id) { + auto hs_pg = _obj_inst->get_hs_pg(pg_id); + RELEASE_ASSERT(hs_pg, "PG not found"); + RELEASE_ASSERT(hs_pg->pg_info_.members.size() == 3, "Invalid pg member size"); + auto out_member = PGMember(out_member_id, "out_member"); + auto in_member = PGMember(in_member_id, "in_member"); + auto in = hs_pg->pg_info_.members.find(in_member); + auto out = hs_pg->pg_info_.members.find(out_member); + if (in != hs_pg->pg_info_.members.end()) { + LOGERROR("in_member still in pg, in_member={}", boost::uuids::to_string(in_member_id)); + return false; + } + if (out == hs_pg->pg_info_.members.end()) { + LOGERROR("Out member not exists in PG"); + return false; + } + + // verify task + auto r = _obj_inst->pg_manager()->list_all_replace_member_tasks(0); + RELEASE_ASSERT(r.hasValue(), "Failed to 
list_all_replace_member_tasks"); + const auto& tasks = r.value(); + bool found = std::any_of(tasks.cbegin(), tasks.cend(), [&task_id](const homeobject::replace_member_task& task) { + return task.task_id == task_id; + }); + if (found) { + LOGI("Task with ID '{}' was found.", task_id); + return false; + } + LOGI("Task with ID '{}' was not found on this member", task_id); + return true; + } + void run_on_pg_leader(pg_id_t pg_id, auto&& lambda) { PGStats pg_stats; auto res = _obj_inst->pg_manager()->get_stats(pg_id, pg_stats); @@ -726,6 +760,20 @@ class HomeObjectFixture : public ::testing::Test { } } + peer_id_t get_group_id(pg_id_t pg_id) const { + auto pg = _obj_inst->get_hs_pg(pg_id); + if (!pg) { + LOGW("pg not found, pg_id={}", pg_id); + return uuids::nil_uuid(); + } + auto repl_dev = pg->repl_dev_; + if (!repl_dev) { + LOGW("repl_dev is null, pg_id={}", pg_id); + return uuids::nil_uuid(); + } + return repl_dev->group_id(); + } + private: bool pg_exist(pg_id_t pg_id) { std::vector< pg_id_t > pg_ids; diff --git a/src/lib/homestore_backend/tests/homeobj_fixture_http.hpp b/src/lib/homestore_backend/tests/homeobj_fixture_http.hpp new file mode 100644 index 000000000..1a9725502 --- /dev/null +++ b/src/lib/homestore_backend/tests/homeobj_fixture_http.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include +#include +#include + +class HttpHelper { +public: + HttpHelper(const std::string& host, uint16_t port) { + auto opts = Pistache::Http::Experimental::Client::options().threads(1).maxConnectionsPerHost(8); + client_.init(opts); + server_addr_ = "http://" + host + ":" + std::to_string(port); + } + + ~HttpHelper() { client_.shutdown(); } + + Pistache::Http::Response get(const std::string& resource) { + auto resp = client_.get(server_addr_ + resource).send(); + Pistache::Http::Response response; + resp.then([&](Pistache::Http::Response r) { response = r; }, Pistache::Async::Throw); + Pistache::Async::Barrier< Pistache::Http::Response > barrier(resp); + barrier.wait(); + return response; + } + + Pistache::Http::Response post(const std::string& resource, const std::string& body) { + auto resp = client_.post(server_addr_ + resource).body(body).send(); + Pistache::Http::Response response; + resp.then([&](Pistache::Http::Response r) { response = r; }, Pistache::Async::Throw); + Pistache::Async::Barrier< Pistache::Http::Response > barrier(resp); + barrier.wait(); + return response; + } + + Pistache::Http::Response del(const std::string& resource, const std::string& body) { + auto resp = client_.del(server_addr_ + resource).body(body).send(); + Pistache::Http::Response response; + resp.then([&](Pistache::Http::Response r) { response = r; }, Pistache::Async::Throw); + Pistache::Async::Barrier< Pistache::Http::Response > barrier(resp); + barrier.wait(); + return response; + } + +private: + Pistache::Http::Experimental::Client client_; + std::string server_addr_; +}; diff --git a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp index f6306c2af..38d283058 100644 --- a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp +++ b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp @@ -16,6 +16,7 @@ * Homeobject Replication testing binaries shared common definitions, apis and data structures */ #include "homeobj_fixture.hpp" +#include "homeobj_fixture_http.hpp" #define BEFORE_FIRST_SHARD_DONE "BEFORE_FIRST_SHARD_DONE" #define RECEIVING_SNAPSHOT "RECEIVING_SNAPSHOT" @@ -488,7 +489,7 @@ void 
HomeObjectFixture::ReplaceMember(bool withGC) { sleep(5); // wait for incremental append-log requests to complete } - // step 4: after completing replace member, verify the blob on all the members of this pg including the newly added + // step 4: after completing replace member, verify the blob on all members of this pg including the newly added // spare replica, run_if_in_pg(pg_id, [&]() { verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); @@ -751,6 +752,204 @@ void HomeObjectFixture::RestartLeaderDuringBaselineResyncUsingSigKill(uint64_t f g_helper->sync(); } +// In this test, the new member will be stuck at snapshot; at that moment we roll back the +// operation (remove the new member, flip the learner flag and clean the task). +TEST_F(HomeObjectFixture, RollbackReplaceMember) { + LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num()); + auto spare_num_replicas = SISL_OPTIONS["spare_replicas"].as< uint8_t >(); + ASSERT_TRUE(spare_num_replicas > 0) << "we need spare replicas for homestore backend dynamic tests"; + + auto num_replicas = SISL_OPTIONS["replicas"].as< uint8_t >(); + auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >(); + auto num_blobs_per_shard = SISL_OPTIONS["num_blobs"].as< uint64_t >() / num_shards_per_pg; + pg_id_t pg_id{1}; + auto out_member_id = g_helper->replica_id(num_replicas - 1); + auto in_member_id = g_helper->replica_id(num_replicas); /*spare replica*/ + auto http_enabled = SISL_OPTIONS["enable_http"].as< bool >(); + + // ======== Stage 1: Create a pg without spare replicas and put blobs ======== + std::unordered_set< uint8_t > excluding_replicas_in_pg; + for (size_t i = num_replicas; i < num_replicas + spare_num_replicas; i++) + excluding_replicas_in_pg.insert(i); + + create_pg(pg_id, 0 /* pg_leader */, excluding_replicas_in_pg); + + // we cannot share all the shard_ids and blob_ids among all the replicas, including the spare ones, so we need to + // derive them by calculation. + // since shard_id = pg_id + shard_sequence_num, we can derive shard_ids for all the shards in this pg, and this + // derived info is used by all replicas (including the newly added member) to verify the blobs.
+ std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec; + std::map< pg_id_t, blob_id_t > pg_blob_id; + pg_blob_id[pg_id] = 0; + for (shard_id_t shard_id = 1; shard_id <= num_shards_per_pg; shard_id++) { + auto derived_shard_id = make_new_shard_id(pg_id, shard_id); + pg_shard_id_vec[pg_id].emplace_back(derived_shard_id); + } + + auto kill_until_shard = pg_shard_id_vec[pg_id].back(); + auto kill_until_blob = num_blobs_per_shard * num_shards_per_pg - 1; +#ifdef _PRERELEASE + flip::FlipCondition cond; + // will only delay the snapshot with blob id 7 during which shutdown will happen + m_fc.create_condition("blob_id", flip::Operator::EQUAL, static_cast< long >(7), &cond); + set_retval_flip("simulate_write_snapshot_save_blob_delay", static_cast< long >(10000) /*ms*/, 1, 100, cond); + // kill after the last blob in the first shard is replicated + kill_until_shard = pg_shard_id_vec[pg_id].front(); + kill_until_blob = num_blobs_per_shard - 1; +#endif + + for (uint64_t j = 0; j < num_shards_per_pg; j++) + create_shard(pg_id, 64 * Mi); + + // put and verify blobs in the pg, excluding the spare replicas + put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id, true, true); + + verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); + verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false); + + // SyncPoint1 all the replicas , including the spare ones, sync at this point + g_helper->sync(); + + // ======== Stage 2: replace a member ======== + std::string task_id = "task_id"; + run_on_pg_leader(pg_id, [&]() { + auto r = _obj_inst->pg_manager() + ->replace_member(pg_id, task_id, out_member_id, PGMember{in_member_id, "new_member", 0}) + .get(); + ASSERT_TRUE(r); + }); + + // ======== Stage 3: rollback the replace member during snapshot ======== + if (in_member_id == g_helper->my_replica_id()) { + while (!am_i_in_pg(pg_id)) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + LOGINFO("new member is waiting to become a member of pg={}", pg_id); + } + LOGDEBUG("wait for the data[shard={}, blob={}] replicated to the new member", kill_until_shard, + kill_until_blob); + wait_for_blob(kill_until_shard, kill_until_blob); + // SyncPoint2 + g_helper->sync(); + // SyncPoint3 + g_helper->sync(); + // SyncPoint4 + g_helper->sync(); + // SyncPoint5 + g_helper->sync(); + // SyncPoint6 + g_helper->sync(); + } else { + // SyncPoint2, verify intermediate status during replacement + g_helper->sync(); + ASSERT_TRUE(verify_start_replace_member_result(pg_id, task_id, out_member_id, in_member_id)); + // SyncPoint3 remove member + g_helper->sync(); + LOGINFO("about to remove new member") + run_on_pg_leader(pg_id, [&]() { + bool retry = true; + while (retry) { + if (http_enabled) { + HttpHelper http_helper("127.0.0.1", 5000 + g_helper->replica_num()); + nlohmann::json j; + j["pg_id"] = std::to_string(pg_id); + j["member_id"] = boost::uuids::to_string(in_member_id); + j["commit_quorum"] = "0"; + auto r = http_helper.del("/api/v1/member", j.dump()); + if (r.code() == Pistache::Http::Code::Ok) { + retry = false; + } else if (r.code() == Pistache::Http::Code::Service_Unavailable) { + LOGINFO("remove_member get RETRY_REQUEST error, will retry"); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } else { + ASSERT_FALSE(true); + } + } else { + auto r = _obj_inst->pg_manager()->remove_member(pg_id, in_member_id, 0, 0).get(); + // new member can not respond to remove_member request because it is stuck at snapshot, so we may + // get RETRY_REQUEST error here, but the remove_member takes 
effect after the force removal timeout. + if (!r.hasError()) { + retry = false; + } else if (r.error() == PGError::RETRY_REQUEST) { + LOGINFO("remove_member get RETRY_REQUEST error, will retry"); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } else { + ASSERT_FALSE(true); + } + } + } + }); + // SyncPoint4 flip learner + g_helper->sync(); + LOGINFO("about to flip learner") + run_on_pg_leader(pg_id, [&]() { + if (http_enabled) { + HttpHelper http_helper("127.0.0.1", 5000 + g_helper->replica_num()); + nlohmann::json j; + j["pg_id"] = std::to_string(pg_id); + j["member_id"] = boost::uuids::to_string(out_member_id); + j["learner"] = "false"; + j["commit_quorum"] = "0"; + auto r = http_helper.post("/api/v1/flip_learner", j.dump()); + ASSERT_EQ(r.code(), Pistache::Http::Code::Ok); + } else { + auto r = _obj_inst->pg_manager()->flip_learner_flag(pg_id, out_member_id, false, 0, 0).get(); + ASSERT_FALSE(r.hasError()); + } + }); + // SyncPoint5 clean task + g_helper->sync(); + LOGINFO("about to clean task") + run_on_pg_leader(pg_id, [&]() { + if (!http_enabled) { + auto r = _obj_inst->pg_manager()->clean_replace_member_task(pg_id, task_id, 0, 0).get(); + ASSERT_FALSE(r.hasError()); + } else { + HttpHelper http_helper("127.0.0.1", 5000 + g_helper->replica_num()); + nlohmann::json j; + j["pg_id"] = std::to_string(pg_id); + j["task_id"] = task_id; + j["commit_quorum"] = "0"; + auto r = http_helper.del("/api/v1/pg_replacemember_task", j.dump()); + ASSERT_EQ(r.code(), Pistache::Http::Code::Ok); + } + }); + // wait for replication to settle + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + // SyncPoint6 verify + g_helper->sync(); + LOGINFO("verify rollback result") + ASSERT_TRUE(verify_rollback_replace_member_result(pg_id, task_id, out_member_id, in_member_id)); + } + // SyncPoint7 + g_helper->sync(); + LOGINFO("After rollback, wait for new member to leave pg={}", pg_id); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + // Since the in member was stuck in snapshot previously, it may not respond to the remove_member request, leading to it being + // force removed after the timeout. In that case, after it recovers, it cannot discover that it has been removed because the + // is_catching_up flag is set, so we need to remove it manually.
+ uuid_t group_id = get_group_id(pg_id); + if (in_member_id == g_helper->my_replica_id()) { + while (am_i_in_pg(pg_id)) { + LOGINFO("New member is going to exit pg={}", pg_id); + if (http_enabled) { + HttpHelper http_helper("127.0.0.1", 5000 + g_helper->replica_num()); + auto resource = "/api/v1/pg?group_id=" + boost::uuids::to_string(group_id) + + "&replica_id=" + boost::uuids::to_string(g_helper->my_replica_id_); + auto r = http_helper.del(resource, ""); + ASSERT_EQ(r.code(), Pistache::Http::Code::Ok); + } else { + _obj_inst->pg_manager()->exit_pg(group_id, g_helper->my_replica_id(), 0); + } + LOGINFO("New member is waiting to leave pg={}", pg_id); + } + // Test idempotence of exit_pg + LOGINFO("Try to call exit_pg again pg={}", pg_id); + auto ret = _obj_inst->pg_manager()->exit_pg(group_id, g_helper->my_replica_id(), 0); + ASSERT_FALSE(ret.hasError()); + } + g_helper->sync(); +} + SISL_OPTION_GROUP( test_homeobject_repl_common, (spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"), diff --git a/src/lib/memory_backend/mem_homeobject.hpp b/src/lib/memory_backend/mem_homeobject.hpp index 794819d8f..e6704953c 100644 --- a/src/lib/memory_backend/mem_homeobject.hpp +++ b/src/lib/memory_backend/mem_homeobject.hpp @@ -62,6 +62,16 @@ class MemoryHomeObject : public HomeObjectImpl { HomeObjectStats _get_stats() const override; void _destroy_pg(pg_id_t pg_id) override; + PGManager::NullResult _exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) override; + + PGManager::NullAsyncResult _flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner, + uint32_t commit_quorum, trace_id_t trace_id) override; + PGManager::NullAsyncResult _remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum, + trace_id_t trace_id) override; + PGManager::NullAsyncResult _clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum, + trace_id_t trace_id) override; + PGManager::Result< std::vector< replace_member_task > > + _list_all_replace_member_tasks(trace_id_t trace_id) override; ShardIndex& _find_index(shard_id_t) const; diff --git a/src/lib/memory_backend/mem_pg_manager.cpp b/src/lib/memory_backend/mem_pg_manager.cpp index c128e0a01..943d208fd 100644 --- a/src/lib/memory_backend/mem_pg_manager.cpp +++ b/src/lib/memory_backend/mem_pg_manager.cpp @@ -83,4 +83,48 @@ void MemoryHomeObject::_destroy_pg(pg_id_t pg_id) { _pg_map.erase(pg_id); } +PGManager::NullResult MemoryHomeObject::_exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) { + auto lg = std::unique_lock(_pg_lock); + auto iter = std::find_if(_pg_map.begin(), _pg_map.end(), [group_id](const auto& entry) { + return entry.second->pg_info_.replica_set_uuid == group_id; + }); + if (iter != _pg_map.end()) { _pg_map.erase(iter); } + return folly::Unit(); +} + +PGManager::NullAsyncResult MemoryHomeObject::_flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, + bool is_learner, uint32_t commit_quorum, + trace_id_t trace_id) { + (void)pg_id; + (void)member_id; + (void)is_learner; + (void)commit_quorum; + (void)trace_id; + return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP)); +} + +PGManager::NullAsyncResult MemoryHomeObject::_remove_member(pg_id_t pg_id, peer_id_t const& member_id, + uint32_t commit_quorum, trace_id_t trace_id) { + (void)pg_id; + (void)member_id; + (void)commit_quorum; + (void)trace_id; + return folly::makeSemiFuture< PGManager::NullResult 
>(folly::makeUnexpected(PGError::UNSUPPORTED_OP)); +} + +PGManager::NullAsyncResult MemoryHomeObject::_clean_replace_member_task(pg_id_t pg_id, std::string& task_id, + uint32_t commit_quorum, trace_id_t trace_id) { + (void)pg_id; + (void)task_id; + (void)commit_quorum; + (void)trace_id; + return folly::makeSemiFuture< PGManager::NullResult >(folly::makeUnexpected(PGError::UNSUPPORTED_OP)); +} + +PGManager::Result< std::vector< replace_member_task > > +MemoryHomeObject::_list_all_replace_member_tasks(trace_id_t trace_id) { + (void)trace_id; + return folly::makeUnexpected(PGError::UNSUPPORTED_OP); +} + } // namespace homeobject diff --git a/src/lib/pg_manager.cpp b/src/lib/pg_manager.cpp index 4d9e272e4..47dc4a4f1 100644 --- a/src/lib/pg_manager.cpp +++ b/src/lib/pg_manager.cpp @@ -44,4 +44,28 @@ PGReplaceMemberStatus HomeObjectImpl::get_replace_member_status(pg_id_t id, std: bool HomeObjectImpl::get_stats(pg_id_t id, PGStats& stats) const { return _get_stats(id, stats); } void HomeObjectImpl::get_pg_ids(std::vector< pg_id_t >& pg_ids) const { return _get_pg_ids(pg_ids); } void HomeObjectImpl::destroy_pg(pg_id_t pg_id) { return _destroy_pg(pg_id); } +PGManager::NullResult HomeObjectImpl::exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) { + return _exit_pg(group_id, peer_id, trace_id); +} + +PGManager::NullAsyncResult HomeObjectImpl::flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner, + uint32_t commit_quorum, trace_id_t trace_id) { + return _flip_learner_flag(pg_id, member_id, is_learner, commit_quorum, trace_id); +} + +PGManager::NullAsyncResult HomeObjectImpl::remove_member(pg_id_t pg_id, peer_id_t const& member_id, + uint32_t commit_quorum, trace_id_t trace_id) { + return _remove_member(pg_id, member_id, commit_quorum, trace_id); +} + +PGManager::NullAsyncResult HomeObjectImpl::clean_replace_member_task(pg_id_t pg_id, std::string& task_id, + uint32_t commit_quorum, trace_id_t trace_id) { + return _clean_replace_member_task(pg_id, task_id, commit_quorum, trace_id); +} + +PGManager::Result< std::vector< replace_member_task > > +HomeObjectImpl::list_all_replace_member_tasks(trace_id_t trace_id) { + return _list_all_replace_member_tasks(trace_id); +} + } // namespace homeobject
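Reviewer note: the leader-side rollback sequence exercised by HomeObjectFixture.RollbackReplaceMember can be summarized with the PGManager APIs introduced in this change. The sketch below is illustrative only and not part of the patch; rollback_replace_member and pg_mgr are hypothetical names, the include path is assumed, and a real caller would add retries and logging around each step.

// Minimal sketch, assuming the public header path "homeobject/pg_manager.hpp".
#include <string>
#include "homeobject/pg_manager.hpp"

namespace homeobject {

bool rollback_replace_member(PGManager& pg_mgr, pg_id_t pg_id, std::string task_id,
                             peer_id_t const& out_member, peer_id_t const& in_member, trace_id_t tid) {
    // 1. Remove the half-added new member. While it is stuck in baseline resync it may not ack the
    //    config change, so the call can return RETRY_REQUEST; callers are expected to retry, as the test does.
    auto removed = pg_mgr.remove_member(pg_id, in_member, 0 /* commit_quorum */, tid).get();
    if (removed.hasError() && removed.error() != PGError::RETRY_REQUEST) { return false; }

    // 2. Clear the learner flag that replace_member set on the outgoing member so it can vote again.
    auto flipped = pg_mgr.flip_learner_flag(pg_id, out_member, false /* is_learner */, 0, tid).get();
    if (flipped.hasError()) { return false; }

    // 3. Drop the replace-member task so the PG no longer reports an in-progress replacement.
    auto cleaned = pg_mgr.clean_replace_member_task(pg_id, task_id, 0, tid).get();
    return !cleaned.hasError();
}

} // namespace homeobject

If the removed member was force-removed while catching up, it cannot observe its own removal (is_catching_up is set); as the test shows, it then calls exit_pg(group_id, its_own_peer_id, tid) on itself, which is idempotent, to destroy the leftover PG and repl_dev.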
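The new HTTP endpoints take the same parameters as string-valued JSON fields (pg_id and commit_quorum are parsed with std::stoull/std::stoul in the handlers above). Below is a sketch of the request shapes, reusing the test-only HttpHelper from homeobj_fixture_http.hpp; the host, port, UUIDs and ids are placeholders, not values from this change.

// Illustrative only: request shapes for the new endpoints; all values are placeholders.
#include <nlohmann/json.hpp>
#include "homeobj_fixture_http.hpp"

void drive_rollback_over_http() {
    HttpHelper http("127.0.0.1", 5000); // the test uses 5000 + replica_num

    nlohmann::json remove;
    remove["pg_id"] = "1";
    remove["member_id"] = "9f3c1a2e-0000-0000-0000-000000000001"; // half-added replica (placeholder uuid)
    remove["commit_quorum"] = "0";
    // DELETE /api/v1/member maps PGError::RETRY_REQUEST to 503, so callers should retry on that status.
    http.del("/api/v1/member", remove.dump());

    nlohmann::json flip;
    flip["pg_id"] = "1";
    flip["member_id"] = "9f3c1a2e-0000-0000-0000-000000000002"; // original out member (placeholder uuid)
    flip["learner"] = "false";
    flip["commit_quorum"] = "0";
    http.post("/api/v1/flip_learner", flip.dump());

    nlohmann::json clean;
    clean["pg_id"] = "1";
    clean["task_id"] = "task_id";
    clean["commit_quorum"] = "0";
    http.del("/api/v1/pg_replacemember_task", clean.dump());

    // Read-only views used to verify the result:
    http.get("/api/v1/pg_quorum?pg_id=1");
    http.get("/api/v1/pg_replacemember_tasks");
    // A stranded member can ask to leave via DELETE /api/v1/pg?group_id=<uuid>&replica_id=<uuid>
}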