Skip to content

Commit d96c6dd

Browse files
committed
raft: minor tweaks
1 parent 3b86030 commit d96c6dd

File tree

7 files changed

+58
-52
lines changed

7 files changed

+58
-52
lines changed

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl Error {
4242
/// machine application needs to know whether a command failure is
4343
/// deterministic on the input command -- if it is, the command can be
4444
/// considered applied and the error returned to the client, but otherwise
45-
/// the state machine must panic to prevent replica divergence.
45+
/// the state machine must panic to prevent node divergence.
4646
pub fn is_deterministic(&self) -> bool {
4747
match self {
4848
// Aborts don't happen during application, only leader changes. But

src/raft/log.rs

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ use crate::encoding::{self, Key as _, Value as _, bincode};
77
use crate::error::Result;
88
use crate::storage;
99

10-
/// A log index. Starts at 1, indicates no index if 0.
10+
/// A log index (entry position). Starts at 1. 0 indicates no index.
1111
pub type Index = u64;
1212

13-
/// A log entry.
13+
/// A log entry containing a state machine command.
1414
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1515
pub struct Entry {
1616
/// The entry index.
17+
///
18+
/// We could omit the index in the encoded value, since it's also stored in
19+
/// the key, but we keep it simple.
1720
pub index: Index,
1821
/// The term in which the entry was added.
1922
pub term: Term,
@@ -106,15 +109,16 @@ pub struct Log {
106109
/// If true, fsync entries to disk when appended. This is mandated by Raft,
107110
/// but comes with a hefty performance penalty (especially since we don't
108111
/// optimize for it by batching entries before fsyncing). Disabling it will
109-
/// yield much better write performance, but may lose data on host crashes,
110-
/// which in some scenarios can cause log entries to become "uncommitted"
111-
/// and state machines diverging.
112+
/// yield much better write performance, but may lose data on crashes, which
113+
/// in some scenarios can cause log entries to become "uncommitted" and
114+
/// state machines to diverge.
112115
fsync: bool,
113116
}
114117

115118
impl Log {
116119
/// Initializes a log using the given storage engine.
117120
pub fn new(mut engine: Box<dyn storage::Engine>) -> Result<Self> {
121+
// Load some initial in-memory state from disk.
118122
let (term, vote) = engine
119123
.get(&Key::TermVote.encode())?
120124
.map(|v| bincode::deserialize(&v))
@@ -136,7 +140,8 @@ impl Log {
136140
.map(|v| bincode::deserialize(&v))
137141
.transpose()?
138142
.unwrap_or((0, 0));
139-
let fsync = true; // fsync by default (NB: BitCask::flush() is a noop in tests)
143+
144+
let fsync = true; // fsync by default
140145
Ok(Self { engine, term, vote, last_index, last_term, commit_index, commit_term, fsync })
141146
}
142147

@@ -168,11 +173,12 @@ impl Log {
168173
assert!(term > 0, "can't set term 0");
169174
assert!(term >= self.term, "term regression {} → {}", self.term, term);
170175
assert!(term > self.term || self.vote.is_none() || vote == self.vote, "can't change vote");
176+
171177
if term == self.term && vote == self.vote {
172178
return Ok(());
173179
}
174180
self.engine.set(&Key::TermVote.encode(), bincode::serialize(&(term, vote)))?;
175-
// Always fsync, even with Log.fsync = false. Term changes are rare, so
181+
// Always fsync, even with Log::fsync = false. Term changes are rare, so
176182
// this doesn't materially affect performance, and double voting could
177183
// lead to multiple leaders and split brain which is really bad.
178184
self.engine.flush()?;
@@ -186,8 +192,6 @@ impl Log {
186192
/// Raft leader changes.
187193
pub fn append(&mut self, command: Option<Vec<u8>>) -> Result<Index> {
188194
assert!(self.term > 0, "can't append entry in term 0");
189-
// We could omit the index in the encoded value, since it's also stored
190-
// in the key, but we keep it simple.
191195
let entry = Entry { index: self.last_index + 1, term: self.term, command };
192196
self.engine.set(&Key::Entry(entry.index).encode(), entry.encode())?;
193197
if self.fsync {
@@ -202,16 +206,16 @@ impl Log {
202206
/// exist and be at or after the current commit index.
203207
pub fn commit(&mut self, index: Index) -> Result<Index> {
204208
let term = match self.get(index)? {
205-
Some(e) if e.index < self.commit_index => {
206-
panic!("commit index regression {} → {}", self.commit_index, e.index);
209+
Some(entry) if entry.index < self.commit_index => {
210+
panic!("commit index regression {} → {}", self.commit_index, entry.index);
207211
}
208-
Some(e) if e.index == self.commit_index => return Ok(index),
209-
Some(e) => e.term,
212+
Some(entry) if entry.index == self.commit_index => return Ok(index),
213+
Some(entry) => entry.term,
210214
None => panic!("commit index {index} does not exist"),
211215
};
212216
self.engine.set(&Key::CommitIndex.encode(), bincode::serialize(&(index, term)))?;
213217
// NB: the commit index doesn't need to be fsynced, since the entries
214-
// are fsynced and the commit index can be recovered from a log quorum.
218+
// are fsynced and the commit index can be recovered from the quorum.
215219
self.commit_index = index;
216220
self.commit_term = term;
217221
Ok(index)
@@ -255,21 +259,22 @@ impl Log {
255259
pub fn scan_apply(&mut self, applied_index: Index) -> Iterator {
256260
// NB: we don't assert that commit_index >= applied_index, because the
257261
// local commit index is not flushed to durable storage -- if lost on
258-
// restart, it can be recovered from a quorum of logs.
262+
// restart, it can be recovered from the logs of a quorum.
259263
if applied_index >= self.commit_index {
260264
return Iterator::new(Box::new(std::iter::empty()));
261265
}
262266
self.scan(applied_index + 1..=self.commit_index)
263267
}
264268

265-
/// Splices a set of entries into the log and flushes it to disk. The
266-
/// entries must have contiguous indexes and equal/increasing terms, and the
267-
/// first entry must be in the range [1,last_index+1] with a term at or
269+
/// Splices a set of entries into the log and flushes it to disk. New
270+
/// indexes will be appended. Overlapping indexes with the same term must be
271+
/// equal and will be ignored. Overlapping indexes with different terms will
272+
/// truncate the existing log at the first conflict and then splice the new
273+
/// entries.
274+
///
275+
/// The entries must have contiguous indexes and equal/increasing terms, and
276+
/// the first entry must be in the range [1,last_index+1] with a term at or
268277
/// above the previous (base) entry's term and at or below the current term.
269-
/// New indexes will be appended. Overlapping indexes with the same term
270-
/// must be equal and will be ignored. Overlapping indexes with different
271-
/// terms will truncate the existing log at the first conflict and then
272-
/// splice the new entries.
273278
pub fn splice(&mut self, entries: Vec<Entry>) -> Result<Index> {
274279
let (Some(first), Some(last)) = (entries.first(), entries.last()) else {
275280
return Ok(self.last_index); // empty input is noop

src/raft/message.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ impl encoding::Value for Envelope {}
2525
/// A message sent between Raft nodes. Messages are sent asynchronously (i.e.
2626
/// they are not request/response) and may be dropped or reordered.
2727
///
28-
/// In practice, they are sent across a TCP connection and crossbeam channels
29-
/// ensuring messages are not dropped or reordered as long as the connection
30-
/// remains intact. A message and its response are sent across separate TCP
31-
/// connections (outbound from their respective senders).
28+
/// In practice, they are sent across a TCP connection and crossbeam channel
29+
/// which ensures messages are not dropped or reordered as long as the
30+
/// connection remains intact. A message and its response are sent across
31+
/// separate TCP connections (outbound from the respective sender).
3232
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
3333
pub enum Message {
3434
/// Candidates campaign for leadership by soliciting votes from peers.
@@ -151,10 +151,11 @@ pub enum Message {
151151
},
152152
}
153153

154-
/// A client request ID. Must be globally unique for the duration of the
155-
/// request. For simplicity, a random UUIDv4 is used -- the node ID and process
156-
/// ID could be incorporated for further collision avoidance, but it does not
157-
/// matter at this scale.
154+
/// A client request ID. Must be globally unique while in flight.
155+
///
156+
/// For simplicity, a random UUIDv4 is used. We could incorporate the
157+
/// node/process/MAC ID and timestamp for better collision avoidance (e.g. via
158+
/// UUIDv6) but it doesn't matter at this scale.
158159
pub type RequestID = uuid::Uuid;
159160

160161
/// A read sequence number, used to confirm leadership for linearizable reads.

src/raft/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@
188188
//! machine, the leader looks up the write request by its log index and sends
189189
//! the result to the client. Deterministic errors (e.g. foreign key violations)
190190
//! are also returned to the client, but non-deterministic errors (e.g. IO
191-
//! errors) must panic the node to avoid replica state divergence.
191+
//! errors) must panic the node to avoid state divergence.
192192
//!
193193
//! Read requests, `Request::Read`, are only executed on the leader and don't
194194
//! need to be replicated via the Raft log. However, to ensure linearizability,
@@ -252,15 +252,15 @@ pub use message::{Envelope, Message, ReadSequence, Request, RequestID, Response,
252252
pub use node::{Node, NodeID, Options, Term, Ticks};
253253
pub use state::State;
254254

255-
/// The interval between Raft ticks, the unit of time.
255+
/// The interval between Raft ticks, the Raft unit of time.
256256
pub const TICK_INTERVAL: Duration = Duration::from_millis(100);
257257

258258
/// The interval between leader heartbeats in ticks.
259259
const HEARTBEAT_INTERVAL: Ticks = 4;
260260

261-
/// The default election timeout range in ticks. This is randomized in this
262-
/// interval, to avoid election ties.
261+
/// The default election timeout range in ticks. To avoid election ties, a node
262+
/// chooses a random value in this interval.
263263
const ELECTION_TIMEOUT_RANGE: Range<Ticks> = 10..20;
264264

265-
/// The maximum number of entries to send in a single append message.
265+
/// The maximum number of log entries to send in a single append message.
266266
const MAX_APPEND_ENTRIES: usize = 100;

src/raft/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ impl RawNode<Follower> {
522522
debug!("Applying {entry:?}");
523523
// Throw away the result, since only the leader responds to clients.
524524
// This includes errors -- any non-deterministic errors (e.g. IO
525-
// errors) must panic instead to avoid replica divergence.
525+
// errors) must panic instead to avoid node divergence.
526526
_ = self.state.apply(entry);
527527
}
528528
Ok(())

src/raft/state.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,18 @@ use crate::error::Result;
66
/// arbitrary binary commands sequentially from the Raft log, returning an
77
/// arbitrary binary result to the client.
88
///
9-
/// Since commands are applied identically across all replicas, they must be
10-
/// deterministic and yield the same state and result across all replicas too.
11-
/// Otherwise, the replicas will diverge, and different replicas will produce
9+
/// Since commands are applied identically across all nodes, they must be
10+
/// deterministic and yield the same state and result across all nodes too.
11+
/// Otherwise, the nodes will diverge, such that different nodes will produce
1212
/// different results.
1313
///
14-
/// Write commands (`Request::Write`) are replicated and applied on all replicas
14+
/// Write commands (`Request::Write`) are replicated and applied on all nodes
1515
/// via `State::apply`. The state machine must keep track of the last applied
1616
/// index and return it via `State::get_applied_index`. Read commands
17-
/// (`Request::Read`) are only executed on a single replica via `State::read`
18-
/// and must not make any state changes.
17+
/// (`Request::Read`) are only executed on a single node via `State::read` and
18+
/// must not make any state changes.
1919
pub trait State: Send {
20-
/// Returns the last applied index from the state machine.
20+
/// Returns the last applied log index from the state machine.
2121
///
2222
/// This must correspond to the current state of the state machine, since it
2323
/// determines which command to apply next. In particular, a node crash may
@@ -28,13 +28,13 @@ pub trait State: Send {
2828
/// Applies a log entry to the state machine, returning a client result.
2929
/// Errors are considered applied and propagated back to the client.
3030
///
31-
/// This is executed on all replicas, so the result must be deterministic:
32-
/// it must yield the same state and result on all replicas, even if the
33-
/// command is reapplied following a node crash.
31+
/// This is executed on all nodes, so the result must be deterministic: it
32+
/// must yield the same state and result on all nodes, even if the command
33+
/// is reapplied following a node crash.
3434
///
3535
/// Any non-deterministic apply error (e.g. an IO error) must panic and
3636
/// crash the node -- if it instead returns an error to the client, the
37-
/// command is considered applied and replica states will diverge. The state
37+
/// command is considered applied and node states will diverge. The state
3838
/// machine is responsible for panicing when appropriate.
3939
///
4040
/// The entry may contain a noop command, which is committed by Raft during
@@ -45,8 +45,8 @@ pub trait State: Send {
4545
/// Executes a read command in the state machine, returning a client result.
4646
/// Errors are also propagated back to the client.
4747
///
48-
/// This is only executed on a single replica/node, so it must not result in
49-
/// any state changes (i.e. it must not write).
48+
/// This is only executed on a single node, so it must not result in any
49+
/// state changes (i.e. it must not write).
5050
fn read(&self, command: Vec<u8>) -> Result<Vec<u8>>;
5151
}
5252

src/sql/engine/raft.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,8 @@ impl<E: storage::Engine> raft::State for State<E> {
282282

283283
let result = match &entry.command {
284284
Some(command) => match self.write(Write::decode(command)?) {
285-
// Panic on non-deterministic apply failures, to prevent replica
286-
// divergence. See [`raft::State`] docs for details.
285+
// Panic on non-deterministic apply failures, to prevent node
286+
// state divergence. See [`raft::State`] docs for details.
287287
Err(e) if !e.is_deterministic() => panic!("non-deterministic apply failure: {e}"),
288288
result => result,
289289
},

0 commit comments

Comments
 (0)