Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "7.1.4"
version = "7.2.1"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
33 changes: 32 additions & 1 deletion src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;
using repl_req_ptr_t = boost::intrusive_ptr< repl_req_ctx >;
using trace_id_t = u_int64_t;

// Callback type registered through ReplDev::add_data_rpc_service; it receives the rpc's GenericRpcData by reference.
using data_service_request_handler_t = std::function< void(boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >;
// Role-based selector usable as a data rpc destination.
// NOTE(review): RaftReplDev converts this to nuraft_mesg::role_regex with a static_cast, so the enumerator order
// presumably has to stay in sync with the nuraft_mesg definition — confirm.
ENUM(role_regex, uint8_t, LEADER, FOLLOWER, ALL, ANY);
using peer_id_t = boost::uuids::uuid;
using svr_id_t = int32_t;
// A data rpc destination: a peer uuid, a role selector, or a server id.
using destination_t = std::variant< peer_id_t, role_regex, svr_id_t >;

// Error codes carried by the results of the data rpc APIs.
ENUM(data_rpc_error_code, uint8_t, SUCCESS, TIMEOUT, SERVER_NOT_FOUND, CANCELLED, SERVER_ALREADY_EXISTS, TERM_MISMATCH,
BAD_REQUEST, FAILED, NOT_SUPPORTED);
// Asynchronous data rpc result: a future holding either a value of type T or a data_rpc_error_code.
template < typename T >
using DataRpcAsyncResult = folly::SemiFuture< Result< T, data_rpc_error_code > >;

// Asynchronous data rpc result whose success value is folly::Unit (no payload).
using NullDataRpcAsyncResult = AsyncResult< folly::Unit, data_rpc_error_code >;

VENUM(repl_req_state_t, uint32_t,
INIT = 0, // Initial state
BLK_ALLOCATED = 1 << 0, // Local block is allocated
Expand Down Expand Up @@ -436,7 +449,11 @@ class ReplDevListener {
virtual void on_no_space_left(repl_lsn_t lsn, sisl::blob const& header) = 0;

/// @brief when restart, after all the logs are replayed and before joining raft group, notify the upper layer
virtual void on_log_replay_done(const group_id_t& group_id) {};
virtual void on_log_replay_done(const group_id_t& group_id) = 0;

/// @brief notify the upper layer that this replica has become the leader of group_id; the default does nothing
virtual void on_become_leader(const group_id_t& group_id) {};

/// @brief notify the upper layer that this replica has become a follower of group_id; the default does nothing
virtual void on_become_follower(const group_id_t& group_id) {};

private:
std::weak_ptr< ReplDev > m_repl_dev;
Expand Down Expand Up @@ -624,6 +641,20 @@ class ReplDev {
// create a snapshot manually and try to compact logs upto compact_lsn
virtual void trigger_snapshot_creation(repl_lsn_t compact_lsn, bool wait_for_commit) = 0;

// Registers a data rpc service named request_name that is served by request_handler.
// NOTE(review): the bool result presumably reports whether the registration succeeded — confirm with implementers.
virtual bool add_data_rpc_service(std::string const& request_name,
data_service_request_handler_t const& request_handler) = 0;

// Sends a unidirectional data rpc named request_name, carrying cli_buf, to dest.
// The returned future holds folly::Unit on success or a data_rpc_error_code on failure.
virtual NullDataRpcAsyncResult data_request_unidirectional(destination_t const& dest,
std::string const& request_name,
sisl::io_blob_list_t const& cli_buf) = 0;

// Sends a bidirectional data rpc named request_name, carrying cli_buf, to dest.
// The returned future holds the sisl::GenericClientResponse on success or a data_rpc_error_code on failure.
virtual DataRpcAsyncResult< sisl::GenericClientResponse >
data_request_bidirectional(destination_t const& dest, std::string const& request_name,
sisl::io_blob_list_t const& cli_buf) = 0;

protected:
shared< ReplDevListener > m_listener;

Expand Down
64 changes: 63 additions & 1 deletion src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "device/device.h"
#include "push_data_rpc_generated.h"
#include "fetch_data_rpc_generated.h"
#include <nuraft_mesg/common.hpp>

namespace homestore {
std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1};
Expand Down Expand Up @@ -138,6 +139,66 @@ bool RaftReplDev::join_group() {
return true;
}

// Translates a nuraft command result code into the data_rpc_error_code exposed through the ReplDev data rpc API.
// Any nuraft code without an explicit counterpart below is reported as NOT_SUPPORTED.
data_rpc_error_code RaftReplDev::nuraft_to_data_rpc_error_code(nuraft::cmd_result_code const& nuraft_err) {
    using raft_code = nuraft::cmd_result_code;
    using rpc_code = data_rpc_error_code;

    if (nuraft_err == raft_code::OK) { return rpc_code::SUCCESS; }
    if (nuraft_err == raft_code::SERVER_NOT_FOUND) { return rpc_code::SERVER_NOT_FOUND; }
    if (nuraft_err == raft_code::TIMEOUT) { return rpc_code::TIMEOUT; }
    if (nuraft_err == raft_code::SERVER_ALREADY_EXISTS) { return rpc_code::SERVER_ALREADY_EXISTS; }
    if (nuraft_err == raft_code::CANCELLED) { return rpc_code::CANCELLED; }
    if (nuraft_err == raft_code::TERM_MISMATCH) { return rpc_code::TERM_MISMATCH; }
    if (nuraft_err == raft_code::BAD_REQUEST) { return rpc_code::BAD_REQUEST; }
    if (nuraft_err == raft_code::FAILED) { return rpc_code::FAILED; }

    // Everything else has no data rpc equivalent.
    return rpc_code::NOT_SUPPORTED;
}

// Converts a homestore destination_t into the equivalent nuraft_mesg::destination_t by re-wrapping whichever
// alternative (peer id, role selector, or server id) the variant currently holds.
nuraft_mesg::destination_t RaftReplDev::change_to_nuraft_mesg_destination(destination_t dest) {
    if (auto const* peer = std::get_if< peer_id_t >(&dest)) { return nuraft_mesg::destination_t(*peer); }

    if (auto const* role = std::get_if< role_regex >(&dest)) {
        // NOTE(review): the cast assumes both role_regex enums share the same underlying values — confirm.
        return nuraft_mesg::destination_t(static_cast< nuraft_mesg::role_regex >(*role));
    }

    // Only the server id alternative is left.
    return nuraft_mesg::destination_t(std::get< svr_id_t >(dest));
}

bool RaftReplDev::add_data_rpc_service(std::string const& request_name,
data_service_request_handler_t const& request_handler) {
return m_msg_mgr.bind_data_service_request(request_name, m_group_id, request_handler);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, but I am a bit concerned this could be a problem during shutdown: the HO will be destructed first, and then the handler becomes invalid.

It is not a major issue at the moment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point. But actually, we call homestore::shutdown in HSHomeObject::shutdown(), which means HomeObject is not destructed until homestore shutdown returns.

}

// Sends a unidirectional data rpc named request_name with payload cli_buf to dest through the group's message
// service, and maps a nuraft error in the reply to a data_rpc_error_code.
// NOTE(review): the continuation captures `this`; presumably the RaftReplDev outlives the returned future — confirm.
NullDataRpcAsyncResult RaftReplDev::data_request_unidirectional(destination_t const& dest,
                                                                std::string const& request_name,
                                                                sisl::io_blob_list_t const& cli_buf) {
    auto to_result = [this](auto&& rsp) -> Result< folly::Unit, data_rpc_error_code > {
        if (!rsp.hasError()) { return folly::unit; }
        return folly::makeUnexpected(nuraft_to_data_rpc_error_code(rsp.error()));
    };
    return group_msg_service()
        ->data_service_request_unidirectional(change_to_nuraft_mesg_destination(dest), request_name, cli_buf)
        .deferValue(std::move(to_result));
}

// Sends a bidirectional data rpc named request_name with payload cli_buf to dest through the group's message
// service. On success the response is moved into the result; a nuraft error is mapped to a data_rpc_error_code.
// NOTE(review): the continuation captures `this`; presumably the RaftReplDev outlives the returned future — confirm.
DataRpcAsyncResult< sisl::GenericClientResponse >
RaftReplDev::data_request_bidirectional(destination_t const& dest, std::string const& request_name,
                                        sisl::io_blob_list_t const& cli_buf) {
    auto to_result = [this](auto&& rsp) -> Result< sisl::GenericClientResponse, data_rpc_error_code > {
        if (!rsp.hasError()) { return std::move(rsp.value()); }
        return folly::makeUnexpected(nuraft_to_data_rpc_error_code(rsp.error()));
    };
    return group_msg_service()
        ->data_service_request_bidirectional(change_to_nuraft_mesg_destination(dest), request_name, cli_buf)
        .deferValue(std::move(to_result));
}

// All the steps in the implementation should be idempotent and retryable.
AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, uint32_t commit_quorum,
Expand Down Expand Up @@ -1204,7 +1265,7 @@ repl_req_ptr_t RaftReplDev::applier_create_req(repl_key const& rkey, journal_typ
return nullptr;
}

RD_LOGD(rkey.traceID, , "in follower_create_req: rreq={}, addr=0x{:x}", rreq->to_string(),
RD_LOGD(rkey.traceID, "in follower_create_req: rreq={}, addr=0x{:x}", rreq->to_string(),
reinterpret_cast< uintptr_t >(rreq.get()));
return rreq;
}
Expand Down Expand Up @@ -2820,6 +2881,7 @@ void RaftReplDev::become_leader_cb() {
// becoming a leader.

RD_LOGD(NO_TRACE_ID, "become_leader_cb: setting traffic_ready_lsn from {} to {}", current_gate, new_gate);
m_listener->on_become_leader(m_group_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious about the purpose of on_become_xxx. Should we move it to the beginning of become_xxx_cb to call the upper callback first, similar to how handle_commit does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_become_leader and on_become_follower will be used for HomeObject scrubbing. If a leader switch happens while scrubbing is ongoing, the old leader becomes a follower, so on_become_follower is called on that node; there we stop the scrubbing thread that requests scrub results from the other two members. One of the followers becomes the leader, so on_become_leader is called on that node; there we read the scrub superblk and start the scrubbing thread to request scrub results from the other two members.

Should we move it to the beginning
No need to do this now. What we want is to make sure that we are notified of the leader-change event; it does not matter whether it is called at the beginning or at the end.

}

bool RaftReplDev::is_ready_for_traffic() const {
Expand Down
11 changes: 11 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,14 @@ class RaftReplDev : public ReplDev,
void resume_state_machine() override;
bool is_state_machine_paused() override;

bool add_data_rpc_service(std::string const& request_name,
data_service_request_handler_t const& request_handler) override;
NullDataRpcAsyncResult data_request_unidirectional(destination_t const& dest, std::string const& request_name,
sisl::io_blob_list_t const& cli_buf) override;
DataRpcAsyncResult< sisl::GenericClientResponse >
data_request_bidirectional(destination_t const& dest, std::string const& request_name,
sisl::io_blob_list_t const& cli_buf) override;

std::shared_ptr< snapshot_context > deserialize_snapshot_context(sisl::io_blob_safe& snp_ctx) override {
return std::make_shared< nuraft_snapshot_context >(snp_ctx);
}
Expand Down Expand Up @@ -355,6 +363,7 @@ class RaftReplDev : public ReplDev,
/// @brief Callback run when this replica becomes a follower: resets m_traffic_ready_lsn to 0, logs the
/// transition, and then notifies the listener through on_become_follower with this group's id.
void become_follower_cb() {
m_traffic_ready_lsn.store(0);
RD_LOGD(NO_TRACE_ID, "become_follower_cb called!");
// Let the upper layer react to the role change.
m_listener->on_become_follower(m_group_id);
}

/// @brief This method is called when the data journal is compacted
Expand Down Expand Up @@ -474,6 +483,8 @@ class RaftReplDev : public ReplDev,
void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs);
void handle_fetch_data_response(sisl::GenericClientResponse response, std::vector< repl_req_ptr_t > rreqs);
bool is_resync_mode();
data_rpc_error_code nuraft_to_data_rpc_error_code(nuraft::cmd_result_code const& nuraft_err);
nuraft_mesg::destination_t change_to_nuraft_mesg_destination(destination_t dest);

/**
* \brief This method handles errors that occur during append entries or data receiving.
Expand Down
16 changes: 16 additions & 0 deletions src/lib/replication/repl_dev/solo_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ class SoloReplDev : public ReplDev {
void truncate();

void trigger_snapshot_creation(repl_lsn_t compact_lsn, bool wait_for_commit) override { return; }
// Data rpc services are not available on a solo repl dev: nothing is registered and false is always returned.
bool add_data_rpc_service(std::string const& request_name,
data_service_request_handler_t const& request_handler) override {
// not supported yet
return false;
}

// Unidirectional data rpc is not available on a solo repl dev: no request is sent and the result always carries
// data_rpc_error_code::NOT_SUPPORTED.
NullDataRpcAsyncResult data_request_unidirectional(destination_t const& dest, std::string const& request_name,
sisl::io_blob_list_t const& cli_buf) override {
return folly::makeUnexpected(data_rpc_error_code::NOT_SUPPORTED);
}

// Bidirectional data rpc is not available on a solo repl dev: no request is sent and the result always carries
// data_rpc_error_code::NOT_SUPPORTED.
DataRpcAsyncResult< sisl::GenericClientResponse >
data_request_bidirectional(destination_t const& dest, std::string const& request_name,
sisl::io_blob_list_t const& cli_buf) override {
return folly::makeUnexpected(data_rpc_error_code::NOT_SUPPORTED);
}

private:
void write_journal(repl_req_ptr_t rreq);
Expand Down
2 changes: 2 additions & 0 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class TestReplicatedDB : public homestore::ReplDevListener {
repl_dev()->reset_latch_lsn();
}

// No-op: this test listener takes no action when log replay completes.
void on_log_replay_done(const group_id_t& group_id) override {};

AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override {
std::lock_guard< std::mutex > lock(m_snapshot_lock);
auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot();
Expand Down
1 change: 1 addition & 0 deletions src/tests/test_solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class SoloReplDevTest : public testing::Test {
void notify_committed_lsn(int64_t lsn) override {}
void on_config_rollback(int64_t lsn) override {}
void on_no_space_left(repl_lsn_t lsn, sisl::blob const& header) override {}
// No-op: this test listener takes no action when log replay completes.
void on_log_replay_done(const group_id_t& group_id) override {};
};

class Application : public ReplApplication {
Expand Down
Loading