Skip to content

Commit d607bcc

Browse files
yuwmao and yuwmao authored
Fix membership inconsistency caused by clean_replace_member_task (eBay#856)
- Add an on_clean_replace_member_task callback.
- On the follower side, the current get_replication_status only provides the config of the peer itself, so add a get_replication_quorum function that lets the upper layer correct the membership.

Co-authored-by: yuwmao <yuwmao@ebaychina.com>
1 parent 0ce3124 commit d607bcc

File tree

7 files changed

+84
-10
lines changed

7 files changed

+84
-10
lines changed

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomestoreConan(ConanFile):
1111
name = "homestore"
12-
version = "7.2.2"
12+
version = "7.3.0"
1313

1414
homepage = "https://github.com/eBay/Homestore"
1515
description = "HomeStore Storage Engine"

src/include/homestore/replication/repl_dev.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,10 @@ class ReplDevListener {
404404
virtual void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out,
405405
const replica_member_info& member_in, trace_id_t tid) = 0;
406406

407+
/// @brief Called when a replace member task is cleaned up (rollback).
408+
virtual void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out,
409+
const replica_member_info& member_in, trace_id_t tid) = 0;
410+
407411
/// @brief Called when a member is removed.
408412
virtual void on_remove_member(const replica_id_t& member, trace_id_t tid) = 0;
409413

@@ -553,6 +557,10 @@ class ReplDev {
553557
/// this API can return empty result.
554558
virtual std::vector< peer_info > get_replication_status() const = 0;
555559

560+
/// @brief Get all members in the replication quorum
561+
/// @return List of replica IDs that are part of the replication group
562+
virtual std::vector< replica_id_t > get_replication_quorum() = 0;
563+
556564
/// @brief Gets the group_id this repldev is working for
557565
/// @return group_id
558566
virtual group_id_t group_id() const = 0;

src/lib/replication/repl_dev/raft_repl_dev.cpp

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
#include "fetch_data_rpc_generated.h"
2525
#include <nuraft_mesg/common.hpp>
2626

27+
#include <boost/uuid/string_generator.hpp>
28+
2729
namespace homestore {
2830
std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1};
2931

@@ -1853,16 +1855,50 @@ void RaftReplDev::clean_replace_member_task(repl_req_ptr_t rreq) {
18531855
auto task_id = std::string(r_cast< const char* >(rreq->header().cbytes()));
18541856
RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task commit, task_id={}", task_id);
18551857

1856-
std::unique_lock lg{m_sb_mtx};
1857-
auto persisted_task_id = get_replace_member_task_id();
1858-
if (!persisted_task_id.empty()) {
1859-
RD_DBG_ASSERT(persisted_task_id == task_id,
1860-
"Invalid task_id in clean_replace_member_task message, received {}, persisted {}", task_id,
1861-
persisted_task_id);
1862-
m_rd_sb->replace_member_task = replace_member_task_superblk{};
1863-
m_rd_sb.write();
1858+
replica_member_info member_out;
1859+
replica_member_info member_in;
1860+
1861+
// Step 1: Check and read member info from superblk
1862+
{
1863+
std::unique_lock lg{m_sb_mtx};
1864+
auto persisted_task_id = get_replace_member_task_id();
1865+
if (persisted_task_id.empty()) {
1866+
RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task: task not found, task_id={}", task_id);
1867+
return;
1868+
}
1869+
1870+
if (persisted_task_id != task_id) {
1871+
RD_LOGW(rreq->traceID(),
1872+
"Raft repl clean_replace_member_task: task_id mismatch, received={}, persisted={}, skip cleaning",
1873+
task_id, persisted_task_id);
1874+
return;
1875+
}
1876+
1877+
// Read member info from superblk
1878+
member_out.id = m_rd_sb->replace_member_task.replica_out;
1879+
member_in.id = m_rd_sb->replace_member_task.replica_in;
1880+
}
1881+
1882+
// Step 2: Call the listener callback to roll back membership in HomeObject
1883+
if (member_out.id != boost::uuids::nil_uuid() && member_in.id != boost::uuids::nil_uuid()) {
1884+
RD_LOGI(rreq->traceID(),
1885+
"Raft repl clean_replace_member_task, callback to listener, task_id={}, member_out={}, member_in={}",
1886+
task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
1887+
m_listener->on_clean_replace_member_task(task_id, member_out, member_in, rreq->traceID());
1888+
} else {
1889+
RD_LOGW(rreq->traceID(), "Raft repl clean_replace_member_task: invalid member info, skip callback");
1890+
}
1891+
1892+
// Step 3: Clear the replace_member task from superblk
1893+
{
1894+
std::unique_lock lg{m_sb_mtx};
1895+
auto persisted_task_id = get_replace_member_task_id();
1896+
if (!persisted_task_id.empty()) {
1897+
m_rd_sb->replace_member_task = replace_member_task_superblk{};
1898+
m_rd_sb.write();
1899+
RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id);
1900+
}
18641901
}
1865-
RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id);
18661902
}
18671903

18681904
void RaftReplDev::update_truncation_boundary(repl_lsn_t truncation_upper_limit) {
@@ -1980,6 +2016,24 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const {
19802016
return pi;
19812017
}
19822018

2019+
std::vector< replica_id_t > RaftReplDev::get_replication_quorum() {
2020+
std::vector< replica_id_t > member_ids;
2021+
auto msg_service = group_msg_service();
2022+
2023+
if (msg_service) {
2024+
std::list< nuraft_mesg::replica_config > cluster_config;
2025+
msg_service->get_cluster_config(cluster_config);
2026+
for (auto const& config : cluster_config) {
2027+
member_ids.push_back(boost::uuids::string_generator()(config.peer_id));
2028+
}
2029+
RD_LOGD(NO_TRACE_ID, "get_replication_quorum: found {} members in cluster config", member_ids.size());
2030+
} else {
2031+
RD_LOGW(NO_TRACE_ID, "get_replication_quorum: msg_service is null, returning empty member list");
2032+
}
2033+
2034+
return member_ids;
2035+
}
2036+
19832037
void RaftReplDev::reconcile_leader() {
19842038
int32_t my_priority = raft_server()->get_srv_config(m_raft_server_id)->get_priority();
19852039
if (!is_leader()) {

src/lib/replication/repl_dev/raft_repl_dev.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ class RaftReplDev : public ReplDev,
290290
bool is_leader() const override;
291291
replica_id_t get_leader_id() const override;
292292
std::vector< peer_info > get_replication_status() const override;
293+
std::vector< replica_id_t > get_replication_quorum() override;
293294
std::set< replica_id_t > get_active_peers() const;
294295
group_id_t group_id() const override { return m_group_id; }
295296
void reconcile_leader() override;

src/lib/replication/repl_dev/solo_repl_dev.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ class SoloReplDev : public ReplDev {
7878
return std::vector< peer_info >{
7979
peer_info{.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0, .priority_ = 1}};
8080
}
81+
std::vector< replica_id_t > get_replication_quorum() override {
82+
return std::vector< replica_id_t >{m_group_id};
83+
}
8184
void reconcile_leader() override {}
8285
void yield_leadership(bool immediate_yield, replica_id_t candidate) override {}
8386
bool is_ready_for_traffic() const override { return true; }

src/tests/test_common/raft_repl_test_base.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,12 @@ class TestReplicatedDB : public homestore::ReplDevListener {
374374
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
375375
}
376376

377+
void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out,
378+
const replica_member_info& member_in, trace_id_t tid) override {
379+
LOGINFO("[Replica={}] clean replace member task {} out {} in {}", g_helper->replica_num(), task_id,
380+
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
381+
}
382+
377383
void on_remove_member(const replica_id_t& member, trace_id_t tid) override {
378384
LOGINFO("[Replica={}] remove member, member {}", g_helper->replica_num(), member);
379385
}

src/tests/test_solo_repl_dev.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ class SoloReplDevTest : public testing::Test {
137137
const replica_member_info& member_in, trace_id_t tid) override {}
138138
void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out,
139139
const replica_member_info& member_in, trace_id_t tid) override {}
140+
void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out,
141+
const replica_member_info& member_in, trace_id_t tid) override {}
140142
void on_remove_member(const replica_id_t& member, trace_id_t tid) override {}
141143
void on_destroy(const group_id_t& group_id) override {}
142144
void notify_committed_lsn(int64_t lsn) override {}

0 commit comments

Comments
 (0)