Skip to content

Commit 54ffb00

Browse files
committed
fix: reset progress when heartbeat response indicates conflict
With `allow_log_reversion` enabled: Before this fix, `RaftCore` ignored `conflict` messages from `Heartbeat` RPCs, preventing leaders from discovering follower state changes. When a follower's state reverted and the follower responded with a conflict message, the leader would not retransmit the necessary data to it. This commit ensures `conflict` responses are always processed properly and that progress is reset to trigger data retransmission to the follower. In this commit, a `Heartbeat` message uses the `committed` log id as the `prev_log_id` to detect follower state reversion.
1 parent e69e64b commit 54ffb00

File tree

15 files changed

+216
-67
lines changed

15 files changed

+216
-67
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod raft_core_closed;
2+
mod stopped;
3+
4+
pub(crate) use raft_core_closed::RaftCoreClosed;
5+
pub(crate) use stopped::Stopped;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/// Error signalling that `RaftCore` has closed its receiver.
///
/// Its display text is "RaftCore closed receiver".
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
2+
#[error("RaftCore closed receiver")]
3+
pub(crate) struct RaftCoreClosed;
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
use crate::core::heartbeat::errors::raft_core_closed::RaftCoreClosed;
2+
3+
/// Reason why the heartbeat worker stopped.
///
/// Every variant renders with the `HeartbeatWorkerStopped:` prefix.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
4+
pub enum Stopped {
5+
/// Stopped because of a [`RaftCoreClosed`] error; convertible from it via `From`,
/// so `?` can be used on results carrying that error.
#[error("HeartbeatWorkerStopped: {0}")]
6+
RaftCoreClosed(#[from] RaftCoreClosed),
7+
8+
/// Stopped because a shutdown signal was received.
#[error("HeartbeatWorkerStopped: received shutdown signal")]
9+
ReceivedShutdown,
10+
}

openraft/src/core/heartbeat/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub(crate) mod errors;
12
pub(crate) mod event;
23
pub(crate) mod handle;
34
pub(crate) mod worker;

openraft/src/core/heartbeat/worker.rs

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,16 @@ use futures::FutureExt;
66

77
use crate::async_runtime::watch::WatchReceiver;
88
use crate::async_runtime::MpscUnboundedSender;
9+
use crate::core::heartbeat::errors::RaftCoreClosed;
10+
use crate::core::heartbeat::errors::Stopped;
911
use crate::core::heartbeat::event::HeartbeatEvent;
1012
use crate::core::notification::Notification;
1113
use crate::network::v2::RaftNetworkV2;
1214
use crate::network::RPCOption;
1315
use crate::raft::AppendEntriesRequest;
16+
use crate::raft::AppendEntriesResponse;
17+
use crate::replication::response::ReplicationResult;
18+
use crate::replication::Progress;
1419
use crate::type_config::alias::MpscUnboundedSenderOf;
1520
use crate::type_config::alias::OneshotReceiverOf;
1621
use crate::type_config::alias::WatchReceiverOf;
@@ -59,14 +64,19 @@ where
5964
C: RaftTypeConfig,
6065
N: RaftNetworkV2<C>,
6166
{
62-
pub(crate) async fn run(mut self, mut rx_shutdown: OneshotReceiverOf<C, ()>) {
67+
/// Run the worker by delegating to `do_run`, then log the result it finished with.
///
/// The result of `do_run` is only logged at `info` level; it is not returned to the caller.
pub(crate) async fn run(self, rx_shutdown: OneshotReceiverOf<C, ()>) {
68+
let res = self.do_run(rx_shutdown).await;
69+
tracing::info!("HeartbeatWorker finished with result: {:?}", res);
70+
}
71+
72+
pub(crate) async fn do_run(mut self, mut rx_shutdown: OneshotReceiverOf<C, ()>) -> Result<(), Stopped> {
6373
loop {
6474
tracing::debug!("{} is waiting for a new heartbeat event.", self);
6575

6676
futures::select! {
6777
_ = (&mut rx_shutdown).fuse() => {
6878
tracing::info!("{} is shutdown.", self);
69-
return;
79+
return Err(Stopped::ReceivedShutdown);
7080
},
7181
_ = self.rx.changed().fuse() => {},
7282
}
@@ -83,7 +93,9 @@ where
8393

8494
let payload = AppendEntriesRequest {
8595
vote: heartbeat.session_id.leader_vote.clone().into_vote(),
86-
prev_log_id: None,
96+
// Use committed log id as prev_log_id to detect follower state reversion.
97+
// prev_log_id == None does not conflict.
98+
prev_log_id: heartbeat.committed.clone(),
8799
leader_commit: heartbeat.committed.clone(),
88100
entries: vec![],
89101
};
@@ -92,22 +104,65 @@ where
92104
tracing::debug!("{} sent a heartbeat: {}, result: {:?}", self, heartbeat, res);
93105

94106
match res {
95-
Ok(Ok(_)) => {
96-
let res = self.tx_notification.send(Notification::HeartbeatProgress {
107+
Ok(Ok(x)) => {
108+
let response: AppendEntriesResponse<C> = x;
109+
110+
match response {
111+
AppendEntriesResponse::Success => {}
112+
AppendEntriesResponse::PartialSuccess(_matching) => {}
113+
AppendEntriesResponse::HigherVote(vote) => {
114+
tracing::debug!(
115+
"seen a higher vote({vote}) from {}; when:(sending heartbeat)",
116+
self.target
117+
);
118+
119+
let noti = Notification::HigherVote {
120+
target: self.target.clone(),
121+
higher: vote,
122+
leader_vote: heartbeat.session_id.committed_vote(),
123+
};
124+
125+
self.send_notification(noti, "Seeing higher Vote")?;
126+
}
127+
AppendEntriesResponse::Conflict => {
128+
let conflict = heartbeat.committed.unwrap();
129+
130+
let noti = Notification::ReplicationProgress {
131+
has_payload: false,
132+
progress: Progress {
133+
session_id: heartbeat.session_id.clone(),
134+
target: self.target.clone(),
135+
result: Ok(ReplicationResult(Err(conflict))),
136+
},
137+
};
138+
139+
self.send_notification(noti, "Seeing conflict")?;
140+
}
141+
}
142+
143+
let noti = Notification::HeartbeatProgress {
97144
session_id: heartbeat.session_id.clone(),
98145
sending_time: heartbeat.time,
99146
target: self.target.clone(),
100-
});
147+
};
101148

102-
if res.is_err() {
103-
tracing::error!("{} failed to send a heartbeat progress to RaftCore. quit", self);
104-
return;
105-
}
149+
self.send_notification(noti, "send HeartbeatProgress")?;
106150
}
107151
_ => {
108152
tracing::warn!("{} failed to send a heartbeat: {:?}", self, res);
109153
}
110154
}
111155
}
112156
}
157+
158+
/// Send `notification` through `tx_notification`.
///
/// `when` describes the situation in which the notification is sent; it is only used in the
/// error log message.
///
/// Returns `Err(RaftCoreClosed)` if the send fails, after logging the undelivered notification.
fn send_notification(&self, notification: Notification<C>, when: impl fmt::Display) -> Result<(), RaftCoreClosed> {
159+
let res = self.tx_notification.send(notification);
160+
161+
if let Err(e) = res {
162+
// Field 0 of the send error is bound as the notification so it can be logged.
let notification = e.0;
163+
tracing::error!("{self} failed to send {notification} to RaftCore; when:({when})");
164+
return Err(RaftCoreClosed);
165+
}
166+
Ok(())
167+
}
113168
}

openraft/src/core/notification.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,13 @@ where C: RaftTypeConfig
5050
LocalIO { io_id: IOId<C> },
5151

5252
/// Result of executing a command sent from network worker.
53-
ReplicationProgress { progress: replication::Progress<C> },
53+
ReplicationProgress {
54+
/// If this progress from RPC with payload.
55+
///
56+
/// `has_payload`: contain payload and should reset `inflight` state if conflict.
57+
has_payload: bool,
58+
progress: replication::Progress<C>,
59+
},
5460

5561
HeartbeatProgress {
5662
session_id: ReplicationSessionId<C>,
@@ -105,8 +111,9 @@ where C: RaftTypeConfig
105111
}
106112
Self::StorageError { error } => write!(f, "StorageError: {}", error),
107113
Self::LocalIO { io_id } => write!(f, "IOFlushed: {}", io_id),
108-
Self::ReplicationProgress { progress } => {
109-
write!(f, "{}", progress)
114+
Self::ReplicationProgress { has_payload, progress } => {
115+
// Prefix the progress with a label that mirrors the flag: `has_payload == true`
// must print "has-payload"; the two labels were previously swapped.
let payload = if *has_payload { "has-payload" } else { "no-payload" };
116+
write!(f, "{payload}: {}", progress)
110117
}
111118
Self::HeartbeatProgress {
112119
session_id: leader_vote,

openraft/src/core/raft_core.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1399,15 +1399,15 @@ where
13991399
}
14001400
}
14011401

1402-
Notification::ReplicationProgress { progress } => {
1402+
Notification::ReplicationProgress { has_payload, progress } => {
14031403
// If vote or membership changes, ignore the message.
14041404
// There is chance delayed message reports a wrong state.
14051405
if self.does_replication_session_match(&progress.session_id, "ReplicationProgress") {
14061406
tracing::debug!(progress = display(&progress), "recv Notification::ReplicationProgress");
14071407

14081408
// replication_handler() won't panic because:
14091409
// The leader is still valid because progress.session_id.leader_vote does not change.
1410-
self.engine.replication_handler().update_progress(progress.target, progress.result);
1410+
self.engine.replication_handler().update_progress(progress.target, progress.result, has_payload);
14111411
}
14121412
}
14131413

openraft/src/engine/handler/replication_handler/mod.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -213,15 +213,18 @@ where C: RaftTypeConfig
213213

214214
/// Update progress when replicated data(logs or snapshot) does not match follower/learner state
215215
/// and is rejected.
216+
///
217+
/// If `has_payload` is true, the `inflight` state is reset because AppendEntries RPC
218+
/// manages the inflight state.
216219
#[tracing::instrument(level = "debug", skip_all)]
217-
pub(crate) fn update_conflicting(&mut self, target: C::NodeId, conflict: LogIdOf<C>) {
220+
pub(crate) fn update_conflicting(&mut self, target: C::NodeId, conflict: LogIdOf<C>, has_payload: bool) {
218221
// TODO(2): test it?
219222

220223
let prog_entry = self.leader.progress.get_mut(&target).unwrap();
221224

222225
let mut updater = progress::entry::update::Updater::new(self.config, prog_entry);
223226

224-
updater.update_conflicting(conflict.index());
227+
updater.update_conflicting(conflict.index(), has_payload);
225228
}
226229

227230
/// Enable one-time replication reset for a specific node upon log reversion detection.
@@ -254,13 +257,17 @@ where C: RaftTypeConfig
254257

255258
/// Update replication progress when a response is received.
256259
#[tracing::instrument(level = "debug", skip_all)]
257-
pub(crate) fn update_progress(&mut self, target: C::NodeId, repl_res: Result<ReplicationResult<C>, String>) {
260+
pub(crate) fn update_progress(
261+
&mut self,
262+
target: C::NodeId,
263+
repl_res: Result<ReplicationResult<C>, String>,
264+
has_payload: bool,
265+
) {
258266
tracing::debug!(
259-
target = display(&target),
260-
result = display(repl_res.display()),
261-
progress = display(&self.leader.progress),
262-
"{}",
263-
func_name!()
267+
"{}: target={target}, result={}, has_payload={has_payload}, current progresses={}",
268+
func_name!(),
269+
repl_res.display(),
270+
self.leader.progress
264271
);
265272

266273
match repl_res {
@@ -269,7 +276,7 @@ where C: RaftTypeConfig
269276
self.update_matching(target, matching);
270277
}
271278
Err(conflict) => {
272-
self.update_conflicting(target, conflict);
279+
self.update_conflicting(target, conflict, has_payload);
273280
}
274281
},
275282
Err(err_str) => {

openraft/src/progress/entry/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ fn test_update_conflicting() -> anyhow::Result<()> {
7979
pe.inflight = inflight_logs(5, 10);
8080

8181
let engine_config = EngineConfig::new_default(1);
82-
pe.new_updater(&engine_config).update_conflicting(5);
82+
pe.new_updater(&engine_config).update_conflicting(5, true);
8383

8484
assert_eq!(Inflight::None, pe.inflight);
8585
assert_eq!(&Some(log_id(3)), pe.borrow());

openraft/src/progress/entry/update.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,42 @@ where C: RaftTypeConfig
2020
Self { engine_config, entry }
2121
}
2222

23-
/// Update conflicting log index.
23+
/// Update the conflicting log index for this follower.
2424
///
25-
/// Conflicting log index is the last found log index on a follower that is not matching the
26-
/// leader log.
25+
/// The conflicting log index is the last log index found on a follower that does not match
26+
/// the leader's log at that position.
2727
///
28-
/// Usually if follower's data is not lost, `conflict` is always greater than or equal
29-
/// `matching`. But for testing purpose, a follower is allowed to clean its data and wait
30-
/// for leader to replicate all data to it.
28+
/// If `has_payload` is true, the `inflight` state is reset because AppendEntries RPC
29+
/// manages the inflight state.
3130
///
32-
/// To allow a follower to clean its data, set the config [`Config::allow_log_reversion`] .
31+
/// Normally, the `conflict` index should be greater than or equal to the `matching` index
32+
/// when follower data is intact. However, for testing purposes, a follower may clean its
33+
/// data and require the leader to replicate all data from the beginning.
34+
///
35+
/// To allow follower log reversion, enable [`Config::allow_log_reversion`].
3336
///
3437
/// [`Config::allow_log_reversion`]: `crate::config::Config::allow_log_reversion`
35-
pub(crate) fn update_conflicting(&mut self, conflict: u64) {
38+
pub(crate) fn update_conflicting(&mut self, conflict: u64, has_payload: bool) {
3639
tracing::debug!(
3740
"update_conflict: current progress_entry: {}; conflict: {}",
3841
self.entry,
3942
conflict
4043
);
4144

42-
self.entry.inflight.conflict(conflict);
45+
// The inflight may be None if the conflict is caused by a heartbeat response.
46+
if has_payload {
47+
self.entry.inflight.conflict(conflict);
48+
}
49+
50+
if conflict >= self.entry.searching_end {
51+
tracing::debug!(
52+
"conflict {} >= searching_end {}; no need to update",
53+
conflict,
54+
self.entry.searching_end
55+
);
56+
return;
57+
}
4358

44-
debug_assert!(conflict < self.entry.searching_end);
4559
self.entry.searching_end = conflict;
4660

4761
// An already matching log id is found lost:

0 commit comments

Comments
 (0)