Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "3.0.14"
version = "3.0.15"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
Expand Down
116 changes: 88 additions & 28 deletions src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ namespace homeobject {
ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADER, UNKNOWN_PEER, UNSUPPORTED_OP,
CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST, SHUTTING_DOWN, ROLL_BACK, CANCELLED,
QUORUM_NOT_MET);
ENUM(PGReplaceMemberTaskStatus, uint16_t, COMPLETED = 0, IN_PROGRESS, NOT_LEADER, TASK_ID_MISMATCH, TASK_NOT_FOUND, UNKNOWN);
ENUM(PGReplaceMemberTaskStatus, uint16_t, COMPLETED = 0, IN_PROGRESS, NOT_LEADER, TASK_ID_MISMATCH, TASK_NOT_FOUND,
UNKNOWN);
// https://github.corp.ebay.com/SDS/nuobject_proto/blob/main/src/proto/pg.proto#L52
ENUM(PGStateMask, uint32_t, HEALTHY = 0, DISK_DOWN = 0x1, SCRUBBING = 0x2, BASELINE_RESYNC = 0x4, INCONSISTENT = 0x8,
REPAIR = 0x10, GC_IN_PROGRESS = 0x20, RESYNCING = 0x40);
Expand All @@ -40,6 +41,12 @@ struct PGMember {

using MemberSet = std::set< PGMember >;

struct replace_member_task {
std::string task_id; // Unique task id for this replace member operation
uuid_t replica_out; // The replica which is going to be replaced
uuid_t replica_in; // The replica which is going to be added in place of replica_out
};

struct PGInfo {
explicit PGInfo(pg_id_t _id) : id(_id) {}
pg_id_t id;
Expand All @@ -55,14 +62,10 @@ struct PGInfo {

// check if the PGInfo has same id, size and members with the rhs PGInfo.
bool is_equivalent_to(PGInfo const& rhs) const {
if (id != rhs.id || size != rhs.size || members.size() != rhs.members.size()) {
return false;
}
if (id != rhs.id || size != rhs.size || members.size() != rhs.members.size()) { return false; }
for (auto const& m : members) {
auto it = rhs.members.find(m);
if (it == rhs.members.end() || it->priority != m.priority) {
return false;
}
if (it == rhs.members.end() || it->priority != m.priority) { return false; }
}
return true;
}
Expand All @@ -72,13 +75,13 @@ struct PGInfo {
uint32_t i = 0ul;
for (auto const& m : members) {
if (i++ > 0) { members_str += ", "; }
members_str += fmt::format("member-{}: id={}, name={}, priority={}",
i, boost::uuids::to_string(m.id), m.name, m.priority);
members_str += fmt::format("member-{}: id={}, name={}, priority={}", i, boost::uuids::to_string(m.id),
m.name, m.priority);
}
return fmt::format("PGInfo: id={}, replica_set_uuid={}, size={}, chunk_size={}, "
"expected_member_num={}, members={}",
id, boost::uuids::to_string(replica_set_uuid), size, chunk_size,
expected_member_num, members_str);
id, boost::uuids::to_string(replica_set_uuid), size, chunk_size, expected_member_num,
members_str);
}
};

Expand All @@ -91,25 +94,19 @@ struct peer_info {
};

struct pg_state {
std::atomic<uint64_t> state{0};
std::atomic< uint64_t > state{0};

explicit pg_state(uint64_t s) : state{s} {}

void set_state(PGStateMask mask) {
state.fetch_or(static_cast<uint64_t>(mask), std::memory_order_relaxed);
}
void set_state(PGStateMask mask) { state.fetch_or(static_cast< uint64_t >(mask), std::memory_order_relaxed); }

void clear_state(PGStateMask mask) {
state.fetch_and(~static_cast<uint64_t>(mask), std::memory_order_relaxed);
}
void clear_state(PGStateMask mask) { state.fetch_and(~static_cast< uint64_t >(mask), std::memory_order_relaxed); }

bool is_state_set(PGStateMask mask) const {
return (state.load(std::memory_order_relaxed) & static_cast<uint64_t>(mask)) != 0;
return (state.load(std::memory_order_relaxed) & static_cast< uint64_t >(mask)) != 0;
}

uint64_t get() const {
return state.load(std::memory_order_relaxed);
}
uint64_t get() const { return state.load(std::memory_order_relaxed); }
};

struct PGStats {
Expand Down Expand Up @@ -171,12 +168,13 @@ struct PGReplaceMemberStatus {
class PGManager : public Manager< PGError > {
public:
virtual NullAsyncResult create_pg(PGInfo&& pg_info, trace_id_t tid = 0) = 0;
virtual NullAsyncResult replace_member(pg_id_t id, std::string& task_id, peer_id_t const& old_member, PGMember const& new_member,
u_int32_t commit_quorum = 0, trace_id_t tid = 0) = 0;
virtual PGReplaceMemberStatus get_replace_member_status(pg_id_t id, std::string& task_id, const PGMember& old_member,
const PGMember& new_member,
const std::vector< PGMember >& others,
uint64_t trace_id = 0) const = 0;
virtual NullAsyncResult replace_member(pg_id_t id, std::string& task_id, peer_id_t const& old_member,
PGMember const& new_member, u_int32_t commit_quorum = 0,
trace_id_t tid = 0) = 0;
virtual PGReplaceMemberStatus get_replace_member_status(pg_id_t id, std::string& task_id,
const PGMember& old_member, const PGMember& new_member,
const std::vector< PGMember >& others,
uint64_t trace_id = 0) const = 0;

/**
* Retrieves the statistics for a specific PG (Placement Group) identified by its ID.
Expand All @@ -201,6 +199,68 @@ class PGManager : public Manager< PGError > {
* @param pg_id The ID of the PG.
*/
virtual void destroy_pg(pg_id_t pg_id) = 0;

/**
* @brief Single member exits a PG (Placement Group) identified by its ID.
* @param group_id The group ID of the pg.
* @param peer_id The peer ID of the member exiting the PG.
* @param trace_id The trace identifier for logging and tracking purposes.
*/
virtual NullResult exit_pg(uuid_t group_id, peer_id_t peer_id, uint64_t trace_id) = 0;

/**
* @brief Toggle the learner flag for a specified member.
*
* This function changes the state of the learner flag for a given member in the PG.
* It is typically used to revert the learner flag back to false when rolling back pgmove.
*
* @param pg_id The ID of the PG where the member resides.
* @param member_id The ID of the member whose learner flag is to be toggled.
* @param is_learner The new state of the learner flag (true to set as learner, false to unset).
* @param commit_quorum The quorum required for committing the change.
* @param trace_id The trace ID for tracking the operation.
* @return NullAsyncResult indicating the result of the operation.
*/
virtual NullAsyncResult flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner,
uint32_t commit_quorum, trace_id_t trace_id) = 0;

/**
* @brief Remove a member from the PG.
*
* This function removes a specified member from the PG, typically used to rollback the pgmove operation.
*
* @param pg_id The ID of the PG from which the member is to be removed.
* @param member_id The ID of the member to be removed.
* @param commit_quorum The quorum required for committing the removal.
* @param trace_id The trace ID for tracking the operation.
* @return NullAsyncResult indicating the result of the operation.
*/
virtual NullAsyncResult remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum,
trace_id_t trace_id) = 0;

/**
* @brief Clean up the replace member task in the PG.
*
* This function cleans up the replace member task, typically used to rollback the pgmove operation.
*
* @param pg_id The ID of the PG where the task is to be cleaned.
* @param task_id The ID of the task to be cleaned.
* @param commit_quorum The quorum required for committing the task cleanup.
* @param trace_id The trace ID for tracking the operation.
* @return NullAsyncResult indicating the result of the operation.
*/
virtual NullAsyncResult clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum,
trace_id_t trace_id) = 0;

/**
* @brief List all replace member tasks happening on this homeobject instance.
*
* This function retrieves a list of all ongoing tasks on this homeobject instance.
*
* @param trace_id The trace ID for tracking the operation.
* @return Result containing a vector of replace member tasks.
*/
virtual Result< std::vector< replace_member_task > > list_all_replace_member_tasks(trace_id_t trace_id) = 0;
};

} // namespace homeobject
22 changes: 22 additions & 0 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ class HomeObjectImpl : public HomeObject,

virtual void _destroy_pg(pg_id_t pg_id) = 0;

virtual PGManager::NullResult _exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) = 0;

virtual PGManager::NullAsyncResult _flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner,
uint32_t commit_quorum, trace_id_t trace_id) = 0;

virtual PGManager::NullAsyncResult _remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum,
trace_id_t trace_id) = 0;

virtual PGManager::NullAsyncResult _clean_replace_member_task(pg_id_t pg_id, std::string& task_id,
uint32_t commit_quorum, trace_id_t trace_id) = 0;

virtual PGManager::Result< std::vector< replace_member_task > >
_list_all_replace_member_tasks(trace_id_t trace_id) = 0;

protected:
std::mutex _repl_lock;
peer_id_t _our_id;
Expand Down Expand Up @@ -164,6 +178,14 @@ class HomeObjectImpl : public HomeObject,
bool get_stats(pg_id_t id, PGStats& stats) const final;
void get_pg_ids(std::vector< pg_id_t >& pg_ids) const final;
void destroy_pg(pg_id_t pg_id) final;
PGManager::NullResult exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) final;
PGManager::NullAsyncResult flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner,
uint32_t commit_quorum, trace_id_t trace_id) final;
PGManager::NullAsyncResult remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum,
trace_id_t trace_id) final;
PGManager::NullAsyncResult clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum,
trace_id_t trace_id) final;
PGManager::Result< std::vector< replace_member_task > > list_all_replace_member_tasks(trace_id_t trace_id) final;

/// ShardManager
ShardManager::AsyncResult< ShardInfo > get_shard(shard_id_t id, trace_id_t tid) const final;
Expand Down
11 changes: 11 additions & 0 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ add_test(NAME HomestoreResyncTestWithLeaderRestart
--override_config homestore_config.consensus.replace_member_sync_check_interval_ms=1000
--override_config homestore_config.consensus.laggy_threshold=2000
--gtest_filter=HomeObjectFixture.RestartLeader*)
#add_test(NAME HomestoreReplaceMemberRollbackTest
# COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
# --override_config homestore_config.consensus.snapshot_freq_distance=13
# --override_config homestore_config.consensus.num_reserved_log_items=13
# --override_config homestore_config.resource_limits.raft_logstore_reserve_threshold=13
# --override_config homestore_config.consensus.snapshot_sync_ctx_timeout_ms=5000
# --override_config homestore_config.generic.repl_dev_cleanup_interval_sec=5
# --override_config homestore_config.consensus.max_grpc_message_size=138412032
# --override_config homestore_config.consensus.replace_member_sync_check_interval_ms=60000
# --override_config homestore_config.consensus.laggy_threshold=1
# --gtest_filter=HomeObjectFixture.RollbackReplaceMember)

# GC tests
add_test(NAME FetchDataWithOriginatorGC
Expand Down
17 changes: 17 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ class HSHomeObject : public HomeObjectImpl {

HomeObjectStats _get_stats() const override;
void _destroy_pg(pg_id_t pg_id) override;
PGManager::NullResult _exit_pg(uuid_t group_id, peer_id_t peer_id, trace_id_t trace_id) override;

PGManager::NullAsyncResult _flip_learner_flag(pg_id_t pg_id, peer_id_t const& member_id, bool is_learner,
uint32_t commit_quorum, trace_id_t trace_id) override;
PGManager::NullAsyncResult _remove_member(pg_id_t pg_id, peer_id_t const& member_id, uint32_t commit_quorum,
trace_id_t trace_id) override;
PGManager::NullAsyncResult _clean_replace_member_task(pg_id_t pg_id, std::string& task_id, uint32_t commit_quorum,
trace_id_t trace_id) override;
PGManager::Result< std::vector< replace_member_task > >
_list_all_replace_member_tasks(trace_id_t trace_id) override;

// Mapping from index table uuid to pg id.
std::shared_mutex index_lock_;
Expand Down Expand Up @@ -367,6 +377,11 @@ class HSHomeObject : public HomeObjectImpl {
* Returns all shards
*/
std::vector< Shard > get_chunk_shards(homestore::chunk_num_t v_chunk_id) const;

/**
* Update membership in pg's superblock.
*/
void update_membership(const MemberSet& members);
};

struct HS_Shard : public Shard {
Expand Down Expand Up @@ -778,6 +793,8 @@ class HSHomeObject : public HomeObjectImpl {
const homestore::replica_member_info& member_out,
const homestore::replica_member_info& member_in, trace_id_t tid);

void on_remove_member(homestore::group_id_t group_id, const peer_id_t& member, trace_id_t tid = 0);

/**
* @brief Cleans up and recycles resources for the PG identified by the given pg_id on the current node.
*
Expand Down
Loading