Skip to content

Commit d8fa192

Browse files
committed
add add_data_rpc_service to repl_dev
1 parent b2707cb commit d8fa192

File tree

4 files changed

+27
-1
lines changed

4 files changed

+27
-1
lines changed

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomestoreConan(ConanFile):
1111
name = "homestore"
12-
version = "7.1.4"
12+
version = "7.2.1"
1313

1414
homepage = "https://github.com/eBay/Homestore"
1515
description = "HomeStore Storage Engine"

src/include/homestore/replication/repl_dev.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <homestore/replication/repl_decls.h>
1818
#include <homestore/blkdata_service.hpp>
1919
#include <libnuraft/snapshot.hxx>
20+
#include <nuraft_mesg/mesg_state_mgr.hpp>
2021

2122
namespace nuraft {
2223
template < typename T >
@@ -438,10 +439,16 @@ class ReplDevListener {
438439
/// @brief when restart, after all the logs are replayed and before joining raft group, notify the upper layer
439440
virtual void on_log_replay_done(const group_id_t& group_id) {};
440441

442+
virtual void on_become_leader(const group_id_t& group_id) {};
443+
444+
virtual void on_become_follower(const group_id_t& group_id) {};
445+
441446
private:
442447
std::weak_ptr< ReplDev > m_repl_dev;
443448
};
444449

450+
using data_service_request_handler_t = std::function< void(boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >;
451+
445452
class ReplDev {
446453
public:
447454
ReplDev() = default;
@@ -624,6 +631,13 @@ class ReplDev {
624631
// create a snapshot manually and try to compact logs upto compact_lsn
625632
virtual void trigger_snapshot_creation(repl_lsn_t compact_lsn, bool wait_for_commit) = 0;
626633

634+
virtual bool add_data_rpc_service(std::string const& request_name,
635+
data_service_request_handler_t const& request_handler) {
636+
return true;
637+
}
638+
639+
virtual nuraft_mesg::repl_service_ctx* get_group_msg_service() { return nullptr; }
640+
627641
protected:
628642
shared< ReplDevListener > m_listener;
629643

src/lib/replication/repl_dev/raft_repl_dev.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,13 @@ bool RaftReplDev::join_group() {
138138
return true;
139139
}
140140

141+
bool RaftReplDev::add_data_rpc_service(std::string const& request_name,
142+
data_service_request_handler_t const& request_handler) {
143+
return m_msg_mgr.bind_data_service_request(request_name, m_group_id, request_handler);
144+
}
145+
146+
nuraft_mesg::repl_service_ctx* RaftReplDev::get_group_msg_service() { return group_msg_service(); }
147+
141148
// All the steps in the implementation should be idempotent and retryable.
142149
AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const replica_member_info& member_out,
143150
const replica_member_info& member_in, uint32_t commit_quorum,
@@ -2820,6 +2827,7 @@ void RaftReplDev::become_leader_cb() {
28202827
// becoming a leader.
28212828

28222829
RD_LOGD(NO_TRACE_ID, "become_leader_cb: setting traffic_ready_lsn from {} to {}", current_gate, new_gate);
2830+
m_listener->on_become_leader(m_group_id);
28232831
}
28242832

28252833
bool RaftReplDev::is_ready_for_traffic() const {

src/lib/replication/repl_dev/raft_repl_dev.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,9 @@ class RaftReplDev : public ReplDev,
322322
void pause_state_machine(size_t timeout) override;
323323
void resume_state_machine() override;
324324
bool is_state_machine_paused() override;
325+
bool add_data_rpc_service(std::string const& request_name,
326+
data_service_request_handler_t const& request_handler) override;
327+
virtual nuraft_mesg::repl_service_ctx* get_group_msg_service() override;
325328

326329
std::shared_ptr< snapshot_context > deserialize_snapshot_context(sisl::io_blob_safe& snp_ctx) override {
327330
return std::make_shared< nuraft_snapshot_context >(snp_ctx);
@@ -355,6 +358,7 @@ class RaftReplDev : public ReplDev,
355358
void become_follower_cb() {
356359
m_traffic_ready_lsn.store(0);
357360
RD_LOGD(NO_TRACE_ID, "become_follower_cb called!");
361+
m_listener->on_become_follower(m_group_id);
358362
}
359363

360364
/// @brief This method is called when the data journal is compacted

0 commit comments

Comments
 (0)