Skip to content

Commit c7a4da0

Browse files
yuwmaoyuwmao
andauthored
Fix pg membership inconsistency (#386)
- Implement the on_pg_clean_replace_member_task callback, which actually rolls back the membership change. - Add a reconcile_membership API, which is used to correct existing membership inconsistencies. Co-authored-by: yuwmao <yuwmao@ebaychina.com>
1 parent 6822380 commit c7a4da0

File tree

7 files changed

+165
-2
lines changed

7 files changed

+165
-2
lines changed

conanfile.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
class HomeObjectConan(ConanFile):
1212
name = "homeobject"
13-
version = "3.0.17"
13+
version = "3.0.18"
1414

1515
homepage = "https://github.com/eBay/HomeObject"
1616
description = "Blob Store built on HomeStore"
@@ -50,7 +50,7 @@ def build_requirements(self):
5050

5151
def requirements(self):
5252
self.requires("sisl/[^13.0]@oss/master", transitive_headers=True)
53-
self.requires("homestore/[^7.1]@oss/master")
53+
self.requires("homestore/[^7.3]@oss/master")
5454
self.requires("iomgr/[^12.0]@oss/master")
5555

5656
def validate(self):

src/lib/homestore_backend/hs_homeobject.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,8 +789,26 @@ class HSHomeObject : public HomeObjectImpl {
789789
const homestore::replica_member_info& member_out,
790790
const homestore::replica_member_info& member_in, trace_id_t tid);
791791

792+
/**
793+
* @brief Called when a replace member task is cleaned up (rollback)
794+
* @param group_id Group ID
795+
* @param task_id Task ID
796+
* @param member_out Member which should be restored to group
797+
* @param member_in Member which should be removed from group
798+
* */
799+
void on_pg_clean_replace_member_task(homestore::group_id_t group_id, const std::string& task_id,
800+
const homestore::replica_member_info& member_out,
801+
const homestore::replica_member_info& member_in, trace_id_t tid);
802+
792803
void on_remove_member(homestore::group_id_t group_id, const peer_id_t& member, trace_id_t tid = 0);
793804

805+
/**
806+
* @brief Reconcile PG membership by synchronizing with raft's actual member list
807+
* @param pg_id PG ID to reconcile
808+
* @return true if reconciled successfully, false otherwise
809+
*/
810+
bool reconcile_membership(pg_id_t pg_id);
811+
794812
/**
795813
* @brief Cleans up and recycles resources for the PG identified by the given pg_id on the current node.
796814
*

src/lib/homestore_backend/hs_http_manager.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) {
4848
Pistache::Rest::Routes::bind(&HttpManager::clean_replace_member_task, this)},
4949
{Pistache::Http::Method::Get, "/api/v1/pg_replacemember_tasks",
5050
Pistache::Rest::Routes::bind(&HttpManager::list_pg_replace_member_task, this)},
51+
{Pistache::Http::Method::Post, "/api/v1/reconcile_membership",
52+
Pistache::Rest::Routes::bind(&HttpManager::reconcile_membership, this)},
5153
{Pistache::Http::Method::Delete, "/api/v1/pg", Pistache::Rest::Routes::bind(&HttpManager::exit_pg, this)},
5254
#ifdef _PRERELEASE
5355
{Pistache::Http::Method::Post, "/api/v1/crashSystem",
@@ -333,6 +335,35 @@ void HttpManager::clean_replace_member_task(const Pistache::Rest::Request& reque
333335
response.send(Pistache::Http::Code::Bad_Request, std::string("Invalid JSON: ") + e.what());
334336
}
335337
}
338+
/// @brief HTTP handler bound to POST /api/v1/reconcile_membership.
///
/// Expects a JSON body with a string field "pg_id" holding a decimal PG id, e.g. {"pg_id": "1"}, and
/// forwards it to HSHomeObject::reconcile_membership.
///
/// Responses:
///  - 200 with a JSON document {"status", "pg_id", "message"} when reconciliation reports success.
///  - 400 when the body is not valid JSON, "pg_id" is missing or not a string, or the value is not a
///    plain decimal number that fits into pg_id_t.
///  - 500 when HSHomeObject::reconcile_membership returns false.
///
/// @param request  Incoming request; only its body is read.
/// @param response Writer used to send exactly one reply.
void HttpManager::reconcile_membership(const Pistache::Rest::Request& request,
                                       Pistache::Http::ResponseWriter response) {
    try {
        const auto j = nlohmann::json::parse(request.body());
        const std::string pg_id_str = j.at("pg_id").get< std::string >();

        // std::stoull on its own is too lenient for an id that selects which PG gets rewritten:
        // it skips leading whitespace, accepts a sign ("-1" wraps around) and stops at the first
        // non-digit ("12abc" parses as 12). Require a leading digit and full consumption.
        const bool starts_with_digit =
            !pg_id_str.empty() && pg_id_str.front() >= '0' && pg_id_str.front() <= '9';
        std::size_t consumed = 0;
        const unsigned long long raw_pg_id = starts_with_digit ? std::stoull(pg_id_str, &consumed) : 0ULL;

        // The parsed value is narrowed to pg_id_t. Reject anything that does not survive the round trip,
        // otherwise an oversized id would silently alias a different, valid PG.
        // NOTE(review): assumes pg_id_t is an unsigned integer type no wider than unsigned long long.
        const auto pg_id = static_cast< pg_id_t >(raw_pg_id);
        const bool fits = static_cast< unsigned long long >(pg_id) == raw_pg_id;
        if (!starts_with_digit || consumed != pg_id_str.size() || !fits) {
            response.send(Pistache::Http::Code::Bad_Request, fmt::format("Invalid pg_id: {}", pg_id_str));
            return;
        }

        LOGINFO("Reconcile membership for pg_id={}", pg_id);

        const bool success = ho_.reconcile_membership(pg_id);
        if (!success) {
            response.send(Pistache::Http::Code::Internal_Server_Error,
                          fmt::format("Failed to reconcile membership for pg_id={}", pg_id));
            return;
        }

        nlohmann::json result;
        result["status"] = "success";
        result["pg_id"] = pg_id;
        result["message"] = "Membership reconciled successfully";

        response.send(Pistache::Http::Code::Ok, result.dump());
    } catch (const std::exception& e) {
        // Covers JSON parse/type errors as well as std::out_of_range thrown by std::stoull.
        response.send(Pistache::Http::Code::Bad_Request, std::string("Invalid JSON: ") + e.what());
    }
}
366+
336367
void HttpManager::list_pg_replace_member_task(const Pistache::Rest::Request& request,
337368
Pistache::Http::ResponseWriter response) {
338369
auto tid = generateRandomTraceId();

src/lib/homestore_backend/hs_http_manager.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class HttpManager {
3737
void remove_member(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
3838
void clean_replace_member_task(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
3939
void list_pg_replace_member_task(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
40+
void reconcile_membership(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
4041
void get_pg_quorum(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
4142
void exit_pg(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
4243

src/lib/homestore_backend/hs_pg_manager.cpp

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,108 @@ void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std:
398398
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
399399
}
400400

401+
//This function actually perform rollback for replace member task: Remove in_member, and ensure out_member exists
402+
void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const std::string& task_id,
403+
const replica_member_info& member_out,
404+
const replica_member_info& member_in, trace_id_t tid) {
405+
std::unique_lock lck(_pg_lock);
406+
for (const auto& iter : _pg_map) {
407+
auto& pg = iter.second;
408+
if (pg_repl_dev(*pg).group_id() == group_id) {
409+
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());
410+
411+
// Remove the in_member (the one that was added but now needs to be removed)
412+
auto removed_count = pg->pg_info_.members.erase(PGMember(member_in.id));
413+
414+
// Ensure out_member exists
415+
// Using emplace is safe - it won't overwrite if already exists
416+
auto out_pg_member = to_pg_member(member_out);
417+
auto [it, inserted] = pg->pg_info_.members.emplace(std::move(out_pg_member));
418+
419+
// Update superblock
420+
hs_pg->update_membership(pg->pg_info_.members);
421+
422+
LOGI("PG clean replace member task done (rollback), task_id={}, removed in_member={} (removed={}), "
423+
"ensured out_member={} (inserted={}), member_nums={}, trace_id={}",
424+
task_id, boost::uuids::to_string(member_in.id), removed_count,
425+
boost::uuids::to_string(member_out.id), inserted, pg->pg_info_.members.size(), tid);
426+
return;
427+
}
428+
}
429+
LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}", task_id,
430+
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
431+
}
432+
433+
/**
 * Brings the member set recorded for a PG in line with the member list reported by its repl dev.
 *
 * Runs entirely under _pg_lock. Members reported by the repl dev that are already recorded are kept
 * as-is. Reported members that are not recorded are added as PGMember(id) and flagged with an error
 * log. Recorded members that are not reported are dropped. The new set is only written (via
 * HS_PG::update_membership) when it differs from the recorded one.
 *
 * @param pg_id PG to reconcile.
 * @return true when the membership is already in sync or was updated; false when the PG is unknown or
 *         the repl dev returned an empty quorum.
 */
bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
    std::unique_lock lck(_pg_lock);

    auto* const hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
    if (hs_pg == nullptr) {
        LOGE("PG {} not found for reconcile_membership", pg_id);
        return false;
    }

    // Ask the replication layer which members it actually has.
    auto quorum = hs_pg->repl_dev_->get_replication_quorum();
    if (quorum.empty()) {
        LOGW("Failed to get replication quorum for PG {}, repl_dev may not be ready", pg_id);
        return false;
    }

    auto& recorded = hs_pg->pg_info_.members;

    // Rebuild the member set from the reported ids, reusing recorded entries where they exist so
    // whatever else a recorded PGMember carries is preserved.
    std::set< PGMember > reconciled;
    for (auto& peer : quorum) {
        const auto known = recorded.find(PGMember(peer));
        if (known == recorded.end()) {
            reconciled.insert(PGMember(peer));
            LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(peer), hs_pg->pg_info_.id);
        } else {
            reconciled.insert(*known);
        }
    }

    if (reconciled == recorded) {
        LOGD("Membership already in sync for pg={}, no update needed", hs_pg->pg_info_.id);
        return true;
    }

    // The two id lists below exist purely for logging what is about to change.
    std::vector< peer_id_t > removed_members;
    for (auto& before : recorded) {
        if (reconciled.count(before) == 0) { removed_members.push_back(before.id); }
    }

    std::vector< peer_id_t > added_members;
    for (auto& after : reconciled) {
        if (recorded.count(after) == 0) { added_members.push_back(after.id); }
    }

    LOGI("Reconciling PG {} membership: removing {} members, adding {} members, old membership count {}, new membership count: {}",
         pg_id, removed_members.size(), added_members.size(), recorded.size(), reconciled.size());
    for (auto& gone : removed_members) {
        LOGI(" - Removing member: {}", boost::uuids::to_string(gone));
    }
    for (auto& fresh : added_members) {
        LOGI(" - Adding member: {}", boost::uuids::to_string(fresh));
    }

    // Swap in the reconciled set and hand it to HS_PG for recording.
    recorded = std::move(reconciled);
    hs_pg->update_membership(recorded);

    LOGI("Successfully reconciled PG {} membership, new member_count={}", pg_id, recorded.size());
    return true;
}
502+
401503
void HSHomeObject::on_remove_member(homestore::group_id_t group_id, const peer_id_t& member, trace_id_t tid) {
402504
LOGINFO("PG remove member, member={}, trace_id={}", boost::uuids::to_string(member), tid);
403505
std::unique_lock lck(_pg_lock);

src/lib/homestore_backend/replication_state_machine.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,13 @@ void ReplicationStateMachine::on_complete_replace_member(const std::string& task
274274
home_object_->on_pg_complete_replace_member(repl_dev()->group_id(), task_id, member_out, member_in, tid);
275275
}
276276

277+
void ReplicationStateMachine::on_clean_replace_member_task(const std::string& task_id,
278+
const homestore::replica_member_info& member_out,
279+
const homestore::replica_member_info& member_in,
280+
trace_id_t tid) {
281+
home_object_->on_pg_clean_replace_member_task(repl_dev()->group_id(), task_id, member_out, member_in, tid);
282+
}
283+
277284
void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) {
278285
auto PG_ID = home_object_->get_pg_id_with_group_id(group_id);
279286
if (!PG_ID.has_value()) {

src/lib/homestore_backend/replication_state_machine.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
183183
void on_complete_replace_member(const std::string& task_id, const homestore::replica_member_info& member_out,
184184
const homestore::replica_member_info& member_in, trace_id_t tid = 0) override;
185185

186+
/// @brief Called when a replace member task is cleaned up (rollback)
187+
void on_clean_replace_member_task(const std::string& task_id, const homestore::replica_member_info& member_out,
188+
const homestore::replica_member_info& member_in, trace_id_t tid = 0) override;
189+
186190
/// @brief Called when the replica is being destroyed by nuraft;
187191
void on_destroy(const homestore::group_id_t& group_id) override;
188192

0 commit comments

Comments
 (0)