Skip to content

Commit 742c85b

Browse files
committed
Implement on_rollback()
Signed-off-by: Xiaoxi Chen <xiaoxchen@ebay.com>
1 parent 48080c9 commit 742c85b

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

src/lib/replication/repl_dev/raft_state_machine.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_pa
179179

180180
repl_req_ptr_t rreq = lsn_to_req(lsn);
181181
RD_LOGD("Raft channel: Precommit rreq=[{}]", rreq->to_compact_string());
182+
// Fixme: Check return value of on_pre_commit
182183
m_rd.m_listener->on_pre_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq);
183184

184185
return m_success_ptr;
@@ -195,12 +196,23 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
195196
// This is the time to ensure flushing of journal happens in the proposer
196197
rreq->add_state(repl_req_state_t::LOG_FLUSHED);
197198
}
198-
199+
// Fixme: Check return value of handle_commit
199200
m_rd.handle_commit(rreq);
200201

201202
return m_success_ptr;
202203
}
203204

205+
void RaftStateMachine::rollback_ext(const nuraft::state_machine::ext_op_params& params) {
206+
int64_t lsn = s_cast< int64_t >(params.log_idx);
207+
RD_LOGD("Raft channel: Received rollback message lsn {}, store {}, logdev {}", lsn,
208+
m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id());
209+
repl_req_ptr_t rreq = lsn_to_req(lsn);
210+
RD_DBG_ASSERT(rreq != nullptr, "Raft channel got null rreq");
211+
RD_LOGD("Raft channel: rollback rreq=[{}]", rreq->to_compact_string());
212+
// Fixme: Check return value of on_rollback
213+
m_rd.m_listener->on_rollback(rreq->lsn(), rreq->header(), rreq->key(), rreq);
214+
}
215+
204216
void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) {
205217
for (auto [key, rreq] : m_lsn_req_map) {
206218
cb(key, rreq);

src/lib/replication/repl_dev/raft_state_machine.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ class RaftStateMachine : public nuraft::state_machine {
108108
uint64_t last_commit_index() override;
109109
raft_buf_ptr_t pre_commit_ext(const nuraft::state_machine::ext_op_params& params) override;
110110
raft_buf_ptr_t commit_ext(const nuraft::state_machine::ext_op_params& params) override;
111-
void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); }
111+
void rollback_ext(const nuraft::state_machine::ext_op_params& params) override;
112+
// void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); }
112113
void become_ready();
113114

114115
bool apply_snapshot(nuraft::snapshot&) override { return false; }

0 commit comments

Comments
 (0)