Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "3.0.14"
version = "3.0.15"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
Expand Down
116 changes: 88 additions & 28 deletions src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ namespace homeobject {
ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADER, UNKNOWN_PEER, UNSUPPORTED_OP,
CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST, SHUTTING_DOWN, ROLL_BACK, CANCELLED,
QUORUM_NOT_MET);
ENUM(PGReplaceMemberTaskStatus, uint16_t, COMPLETED = 0, IN_PROGRESS, NOT_LEADER, TASK_ID_MISMATCH, TASK_NOT_FOUND, UNKNOWN);
ENUM(PGReplaceMemberTaskStatus, uint16_t, COMPLETED = 0, IN_PROGRESS, NOT_LEADER, TASK_ID_MISMATCH, TASK_NOT_FOUND,
UNKNOWN);
// https://github.corp.ebay.com/SDS/nuobject_proto/blob/main/src/proto/pg.proto#L52
ENUM(PGStateMask, uint32_t, HEALTHY = 0, DISK_DOWN = 0x1, SCRUBBING = 0x2, BASELINE_RESYNC = 0x4, INCONSISTENT = 0x8,
REPAIR = 0x10, GC_IN_PROGRESS = 0x20, RESYNCING = 0x40);
Expand All @@ -40,6 +41,12 @@ struct PGMember {

using MemberSet = std::set< PGMember >;

// Describes one replace-member operation: its task id plus the outgoing and incoming replicas.
// This is the element type of the vector returned by PGManager::list_all_replace_member_tasks().
struct replace_member_task {
std::string task_id; // Unique task id for this replace member operation
uuid_t replica_out; // The replica which is going to be replaced
uuid_t replica_in; // The replica which is going to be added in place of replica_out
};

struct PGInfo {
explicit PGInfo(pg_id_t _id) : id(_id) {}
pg_id_t id;
Expand All @@ -55,14 +62,10 @@ struct PGInfo {

// Check whether this PGInfo has the same id, size and members (including each member's priority) as the rhs PGInfo.
bool is_equivalent_to(PGInfo const& rhs) const {
if (id != rhs.id || size != rhs.size || members.size() != rhs.members.size()) {
return false;
}
if (id != rhs.id || size != rhs.size || members.size() != rhs.members.size()) { return false; }
for (auto const& m : members) {
auto it = rhs.members.find(m);
if (it == rhs.members.end() || it->priority != m.priority) {
return false;
}
if (it == rhs.members.end() || it->priority != m.priority) { return false; }
}
return true;
}
Expand All @@ -72,13 +75,13 @@ struct PGInfo {
uint32_t i = 0ul;
for (auto const& m : members) {
if (i++ > 0) { members_str += ", "; }
members_str += fmt::format("member-{}: id={}, name={}, priority={}",
i, boost::uuids::to_string(m.id), m.name, m.priority);
members_str += fmt::format("member-{}: id={}, name={}, priority={}", i, boost::uuids::to_string(m.id),
m.name, m.priority);
}
return fmt::format("PGInfo: id={}, replica_set_uuid={}, size={}, chunk_size={}, "
"expected_member_num={}, members={}",
id, boost::uuids::to_string(replica_set_uuid), size, chunk_size,
expected_member_num, members_str);
id, boost::uuids::to_string(replica_set_uuid), size, chunk_size, expected_member_num,
members_str);
}
};

Expand All @@ -91,25 +94,19 @@ struct peer_info {
};

struct pg_state {
std::atomic<uint64_t> state{0};
std::atomic< uint64_t > state{0};

explicit pg_state(uint64_t s) : state{s} {}

void set_state(PGStateMask mask) {
state.fetch_or(static_cast<uint64_t>(mask), std::memory_order_relaxed);
}
void set_state(PGStateMask mask) { state.fetch_or(static_cast< uint64_t >(mask), std::memory_order_relaxed); }

void clear_state(PGStateMask mask) {
state.fetch_and(~static_cast<uint64_t>(mask), std::memory_order_relaxed);
}
void clear_state(PGStateMask mask) { state.fetch_and(~static_cast< uint64_t >(mask), std::memory_order_relaxed); }

bool is_state_set(PGStateMask mask) const {
return (state.load(std::memory_order_relaxed) & static_cast<uint64_t>(mask)) != 0;
return (state.load(std::memory_order_relaxed) & static_cast< uint64_t >(mask)) != 0;
}

uint64_t get() const {
return state.load(std::memory_order_relaxed);
}
uint64_t get() const { return state.load(std::memory_order_relaxed); }
};

struct PGStats {
Expand Down Expand Up @@ -171,12 +168,13 @@ struct PGReplaceMemberStatus {
class PGManager : public Manager< PGError > {
public:
virtual NullAsyncResult create_pg(PGInfo&& pg_info, trace_id_t tid = 0) = 0;
virtual NullAsyncResult replace_member(pg_id_t id, std::string& task_id, peer_id_t const& old_member, PGMember const& new_member,
u_int32_t commit_quorum = 0, trace_id_t tid = 0) = 0;
virtual PGReplaceMemberStatus get_replace_member_status(pg_id_t id, std::string& task_id, const PGMember& old_member,
const PGMember& new_member,
const std::vector< PGMember >& others,
uint64_t trace_id = 0) const = 0;
virtual NullAsyncResult replace_member(pg_id_t id, std::string& task_id, peer_id_t const& old_member,
PGMember const& new_member, u_int32_t commit_quorum = 0,
trace_id_t tid = 0) = 0;
virtual PGReplaceMemberStatus get_replace_member_status(pg_id_t id, std::string& task_id,
const PGMember& old_member, const PGMember& new_member,
const std::vector< PGMember >& others,
uint64_t trace_id = 0) const = 0;

/**
* Retrieves the statistics for a specific PG (Placement Group) identified by its ID.
Expand All @@ -201,6 +199,68 @@ class PGManager : public Manager< PGError > {
* @param pg_id The ID of the PG.
*/
virtual void destroy_pg(pg_id_t pg_id) = 0;

/**
* @brief Make a single member exit the PG (Placement Group) identified by its group ID.
* @param group_id The group ID of the PG.
* @param peer_id The peer ID of the member exiting the PG.
* @param trace_id The trace identifier for logging and tracking purposes.
* @return NullResult indicating the result of the operation.
*/
virtual NullResult exit_pg(uuid_t group_id, peer_id_t peer_id, uint64_t trace_id) = 0;

/**
* @brief Set the learner flag of a specified member.
*
* This function sets the learner flag of a given member in the PG to the requested value.
* It is typically used to revert the learner flag back to false when rolling back pgmove.
*
* @param pg_id The ID of the PG where the member resides.
* @param member_id The ID of the member whose learner flag is to be set.
* @param is_learner The new state of the learner flag (true to set as learner, false to unset).
* @param commit_quorum The quorum required for committing the change.
* @param trace_id The trace ID for tracking the operation.
* @return NullAsyncResult indicating the result of the operation.
*/
virtual NullAsyncResult flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner,
uint32_t commit_quorum, trace_id_t trace_id) = 0;

/**
* @brief Remove a member from the PG.
*
* This function removes a specified member from the PG, typically used to roll back the pgmove operation.
*
* @param pg_id The ID of the PG from which the member is to be removed.
* @param member_id The ID of the member to be removed.
* @param commit_quorum The quorum required for committing the removal.
* @param trace_id The trace ID for tracking the operation.
* @return NullAsyncResult indicating the result of the operation.
*/
virtual NullAsyncResult remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum,
trace_id_t trace_id) = 0;

/**
* @brief Clean up the replace member task in the PG.
*
* This function cleans up the replace member task, typically used to roll back the pgmove operation.
*
* @param pg_id The ID of the PG where the task is to be cleaned.
* @param task_id The ID of the task to be cleaned.
* @param commit_quorum The quorum required for committing the task cleanup.
* @param trace_id The trace ID for tracking the operation.
* @return NullAsyncResult indicating the result of the operation.
*/
virtual NullAsyncResult clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum,
trace_id_t trace_id) = 0;

/**
* @brief List all replace member tasks happening on this homeobject instance.
*
* This function retrieves a list of all ongoing replace member tasks on this homeobject instance.
*
* @param trace_id The trace ID for tracking the operation.
* @return Result containing a vector of replace member tasks.
*/
virtual Result< std::vector< replace_member_task > > list_all_replace_member_tasks(trace_id_t trace_id) = 0;
};

} // namespace homeobject
22 changes: 22 additions & 0 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ class HomeObjectImpl : public HomeObject,

virtual void _destroy_pg(pg_id_t pg_id) = 0;

virtual PGManager::NullResult _exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) = 0;

virtual PGManager::NullAsyncResult _flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner,
uint32_t commit_quorum, trace_id_t trace_id) = 0;

virtual PGManager::NullAsyncResult _remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum,
trace_id_t trace_id) = 0;

virtual PGManager::NullAsyncResult _clean_replace_member_task(pg_id_t pg_id, std::string& task_id,
uint32_t commit_quorum, trace_id_t trace_id) = 0;

virtual PGManager::Result< std::vector< replace_member_task > >
_list_all_replace_member_tasks(trace_id_t trace_id) = 0;

protected:
std::mutex _repl_lock;
peer_id_t _our_id;
Expand Down Expand Up @@ -164,6 +178,14 @@ class HomeObjectImpl : public HomeObject,
bool get_stats(pg_id_t id, PGStats& stats) const final;
void get_pg_ids(std::vector< pg_id_t >& pg_ids) const final;
void destroy_pg(pg_id_t pg_id) final;
PGManager::NullResult exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) final;
PGManager::NullAsyncResult flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner,
uint32_t commit_quorum, trace_id_t trace_id) final;
PGManager::NullAsyncResult remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum,
trace_id_t trace_id) final;
PGManager::NullAsyncResult clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum,
trace_id_t trace_id) final;
PGManager::Result< std::vector< replace_member_task > > list_all_replace_member_tasks(trace_id_t trace_id) final;

/// ShardManager
ShardManager::AsyncResult< ShardInfo > get_shard(shard_id_t id, trace_id_t tid) const final;
Expand Down
11 changes: 11 additions & 0 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ add_test(NAME HomestoreResyncTestWithLeaderRestart
--override_config homestore_config.consensus.replace_member_sync_check_interval_ms=1000
--override_config homestore_config.consensus.laggy_threshold=2000
--gtest_filter=HomeObjectFixture.RestartLeader*)
#add_test(NAME HomestoreReplaceMemberRollbackTest
# COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
# --override_config homestore_config.consensus.snapshot_freq_distance=13
# --override_config homestore_config.consensus.num_reserved_log_items=13
# --override_config homestore_config.resource_limits.raft_logstore_reserve_threshold=13
# --override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
# --override_config homestore_config.generic.repl_dev_cleanup_interval_sec=5
# --override_config homestore_config.consensus.max_grpc_message_size=138412032
# --override_config homestore_config.consensus.replace_member_sync_check_interval_ms=60000
# --override_config homestore_config.consensus.laggy_threshold=1
# --gtest_filter=HomeObjectFixture.RollbackReplaceMember)

# GC tests
add_test(NAME FetchDataWithOriginatorGC
Expand Down
17 changes: 17 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ class HSHomeObject : public HomeObjectImpl {

HomeObjectStats _get_stats() const override;
void _destroy_pg(pg_id_t pg_id) override;
PGManager::NullResult _exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) override;

PGManager::NullAsyncResult _flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner,
uint32_t commit_quorum, trace_id_t trace_id) override;
PGManager::NullAsyncResult _remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum,
trace_id_t trace_id) override;
PGManager::NullAsyncResult _clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum,
trace_id_t trace_id) override;
PGManager::Result< std::vector< replace_member_task > >
_list_all_replace_member_tasks(trace_id_t trace_id) override;

// Mapping from index table uuid to pg id.
std::shared_mutex index_lock_;
Expand Down Expand Up @@ -367,6 +377,11 @@ class HSHomeObject : public HomeObjectImpl {
* Returns all shards
*/
std::vector< Shard > get_chunk_shards(homestore::chunk_num_t v_chunk_id) const;

/**
* Update membership in pg's superblock.
*/
void update_membership(const MemberSet& members);
};

struct HS_Shard : public Shard {
Expand Down Expand Up @@ -778,6 +793,8 @@ class HSHomeObject : public HomeObjectImpl {
const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in, trace_id_t tid);

void on_remove_member(homestore::group_id_t group_id, const peer_id_t& member, trace_id_t tid = 0);

/**
* @brief Cleans up and recycles resources for the PG identified by the given pg_id on the current node.
*
Expand Down
Loading