Skip to content

Commit 50cb1cc

Browse files
committed
refactor: rename and reorganize log I/O state handling
- Replaced `accepted_io` with `accepted_log_io` in `RaftState` for clearer semantics. - Introduced `log_progress_mut` and `apply_progress_mut` methods to streamline access to log and apply progress. - Improved naming conventions for log I/O progress tracking, e.g., renamed methods like `update_committed` to `accept` for consistency. - Updated `IOProgress` to include a `name` field for better debugging and logging context. - Adjusted test cases and internal logic to align with the new naming and structure.
1 parent 30f1ae6 commit 50cb1cc

21 files changed

+110
-105
lines changed

openraft/src/core/raft_core.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ where
543543

544544
// --- data ---
545545
current_term: st.vote_ref().term(),
546-
vote: st.io_state().log_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(),
546+
vote: st.log_progress().flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(),
547547
last_log_index: st.last_log_id().index(),
548548
last_applied: st.io_applied().cloned(),
549549
snapshot: st.io_snapshot_last_log_id().cloned(),
@@ -575,7 +575,7 @@ where
575575

576576
let server_metrics = RaftServerMetrics {
577577
id: self.id.clone(),
578-
vote: st.io_state().log_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(),
578+
vote: st.log_progress().flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(),
579579
state: st.server_state,
580580
current_leader,
581581
membership_config,
@@ -861,7 +861,7 @@ where
861861
tracing::debug!(
862862
"RAFT_stats id={:<2} log_io: {}",
863863
self.id,
864-
self.engine.state.io_state.log_progress
864+
self.engine.state.log_progress()
865865
);
866866

867867
// In each loop, it does not have to check rx_shutdown and flush metrics for every RaftMsg
@@ -1368,7 +1368,7 @@ where
13681368
}
13691369

13701370
Notification::LocalIO { io_id } => {
1371-
self.engine.state.io_state.log_progress.flush(io_id.clone());
1371+
self.engine.state.log_progress_mut().flush(io_id.clone());
13721372

13731373
match io_id {
13741374
IOId::Log(log_io_id) => {
@@ -1448,16 +1448,18 @@ where
14481448
func_name!()
14491449
);
14501450

1451-
self.engine.state.io_state_mut().log_progress.flush(io_id);
1451+
self.engine.state.log_progress_mut().flush(io_id);
14521452

14531453
if let Some(meta) = meta {
1454+
if let Some(last) = &meta.last_log_id {
1455+
self.engine.state.apply_progress_mut().flush(last.clone());
1456+
}
14541457
let st = self.engine.state.io_state_mut();
1455-
st.update_applied(meta.last_log_id.clone());
14561458
st.update_snapshot(meta.last_log_id);
14571459
}
14581460
}
14591461
sm::Response::Apply(res) => {
1460-
self.engine.state.io_state_mut().update_applied(Some(res.last_applied.clone()));
1462+
self.engine.state.apply_progress_mut().flush(res.last_applied);
14611463
}
14621464
}
14631465
}
@@ -1613,7 +1615,7 @@ where
16131615
if let Some(condition) = condition {
16141616
match condition {
16151617
Condition::IOFlushed { io_id } => {
1616-
let curr = self.engine.state.io_state().log_progress.flushed();
1618+
let curr = self.engine.state.log_progress().flushed();
16171619
if curr < Some(&io_id) {
16181620
tracing::debug!(
16191621
"io_id: {} has not yet flushed, currently flushed: {} postpone cmd: {}",
@@ -1625,7 +1627,7 @@ where
16251627
}
16261628
}
16271629
Condition::LogFlushed { log_id } => {
1628-
let curr = self.engine.state.io_state().log_progress.flushed();
1630+
let curr = self.engine.state.log_progress().flushed();
16291631
let curr = curr.and_then(|x| x.last_log_id());
16301632
if curr < log_id.as_ref() {
16311633
tracing::debug!(
@@ -1664,7 +1666,7 @@ where
16641666

16651667
match cmd {
16661668
Command::UpdateIOProgress { io_id, .. } => {
1667-
self.engine.state.io_state.log_progress.submit(io_id.clone());
1669+
self.engine.state.log_progress_mut().submit(io_id.clone());
16681670

16691671
let notify = Notification::LocalIO { io_id: io_id.clone() };
16701672

@@ -1689,13 +1691,13 @@ where
16891691
//
16901692
// The `submit` state must be updated before calling `append()`,
16911693
// because `append()` may call the callback before returning.
1692-
self.engine.state.io_state.log_progress.submit(io_id);
1694+
self.engine.state.log_progress_mut().submit(io_id);
16931695

16941696
// Submit IO request, do not wait for the response.
16951697
self.log_store.append(entries, callback).await?;
16961698
}
16971699
Command::SaveVote { vote } => {
1698-
self.engine.state.io_state_mut().log_progress.submit(IOId::new(&vote));
1700+
self.engine.state.log_progress_mut().submit(IOId::new(&vote));
16991701
self.log_store.save_vote(&vote).await?;
17001702

17011703
let _ = self.tx_notification.send(Notification::LocalIO {
@@ -1758,7 +1760,7 @@ where
17581760
already_committed,
17591761
upto,
17601762
} => {
1761-
self.engine.state.io_state.apply_progress.submit(upto.clone());
1763+
self.engine.state.apply_progress_mut().submit(upto.clone());
17621764
let first = self.engine.state.get_log_id(already_committed.next_index()).unwrap();
17631765
self.apply_to_state_machine(first, upto).await?;
17641766
}
@@ -1786,15 +1788,15 @@ where
17861788
self.heartbeat_handle.spawn_workers(&mut self.network_factory, &self.tx_notification, nodes).await;
17871789
}
17881790
Command::StateMachine { command } => {
1789-
let io_id = command.get_submit_io();
1791+
let io_id = command.get_log_progress();
17901792

17911793
if let Some(io_id) = io_id {
1792-
self.engine.state.io_state.log_progress.submit(io_id);
1794+
self.engine.state.log_progress_mut().submit(io_id);
17931795
}
17941796

17951797
// If this command update the last-applied log id, mark it as submitted(to state machine).
17961798
if let Some(log_id) = command.get_apply_progress() {
1797-
self.engine.state.io_state.apply_progress.submit(log_id);
1799+
self.engine.state.apply_progress_mut().submit(log_id);
17981800
}
17991801

18001802
// Just forward a state machine command to the worker.

openraft/src/core/sm/command.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,11 @@ where C: RaftTypeConfig
9191
}
9292
}
9393

94-
/// Return the IOId if this command submit any IO.
95-
pub(crate) fn get_submit_io(&self) -> Option<IOId<C>> {
94+
/// Return the [`IOId`] of the log-related I/O progress to submit, if this command submits any
95+
/// log I/O.
96+
///
97+
/// Log-related I/O progress includes both Vote and AppendEntries operations.
98+
pub(crate) fn get_log_progress(&self) -> Option<IOId<C>> {
9699
match self {
97100
Command::BuildSnapshot => None,
98101
Command::GetSnapshot { .. } => None,

openraft/src/engine/engine_impl.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ where C: RaftTypeConfig
403403

404404
let condition = if is_ok {
405405
Some(Condition::IOFlushed {
406-
io_id: self.state.accepted_io().unwrap().clone(),
406+
io_id: self.state.accepted_log_io().unwrap().clone(),
407407
})
408408
} else {
409409
None
@@ -439,7 +439,7 @@ where C: RaftTypeConfig
439439
pub(crate) fn handle_commit_entries(&mut self, leader_committed: Option<LogIdOf<C>>) {
440440
tracing::debug!(
441441
leader_committed = display(leader_committed.display()),
442-
my_accepted = display(self.state.accepted_io().display()),
442+
my_accepted = display(self.state.accepted_log_io().display()),
443443
my_committed = display(self.state.committed().display()),
444444
"{}",
445445
func_name!()
@@ -649,7 +649,7 @@ where C: RaftTypeConfig
649649
let _res = self.vote_handler().update_vote(&vote.clone().into_vote());
650650
debug_assert!(_res.is_ok(), "commit vote can not fail but: {:?}", _res);
651651

652-
self.state.accept_io(IOId::new_log_io(vote, last_log_id));
652+
self.state.accept_log_io(IOId::new_log_io(vote, last_log_id));
653653

654654
// No need to submit UpdateIOProgress command,
655655
// IO progress is updated by the new blank log

openraft/src/engine/handler/following_handler/append_entries_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> {
7373
Vote::new(2, 1).into_committed(),
7474
Some(log_id(3, 1, 5))
7575
)),
76-
eng.state.accepted_io()
76+
eng.state.accepted_log_io()
7777
);
7878
assert_eq!(eng.output.take_commands(), vec![
7979
//
@@ -101,7 +101,7 @@ fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> {
101101
Vote::new(3, 1).into_committed(),
102102
Some(log_id(3, 1, 4))
103103
)),
104-
eng.state.accepted_io()
104+
eng.state.accepted_log_io()
105105
);
106106
assert_eq!(eng.output.take_commands(), vec![
107107
//
@@ -123,7 +123,7 @@ fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> {
123123
Vote::new(3, 1).into_committed(),
124124
Some(log_id(3, 1, 4))
125125
)),
126-
eng.state.accepted_io()
126+
eng.state.accepted_log_io()
127127
);
128128

129129
assert_eq!(eng.output.take_commands(), vec![

openraft/src/engine/handler/following_handler/commit_entries_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ fn eng() -> Engine<UTConfig> {
3333
Duration::from_millis(500),
3434
Vote::new_committed(2, 1),
3535
);
36-
eng.state.io_state.update_committed(log_id(1, 1, 1));
36+
eng.state.apply_progress_mut().accept(log_id(1, 1, 1));
3737
eng.state.membership_state = MembershipState::new(
3838
Arc::new(EffectiveMembership::new(Some(log_id(1, 1, 1)), m01())),
3939
Arc::new(EffectiveMembership::new(Some(log_id(2, 1, 3)), m23())),
@@ -64,7 +64,7 @@ fn test_following_handler_commit_entries_empty() -> anyhow::Result<()> {
6464
fn test_following_handler_commit_entries_ge_accepted() -> anyhow::Result<()> {
6565
let mut eng = eng();
6666
let committed_vote = eng.state.vote_ref().into_committed();
67-
eng.state.io_state.log_progress.accept(IOId::new_log_io(committed_vote, Some(log_id(1, 1, 2))));
67+
eng.state.log_progress_mut().accept(IOId::new_log_io(committed_vote, Some(log_id(1, 1, 2))));
6868

6969
eng.following_handler().commit_entries(Some(log_id(2, 1, 3)));
7070

@@ -96,7 +96,7 @@ fn test_following_handler_commit_entries_ge_accepted() -> anyhow::Result<()> {
9696
fn test_following_handler_commit_entries_le_accepted() -> anyhow::Result<()> {
9797
let mut eng = eng();
9898
let committed_vote = eng.state.vote_ref().into_committed();
99-
eng.state.io_state.log_progress.accept(IOId::new_log_io(committed_vote, Some(log_id(3, 1, 4))));
99+
eng.state.log_progress_mut().accept(IOId::new_log_io(committed_vote, Some(log_id(3, 1, 4))));
100100

101101
eng.following_handler().commit_entries(Some(log_id(2, 1, 3)));
102102

openraft/src/engine/handler/following_handler/install_snapshot_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ fn eng() -> Engine<UTConfig> {
3838
let now = UTConfig::<()>::now();
3939
let vote = VoteOf::<UTConfig>::new_committed(2, 1);
4040
eng.state.vote.update(now, Duration::from_millis(500), vote);
41-
eng.state.io_state_mut().update_committed(log_id(4, 1, 5));
41+
eng.state.apply_progress_mut().accept(log_id(4, 1, 5));
4242
eng.state.log_ids = LogIdList::new(vec![
4343
//
4444
log_id(2, 1, 2),
@@ -188,7 +188,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
188188
Duration::from_millis(500),
189189
Vote::new_committed(2, 1),
190190
);
191-
eng.state.io_state_mut().update_committed(log_id(2, 1, 3));
191+
eng.state.apply_progress_mut().accept(log_id(2, 1, 3));
192192
eng.state.log_ids = LogIdList::new(vec![
193193
//
194194
log_id(2, 1, 2),
@@ -349,7 +349,7 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> {
349349
Vote::new(2, 1).into_committed(),
350350
Some(log_id(100, 1, 100))
351351
)),
352-
eng.state.accepted_io()
352+
eng.state.accepted_log_io()
353353
);
354354

355355
Ok(())

openraft/src/engine/handler/following_handler/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ where C: RaftTypeConfig
7676
let last_log_id = entries.last().map(|ent| ent.log_id());
7777
let last_log_id = std::cmp::max(prev_log_id, last_log_id);
7878

79-
let prev_accepted = self.state.accept_io(IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone()));
79+
let prev_accepted = self.state.accept_log_io(IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone()));
8080

8181
let l = entries.len();
8282
let since = self.state.first_conflicting_index(&entries);
@@ -160,7 +160,7 @@ where C: RaftTypeConfig
160160
/// Commit entries that are already committed by the leader.
161161
#[tracing::instrument(level = "debug", skip_all)]
162162
pub(crate) fn commit_entries(&mut self, leader_committed: Option<LogIdOf<C>>) {
163-
let accepted = self.state.accepted_io().cloned();
163+
let accepted = self.state.accepted_log_io().cloned();
164164
let accepted = accepted.and_then(|x| x.last_log_id().cloned());
165165
let committed = std::cmp::min(accepted.clone(), leader_committed.clone());
166166

@@ -314,8 +314,10 @@ where C: RaftTypeConfig
314314
}
315315

316316
let io_id = IOId::new_log_io(self.leader_vote.clone(), Some(snap_last_log_id.clone()));
317-
self.state.accept_io(io_id.clone());
318-
self.state.io_state.update_committed(snap_last_log_id.clone());
317+
self.state.accept_log_io(io_id.clone());
318+
319+
self.state.apply_progress_mut().accept(snap_last_log_id.clone());
320+
319321
self.update_committed_membership(EffectiveMembership::new_from_stored_membership(
320322
meta.last_membership.clone(),
321323
));

openraft/src/engine/handler/leader_handler/append_entries_test.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ fn eng() -> Engine<UTConfig> {
5252
eng.state.enable_validation(false); // Disable validation for incomplete state
5353

5454
eng.config.id = 1;
55-
eng.state.io_state_mut().update_committed(log_id(0, 1, 0));
55+
eng.state.apply_progress_mut().accept(log_id(0, 1, 0));
5656
eng.state.vote = Leased::new(
5757
UTConfig::<()>::now(),
5858
Duration::from_millis(500),
@@ -79,7 +79,7 @@ fn test_leader_append_entries_empty() -> anyhow::Result<()> {
7979

8080
assert_eq!(
8181
None,
82-
eng.state.accepted_io(),
82+
eng.state.accepted_log_io(),
8383
"no accepted log updated for empty entries"
8484
);
8585
assert_eq!(
@@ -119,7 +119,7 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> {
119119
Vote::new(3, 1).into_committed(),
120120
Some(log_id(3, 1, 6))
121121
)),
122-
eng.state.accepted_io()
122+
eng.state.accepted_log_io()
123123
);
124124
assert_eq!(
125125
&[
@@ -185,7 +185,7 @@ fn test_leader_append_entries_single_node_leader() -> anyhow::Result<()> {
185185
Vote::new(3, 1).into_committed(),
186186
Some(log_id(3, 1, 6))
187187
)),
188-
eng.state.accepted_io()
188+
eng.state.accepted_log_io()
189189
);
190190
assert_eq!(
191191
&[
@@ -243,7 +243,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> {
243243
Vote::new(3, 1).into_committed(),
244244
Some(log_id(3, 1, 6))
245245
)),
246-
eng.state.accepted_io()
246+
eng.state.accepted_log_io()
247247
);
248248
assert_eq!(
249249
&[

openraft/src/engine/handler/leader_handler/get_read_log_id_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ fn eng() -> Engine<UTConfig> {
3232
eng.state.enable_validation(false); // Disable validation for incomplete state
3333

3434
eng.config.id = 1;
35-
eng.state.io_state_mut().update_committed(log_id(0, 1, 0));
35+
eng.state.apply_progress_mut().accept(log_id(0, 1, 0));
3636
eng.state.vote = Leased::new(
3737
UTConfig::<()>::now(),
3838
Duration::from_millis(500),
@@ -54,13 +54,13 @@ fn eng() -> Engine<UTConfig> {
5454
fn test_get_read_log_id() -> anyhow::Result<()> {
5555
let mut eng = eng();
5656

57-
eng.state.io_state_mut().update_committed(log_id(0, 1, 0));
57+
eng.state.apply_progress_mut().accept(log_id(0, 1, 0));
5858
eng.leader.as_mut().unwrap().noop_log_id = Some(log_id(1, 1, 2));
5959

6060
let got = eng.leader_handler()?.get_read_log_id();
6161
assert_eq!(Some(log_id(1, 1, 2)), got);
6262

63-
eng.state.io_state_mut().update_committed(log_id(2, 1, 3));
63+
eng.state.apply_progress_mut().accept(log_id(2, 1, 3));
6464
let got = eng.leader_handler()?.get_read_log_id();
6565
assert_eq!(Some(log_id(2, 1, 3)), got);
6666

openraft/src/engine/handler/leader_handler/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ where C: RaftTypeConfig
7171
}
7272
}
7373

74-
self.state.accept_io(IOId::new_log_io(
74+
self.state.accept_log_io(IOId::new_log_io(
7575
self.leader.committed_vote.clone(),
7676
self.leader.last_log_id().cloned(),
7777
));

0 commit comments

Comments
 (0)