
Raft Consensus

Raft is a distributed consensus protocol which replicates data across a cluster of nodes in a consistent and durable manner. It is described in the very readable Raft paper, and in the more comprehensive Raft thesis.

The toyDB Raft implementation is in the raft module, and is described in the module documentation:

toydb/src/raft/mod.rs

Lines 1 to 240 in d96c6dd

//! Implements the Raft distributed consensus protocol.
//!
//! For details, see Diego Ongaro's original writings:
//!
//! * Raft paper: <https://raft.github.io/raft.pdf>
//! * Raft thesis: <https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf>
//! * Raft website: <https://raft.github.io>
//!
//! Raft is a protocol for a group of computers to agree on some data -- or more
//! simply, to replicate the data. It is broadly equivalent to [Paxos] and
//! [Viewstamped Replication], but more prescriptive and simpler to understand.
//!
//! Raft has three main properties:
//!
//! * Fault tolerance: the system tolerates node failures as long as a majority
//! of nodes (>50%) remain operational.
//!
//! * Linearizability (aka strong consistency): once a client write has been
//! accepted, it is visible to all clients -- they never see outdated data.
//!
//! * Durability: a write is never lost as long as a majority of nodes remain.
//!
//! It does this by electing a single leader node which serves client requests
//! and replicates writes to other nodes. Requests are executed once they have
//! been confirmed by a strict majority of nodes (a quorum). If a leader fails,
//! a new leader is elected. Clusters have 3 or more nodes, since a two-node
//! cluster can't tolerate failures (1/2 is not a majority and would lead to
//! split brain).
//!
//! Notably, Raft does not provide horizontal scalability. Client requests are
//! processed by a single leader node which can quickly become a bottleneck, and
//! each node stores a complete copy of the entire dataset. Systems often handle
//! this by sharding the data into multiple Raft clusters and using a
//! distributed transaction protocol across them, but this is out of scope here.
//!
//! toyDB follows the Raft paper fairly closely, but, like most implementations,
//! takes some minor artistic liberties.
//!
//! [Paxos]: https://www.microsoft.com/en-us/research/uploads/prod/2016/12/paxos-simple-Copy.pdf
//! [Viewstamped Replication]: https://pmg.csail.mit.edu/papers/vr-revisited.pdf
//!
//! RAFT LOG AND STATE MACHINE
//! ==========================
//!
//! Raft maintains an ordered command log containing arbitrary write commands
//! submitted by clients. It attempts to reach consensus on this log by
//! replicating it to a majority of nodes. If successful, the log is considered
//! committed and immutable up to that point.
//!
//! Once committed, the commands in the log are applied sequentially to a local
//! state machine on each node. Raft itself doesn't care what the state machine
//! and commands are -- in toyDB's case it's a SQL database, but it could be
//! anything. Raft simply passes opaque commands to an opaque state machine.
//!
//! Each log entry contains an index, the leader's term (see next section), and
//! the command. For example, a naïve illustration of a toyDB Raft log might be:
//!
//! Index | Term | Command
//! ------|------|------------------------------------------------------
//! 1 | 1 | CREATE TABLE table (id INT PRIMARY KEY, value STRING)
//! 2 | 1 | INSERT INTO table VALUES (1, 'foo')
//! 3 | 2 | UPDATE table SET value = 'bar' WHERE id = 1
//! 4 | 2 | DELETE FROM table WHERE id = 1
//!
//! The state machine must be deterministic, such that all nodes will reach the
//! same identical state. Raft will apply the same commands in the same order
//! independently on all nodes, but if the commands have non-deterministic
//! behavior such as random number generation or communication with external
//! systems it can lead to state divergence causing different results.
//!
//! In toyDB, the Raft log is managed by `Log` and stored locally in a
//! `storage::Engine`. The state machine interface is the `State` trait. See
//! their documentation for more details.
//!
//! LEADER ELECTION
//! ===============
//!
//! Raft nodes can be in one of three states (or roles): follower, candidate,
//! and leader. toyDB models these as `Node::Follower`, `Node::Candidate`, and
//! `Node::Leader`.
//!
//! * Follower: replicates log entries from a leader. May not know a leader yet.
//! * Candidate: campaigns for leadership in an election.
//! * Leader: processes client requests and replicates writes to followers.
//!
//! Raft fundamentally relies on a single guarantee: there can be at most one
//! _valid_ leader at any point in time (old, since-replaced leaders may think
//! they're still a leader, e.g. during a network partition, but they won't be
//! able to do anything). It enforces this through the leader election protocol.
//!
//! Raft divides time into terms, which are monotonically increasing numbers.
//! Higher terms always take priority over lower terms. There can be at most one
//! leader in a term, and it can't change. Nodes keep track of their last known
//! term and store it on disk (see `Log.set_term()`). Messages between nodes are
//! tagged with the current term (as `Envelope.term`) -- old terms are ignored,
//! and future terms cause the node to become a follower in that term.
//!
//! Nodes start out as leaderless followers. If they receive a message from a
//! leader (in a current or future term), they follow it. Otherwise, they wait
//! out the election timeout (a few seconds), become candidates, and hold a
//! leader election.
//!
//! Candidates increase their term by 1 and send `Message::Campaign` to all
//! nodes, requesting their vote. Nodes respond with `Message::CampaignResponse`
//! saying whether a vote was granted. A node can only grant a single vote in a
//! term (stored to disk via `Log.set_term()`), on a first-come first-serve
//! basis, and candidates implicitly vote for themselves.
//!
//! When a candidate receives a majority of votes (>50%), it becomes leader. It
//! sends a `Message::Heartbeat` to all nodes asserting its leadership, and all
//! nodes become followers when they receive it (regardless of who they voted
//! for). Leaders continue to send periodic heartbeats every second or so. The
//! new leader also appends an empty entry to its log in order to safely commit
//! all entries from previous terms (Raft paper section 5.4.2).
//!
//! The new leader must have all committed entries in its log (or the cluster
//! would lose data). To ensure this, there is one additional condition for
//! granting a vote: the candidate's log must be at least as up-to-date as the
//! voter. Because an entry must be replicated to a majority before being
//! committed, this ensures a candidate can only win a majority of votes if its
//! log is up-to-date with all committed entries (Raft paper section 5.4.1).
//!
//! It's possible that no candidate wins an election, for example due to a tie
//! or a majority of nodes being offline. After an election timeout passes,
//! candidates will again bump their term and start a new election, until a
//! leader can be established. To avoid frequent ties, nodes use different,
//! randomized election timeouts (Raft paper section 5.2).
//!
//! Similarly, if a follower doesn't hear from a leader in an election timeout
//! interval, it will become candidate and hold another election. The periodic
//! leader heartbeats prevent this as long as the leader is running and
//! connected. A node that becomes disconnected from the leader will continually
//! hold new elections by itself until the network heals, at which point a new
//! election will be held in its term (disrupting the current leader).
//!
//! REPLICATION AND CONSENSUS
//! =========================
//!
//! When the leader receives a client write request, it appends the command to
//! its local log via `Log.append()`, and sends the log entry to all peers in
//! a `Message::Append`. Followers will attempt to durably append the entry to
//! their local logs and respond with `Message::AppendResponse`.
//!
//! Once a majority have acknowledged the append, the leader commits the entry
//! via `Log.commit()` and applies it to its local state machine, returning the
//! result to the client. It will inform followers about the commit in the next
//! heartbeat as `Message::Heartbeat.commit_index` so they can apply it too, but
//! this is not necessary for correctness (they will commit and apply it if they
//! become leader, otherwise they have no need for applying it).
//!
//! Followers may not be able to append the entry to their log -- they may be
//! unreachable, lag behind the leader, or have divergent logs (see Raft paper
//! section 5.3). The `Append` contains the index and term of the log entry
//! immediately before the replicated entry as `base_index` and `base_term`. An
//! index/term pair uniquely identifies a command, and if two logs have the same
//! index/term pair then the logs are identical up to and including that entry
//! (Raft paper section 5.3). If the base index/term matches the follower's log,
//! it appends the entry (potentially replacing any conflicting entries),
//! otherwise it rejects it.
//!
//! When a follower rejects an append, the leader must try to find a common log
//! entry that exists in both its and the follower's log where it can resume
//! replication. It does this by sending `Message::Append` probes only
//! containing a base index/term but no entries -- it will continue to probe
//! decreasing indexes one by one until the follower responds with a match, then
//! send an `Append` with the missing entries (Raft paper section 5.3). It keeps
//! track of each follower's `match_index` and `next_index` in a `Progress`
//! struct to manage this.
//!
//! In case `Append` messages or responses are lost, leaders also send their
//! `last_index` and term in each `Heartbeat`. If followers don't have that
//! index/term pair in their log, they'll say so in the `HeartbeatResponse` and
//! the leader can begin probing their logs as with append rejections.
//!
//! CLIENT REQUESTS
//! ===============
//!
//! Client requests are submitted as `Message::ClientRequest` to the local Raft
//! node. They are only processed on the leader, but followers will proxy them
//! to the leader (Raft thesis section 6.2). To avoid complications with message
//! replays (Raft thesis section 6.3), requests are not retried internally, and
//! are explicitly aborted with `Error::Abort` on leader/term changes as well as
//! elections.
//!
//! Write requests, `Request::Write`, are appended to the Raft log and
//! replicated. The leader keeps track of the request and its log index in a
//! `Write` struct. Once the command is committed and applied to the local state
//! machine, the leader looks up the write request by its log index and sends
//! the result to the client. Deterministic errors (e.g. foreign key violations)
//! are also returned to the client, but non-deterministic errors (e.g. IO
//! errors) must panic the node to avoid state divergence.
//!
//! Read requests, `Request::Read`, are only executed on the leader and don't
//! need to be replicated via the Raft log. However, to ensure linearizability,
//! the leader has to confirm with a quorum that it's actually still the leader.
//! Otherwise, it's possible that a new leader has been elected elsewhere and
//! executed writes without us knowing about it. It does this by assigning an
//! incrementing sequence number to each read, keeping track of the request in a
//! `Read` struct, and immediately sending a `Read` message with the latest
//! sequence number. Followers respond with the sequence number, and once a
//! quorum have confirmed a sequence number the read is executed and the result
//! returned to the client.
//!
//! IMPLEMENTATION CAVEATS
//! ======================
//!
//! For simplicity, toyDB implements the bare minimum for a functional and
//! correct Raft protocol, and omits several advanced mechanisms that would be
//! needed for a real production system. In particular:
//!
//! * No leases: for linearizability, every read request requires the leader to
//! confirm with followers that it's still the leader. This could be avoided
//! with a leader lease for a predefined time interval (Raft paper section 8,
//! Raft thesis section 6.3).
//!
//! * No cluster membership changes: to add or remove nodes, the entire cluster
//! must be stopped and restarted with the new configuration, otherwise it
//! risks multiple leaders (Raft paper section 6).
//!
//! * No snapshots: new or lagging nodes must be caught up by replicating and
//! replaying the entire log, instead of sending a state machine snapshot
//! (Raft paper section 7).
//!
//! * No log truncation: because snapshots aren't supported, the entire Raft
//! log must be retained forever in order to catch up new/lagging nodes,
//! leading to excessive storage use (Raft paper section 7).
//!
//! * No pre-vote or check-quorum: a node that's partially partitioned (can
//! reach some but not all nodes) can cause persistent unavailability with
//! spurious elections or heartbeats. A node rejoining after a partition can
//! also temporarily disrupt a leader. This requires additional pre-vote and
//! check-quorum protocol extensions (Raft thesis section 4.2.3 and 9.6).
//!
//! * No request retries: client requests will not be retried on leader changes
//! or message loss, and will be aggressively aborted, to ignore problems
//! related to message replay (Raft thesis section 6.3).
//!
//! * No reject hints: if a follower has a divergent log, the leader will probe
//! entries one by one until a match is found. The replication protocol could
//! instead be extended with rejection hints (Raft paper section 5.3).

Raft is fundamentally the same protocol as Paxos and Viewstamped Replication, but an opinionated variant designed to be simple, understandable, and practical. It is widely used in industry, e.g. by CockroachDB, TiDB, etcd, Consul, and many others.

Briefly, Raft elects a leader node which coordinates writes and replicates them to followers. Once a majority (>50%) of nodes have acknowledged a write, it is considered durably committed. It is common for the leader to also serve reads, since it always has the most recent data and is thus strongly consistent.

A cluster must have a majority of nodes (known as a quorum) live and connected to remain available, otherwise it will not commit writes in order to guarantee data consistency and durability. Since there can only be one majority in the cluster, this prevents a split brain scenario where two active leaders can exist concurrently (e.g. during a network partition) and store conflicting values.
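
The majority arithmetic also explains why clusters typically use an odd number of nodes. A minimal sketch (illustrative helper functions, not part of toyDB):

```rust
/// Returns the quorum (majority) size for a cluster of `nodes` nodes.
fn quorum(nodes: u64) -> u64 {
    nodes / 2 + 1
}

/// Returns how many node failures a cluster of `nodes` nodes can tolerate
/// while still forming a quorum.
fn fault_tolerance(nodes: u64) -> u64 {
    nodes - quorum(nodes)
}

fn main() {
    // A 3-node cluster needs 2 nodes for quorum and tolerates 1 failure.
    assert_eq!(quorum(3), 2);
    assert_eq!(fault_tolerance(3), 1);
    // A 5-node cluster needs 3 nodes and tolerates 2 failures.
    assert_eq!(quorum(5), 3);
    assert_eq!(fault_tolerance(5), 2);
    // Adding a 4th node does not improve fault tolerance over 3 nodes.
    assert_eq!(fault_tolerance(4), 1);
}
```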

The Raft leader appends writes to an ordered command log, which is then replicated to followers. Once a majority has replicated the log up to a given entry, that log prefix is committed and then applied to a state machine. This ensures that all nodes will apply the same commands in the same order and eventually reach the same state (assuming the commands are deterministic). Raft itself doesn't care what the state machine and commands are, but in toyDB's case it's SQL tables and rows stored in an MVCC key/value store.
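
Applying the same committed log in order on every node can be sketched with a toy deterministic state machine (hypothetical types for illustration; toyDB's commands are opaque bytes, not (key, value) pairs):

```rust
use std::collections::HashMap;

/// A log entry: (index, term, command). Commands here are simple
/// (key, value) writes for illustration.
type Entry = (u64, u64, (String, String));

/// A trivial deterministic state machine: applies (key, value) writes
/// strictly in log order.
#[derive(Default)]
struct StateMachine {
    applied_index: u64,
    data: HashMap<String, String>,
}

impl StateMachine {
    fn apply(&mut self, entry: &Entry) {
        let (index, _term, (key, value)) = entry;
        assert_eq!(*index, self.applied_index + 1, "entries must apply in order");
        self.data.insert(key.clone(), value.clone());
        self.applied_index = *index;
    }
}

fn main() {
    let log = vec![
        (1, 1, ("a".to_string(), "1".to_string())),
        (2, 1, ("b".to_string(), "2".to_string())),
        (3, 2, ("a".to_string(), "3".to_string())),
    ];
    // Two nodes applying the same log independently reach the same state.
    let (mut n1, mut n2) = (StateMachine::default(), StateMachine::default());
    for entry in &log {
        n1.apply(entry);
        n2.apply(entry);
    }
    assert_eq!(n1.data, n2.data);
    assert_eq!(n1.data.get("a"), Some(&"3".to_string()));
}
```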

This diagram from the Raft paper illustrates how a Raft node receives a command from a client (1), adds it to its log and reaches consensus with other nodes (2), then applies it to its state machine (3) before returning a result to the client (4):

[Figure: Raft node request flow, from the Raft paper]

You may notice that Raft is not very scalable, since all reads and writes go via the leader node, and every node must store the entire dataset. Raft solves replication and availability, but not scalability. Real-world systems typically provide horizontal scalability by splitting a large dataset across many separate Raft clusters (i.e. sharding), but this is out of scope for toyDB.

For simplicity, toyDB implements the bare minimum of Raft, and omits optimizations described in the paper such as state snapshots, log truncation, leader leases, and more. The implementation is in the raft module, and we'll walk through the main components next.

There is a comprehensive set of Raft test scripts in src/raft/testscripts/node, which illustrate the protocol in a wide variety of scenarios.

Log Storage

Raft replicates an ordered command log consisting of raft::Entry:

toydb/src/raft/log.rs

Lines 10 to 28 in 90a6cae

```rust
/// A log index (entry position). Starts at 1. 0 indicates no index.
pub type Index = u64;

/// A log entry containing a state machine command.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Entry {
    /// The entry index.
    ///
    /// We could omit the index in the encoded value, since it's also stored in
    /// the key, but we keep it simple.
    pub index: Index,
    /// The term in which the entry was added.
    pub term: Term,
    /// The state machine command. None (noop) commands are used during leader
    /// election to commit old entries, see section 5.4.2 in the Raft paper.
    pub command: Option<Vec<u8>>,
}

impl encoding::Value for Entry {}
```

index specifies the position in the log, and command contains the binary command to apply to the state machine. The term identifies the leadership term in which the command was proposed: a new term begins when a new leader election is held (we'll get back to this later).

Entries are appended to the log by the leader and replicated to followers. Once acknowledged by a quorum, the log up to that index is committed and will never change. Entries that are not yet committed may be replaced or removed if the leader changes.

The Raft log enforces the following invariants:

toydb/src/raft/log.rs

Lines 80 to 91 in 8782c2b

```rust
/// The Raft log has the following invariants:
///
/// * Entry indexes are contiguous starting at 1 (no index gaps).
/// * Entry terms never decrease from the previous entry.
/// * Entry terms are at or below the current term.
/// * Appended entries are durable (flushed to disk).
/// * Appended entries use the current term.
/// * Committed entries are never changed or removed (no log truncation).
/// * Committed entries will eventually be replicated to all nodes.
/// * Entries with the same index/term contain the same command.
/// * If two logs contain a matching index/term, all previous entries
///   are identical (see section 5.3 in the Raft paper).
```
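
The structural invariants are easy to check mechanically. Here's a sketch with a hypothetical `check_invariants` helper over simplified `(index, term)` entries (toyDB instead enforces these with assertions during appends and splices):

```rust
/// A minimal entry model: (index, term).
type Entry = (u64, u64);

/// Checks the structural Raft log invariants on a sequence of entries:
/// contiguous indexes from 1, non-decreasing terms, terms bounded by the
/// current term. Illustrative only.
fn check_invariants(entries: &[Entry], current_term: u64) -> bool {
    let contiguous = entries.iter().enumerate().all(|(i, e)| e.0 == i as u64 + 1);
    let terms_monotonic = entries.windows(2).all(|w| w[0].1 <= w[1].1);
    let terms_bounded = entries.iter().all(|e| e.1 <= current_term);
    contiguous && terms_monotonic && terms_bounded
}

fn main() {
    // A well-formed log: indexes 1..=3, terms 1,1,2, current term 2.
    assert!(check_invariants(&[(1, 1), (2, 1), (3, 2)], 2));
    // Term regression violates the invariants.
    assert!(!check_invariants(&[(1, 2), (2, 1)], 2));
    // An index gap violates the invariants.
    assert!(!check_invariants(&[(1, 1), (3, 1)], 1));
}
```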

raft::Log implements a Raft log, and stores log entries in a storage::Engine key/value store:

toydb/src/raft/log.rs

Lines 43 to 116 in 8782c2b

```rust
/// The Raft log stores a sequence of arbitrary commands (typically writes) that
/// are replicated across nodes and applied sequentially to the local state
/// machine. Each entry contains an index, command, and the term in which the
/// leader proposed it. Commands may be noops (None), which are added when a
/// leader is elected (see section 5.4.2 in the Raft paper). For example:
///
/// Index | Term | Command
/// ------|------|------------------------------------------------------
///   1   |  1   | None
///   2   |  1   | CREATE TABLE table (id INT PRIMARY KEY, value STRING)
///   3   |  1   | INSERT INTO table VALUES (1, 'foo')
///   4   |  2   | None
///   5   |  2   | UPDATE table SET value = 'bar' WHERE id = 1
///   6   |  2   | DELETE FROM table WHERE id = 1
///
/// Note that this is for illustration only, and the actual toyDB Raft commands
/// are not SQL statements but lower-level write operations.
///
/// A key/value store is used to store the log entries on disk, keyed by index,
/// along with a few other metadata keys (e.g. who we voted for in this term).
///
/// In the steady state, the log is append-only: when a client submits a
/// command, the leader appends it to its own log (via [`Log::append`]) and
/// replicates it to followers who append it to their logs (via
/// [`Log::splice`]). When an index has been replicated to a majority of nodes
/// it becomes committed, making the log immutable up to that index and
/// guaranteeing that all nodes will eventually contain it. Nodes keep track of
/// the commit index via [`Log::commit`] and apply committed commands to the
/// state machine.
///
/// However, uncommitted entries can be replaced or removed. A leader may append
/// entries to its log, but then be unable to reach consensus on them (e.g.
/// because it is unable to communicate with a majority of nodes). If a
/// different leader is elected and writes different commands to those same
/// indexes, then the uncommitted entries will be replaced with entries from the
/// new leader once the old leader (or a follower) discovers it.
///
/// The Raft log has the following invariants:
///
/// * Entry indexes are contiguous starting at 1 (no index gaps).
/// * Entry terms never decrease from the previous entry.
/// * Entry terms are at or below the current term.
/// * Appended entries are durable (flushed to disk).
/// * Appended entries use the current term.
/// * Committed entries are never changed or removed (no log truncation).
/// * Committed entries will eventually be replicated to all nodes.
/// * Entries with the same index/term contain the same command.
/// * If two logs contain a matching index/term, all previous entries
///   are identical (see section 5.3 in the Raft paper).
pub struct Log {
    /// The underlying storage engine. Uses a trait object instead of generics,
    /// to allow runtime selection of the engine and avoid propagating the
    /// generic type parameters throughout Raft.
    pub engine: Box<dyn storage::Engine>,
    /// The current term.
    term: Term,
    /// Our leader vote in the current term, if any.
    vote: Option<NodeID>,
    /// The index of the last stored entry.
    last_index: Index,
    /// The term of the last stored entry.
    last_term: Term,
    /// The index of the last committed entry.
    commit_index: Index,
    /// The term of the last committed entry.
    commit_term: Term,
    /// If true, fsync entries to disk when appended. This is mandated by Raft,
    /// but comes with a hefty performance penalty (especially since we don't
    /// optimize for it by batching entries before fsyncing). Disabling it will
    /// yield much better write performance, but may lose data on crashes, which
    /// in some scenarios can cause log entries to become "uncommitted" and
    /// state machines to diverge.
    fsync: bool,
}
```

It also stores some additional metadata that we'll need later: the current term, vote, and commit index. These are stored as separate keys:

toydb/src/raft/log.rs

Lines 30 to 39 in 8782c2b

```rust
/// A log storage key.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Key {
    /// A log entry, storing the term and command.
    Entry(Index),
    /// Stores the current term and vote (if any).
    TermVote,
    /// Stores the current commit index (if any).
    CommitIndex,
}
```

Individual entries are appended to the log via Log::append, typically when the leader wants to replicate a new write:

toydb/src/raft/log.rs

Lines 190 to 203 in 8782c2b

```rust
/// Appends a command to the log at the current term, and flushes it to
/// disk, returning its index. None implies a noop command, typically after
/// Raft leader changes.
pub fn append(&mut self, command: Option<Vec<u8>>) -> Result<Index> {
    assert!(self.term > 0, "can't append entry in term 0");
    let entry = Entry { index: self.last_index + 1, term: self.term, command };
    self.engine.set(&Key::Entry(entry.index).encode(), entry.encode())?;
    if self.fsync {
        self.engine.flush()?;
    }
    self.last_index = entry.index;
    self.last_term = entry.term;
    Ok(entry.index)
}
```

Entries can also be appended in bulk via Log::splice, typically when entries are replicated to followers. This also allows replacing existing uncommitted entries, e.g. after a leader change:

toydb/src/raft/log.rs

Lines 269 to 343 in 8782c2b

```rust
/// Splices a set of entries into the log and flushes it to disk. New
/// indexes will be appended. Overlapping indexes with the same term must be
/// equal and will be ignored. Overlapping indexes with different terms will
/// truncate the existing log at the first conflict and then splice the new
/// entries.
///
/// The entries must have contiguous indexes and equal/increasing terms, and
/// the first entry must be in the range [1,last_index+1] with a term at or
/// above the previous (base) entry's term and at or below the current term.
pub fn splice(&mut self, entries: Vec<Entry>) -> Result<Index> {
    let (Some(first), Some(last)) = (entries.first(), entries.last()) else {
        return Ok(self.last_index); // empty input is noop
    };
    // Check that the entries are well-formed.
    assert!(first.index > 0 && first.term > 0, "spliced entry has index or term 0");
    assert!(
        entries.windows(2).all(|w| w[0].index + 1 == w[1].index),
        "spliced entries are not contiguous"
    );
    assert!(
        entries.windows(2).all(|w| w[0].term <= w[1].term),
        "spliced entries have term regression",
    );
    // Check that the entries connect to the existing log (if any), and that the
    // term doesn't regress.
    assert!(last.term <= self.term, "splice term {} beyond current {}", last.term, self.term);
    match self.get(first.index - 1)? {
        Some(base) if first.term < base.term => {
            panic!("splice term regression {} → {}", base.term, first.term)
        }
        Some(_) => {}
        None if first.index == 1 => {}
        None => panic!("first index {} must touch existing log", first.index),
    }
    // Skip entries that are already in the log.
    let mut entries = entries.as_slice();
    let mut scan = self.scan(first.index..=last.index);
    while let Some(entry) = scan.next().transpose()? {
        // [0] is ok, because the scan has the same size as entries.
        assert!(entry.index == entries[0].index, "index mismatch at {entry:?}");
        if entry.term != entries[0].term {
            break;
        }
        assert!(entry.command == entries[0].command, "command mismatch at {entry:?}");
        entries = &entries[1..];
    }
    drop(scan);
    // If all entries already exist then we're done.
    let Some(first) = entries.first() else {
        return Ok(self.last_index);
    };
    // Write the entries that weren't already in the log, and remove the
    // tail of the old log if any. We can't write below the commit index,
    // since these entries must be immutable.
    assert!(first.index > self.commit_index, "spliced entries below commit index");
    for entry in entries {
        self.engine.set(&Key::Entry(entry.index).encode(), entry.encode())?;
    }
    for index in last.index + 1..=self.last_index {
        self.engine.delete(&Key::Entry(index).encode())?;
    }
    if self.fsync {
        self.engine.flush()?;
    }
    self.last_index = last.index;
    self.last_term = last.term;
    Ok(self.last_index)
}
```
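
The truncate-on-conflict behavior can be illustrated with a simplified in-memory model (a hypothetical `splice` over a `Vec`, not toyDB's implementation):

```rust
/// A simplified log entry: (term, command).
type Entry = (u64, &'static str);

/// Splices `entries` starting at 1-based `index` into `log`, truncating the
/// existing tail at the first term conflict (simplified model).
fn splice(log: &mut Vec<Entry>, index: usize, entries: Vec<Entry>) {
    assert!(index >= 1 && index <= log.len() + 1, "index must touch existing log");
    for (i, entry) in entries.into_iter().enumerate() {
        let pos = index - 1 + i;
        if pos < log.len() {
            if log[pos].0 != entry.0 {
                log.truncate(pos); // conflict: drop the old tail
                log.push(entry);
            }
            // Same term at the same index implies the same command: keep it.
        } else {
            log.push(entry);
        }
    }
}

fn main() {
    // A stale leader appended an uncommitted entry at index 3 in term 1.
    let mut log = vec![(1, "a"), (1, "b"), (1, "stale")];
    // The new term-2 leader replicates different entries for indexes 3..=4:
    // the conflicting tail is truncated and replaced.
    splice(&mut log, 3, vec![(2, "c"), (2, "d")]);
    assert_eq!(log, vec![(1, "a"), (1, "b"), (2, "c"), (2, "d")]);
}
```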

Committed entries are marked by Log::commit, making them immutable and eligible for state machine application:

toydb/src/raft/log.rs

Lines 205 to 222 in 8782c2b

```rust
/// Commits entries up to and including the given index. The index must
/// exist and be at or after the current commit index.
pub fn commit(&mut self, index: Index) -> Result<Index> {
    let term = match self.get(index)? {
        Some(entry) if entry.index < self.commit_index => {
            panic!("commit index regression {} → {}", self.commit_index, entry.index);
        }
        Some(entry) if entry.index == self.commit_index => return Ok(index),
        Some(entry) => entry.term,
        None => panic!("commit index {index} does not exist"),
    };
    self.engine.set(&Key::CommitIndex.encode(), bincode::serialize(&(index, term)))?;
    // NB: the commit index doesn't need to be fsynced, since the entries
    // are fsynced and the commit index can be recovered from the quorum.
    self.commit_index = index;
    self.commit_term = term;
    Ok(index)
}
```

The log also has methods to read entries, either individually via Log::get or by iterating over a range with Log::scan:

toydb/src/raft/log.rs

Lines 224 to 267 in 8782c2b

```rust
/// Fetches an entry at an index, or None if it does not exist.
pub fn get(&mut self, index: Index) -> Result<Option<Entry>> {
    self.engine.get(&Key::Entry(index).encode())?.map(|v| Entry::decode(&v)).transpose()
}

/// Checks if the log contains an entry with the given index and term.
pub fn has(&mut self, index: Index, term: Term) -> Result<bool> {
    // Fast path: check against last_index. This is the common case when
    // followers process appends or heartbeats.
    if index == 0 || index > self.last_index {
        return Ok(false);
    }
    if (index, term) == (self.last_index, self.last_term) {
        return Ok(true);
    }
    Ok(self.get(index)?.map(|e| e.term == term).unwrap_or(false))
}

/// Returns an iterator over log entries in the given index range.
pub fn scan(&mut self, range: impl RangeBounds<Index>) -> Iterator {
    let from = match range.start_bound() {
        Bound::Excluded(&index) => Bound::Excluded(Key::Entry(index).encode()),
        Bound::Included(&index) => Bound::Included(Key::Entry(index).encode()),
        Bound::Unbounded => Bound::Included(Key::Entry(0).encode()),
    };
    let to = match range.end_bound() {
        Bound::Excluded(&index) => Bound::Excluded(Key::Entry(index).encode()),
        Bound::Included(&index) => Bound::Included(Key::Entry(index).encode()),
        Bound::Unbounded => Bound::Included(Key::Entry(Index::MAX).encode()),
    };
    Iterator::new(self.engine.scan_dyn((from, to)))
}

/// Returns an iterator over entries that are ready to apply, starting after
/// the current applied index up to the commit index.
pub fn scan_apply(&mut self, applied_index: Index) -> Iterator {
    // NB: we don't assert that commit_index >= applied_index, because the
    // local commit index is not flushed to durable storage -- if lost on
    // restart, it can be recovered from the logs of a quorum.
    if applied_index >= self.commit_index {
        return Iterator::new(Box::new(std::iter::empty()));
    }
    self.scan(applied_index + 1..=self.commit_index)
}
```

State Machine Interface

Raft doesn't know or care what the log commands are, nor what the state machine does with them. It simply takes raft::Entry entries from the log and passes them to the state machine.

The Raft state machine is represented by the raft::State trait. Raft will ask about the last applied entry via State::get_applied_index, and feed it newly committed entries via State::apply. It also allows reads via State::read, but we'll get back to that later.

toydb/src/raft/state.rs

Lines 4 to 51 in 8782c2b

```rust
/// A Raft-managed state machine. Raft itself does not care what the state
/// machine is, nor what the commands and results do -- it will simply apply
/// arbitrary binary commands sequentially from the Raft log, returning an
/// arbitrary binary result to the client.
///
/// Since commands are applied identically across all nodes, they must be
/// deterministic and yield the same state and result across all nodes too.
/// Otherwise, the nodes will diverge, such that different nodes will produce
/// different results.
///
/// Write commands (`Request::Write`) are replicated and applied on all nodes
/// via `State::apply`. The state machine must keep track of the last applied
/// index and return it via `State::get_applied_index`. Read commands
/// (`Request::Read`) are only executed on a single node via `State::read` and
/// must not make any state changes.
pub trait State: Send {
    /// Returns the last applied log index from the state machine.
    ///
    /// This must correspond to the current state of the state machine, since it
    /// determines which command to apply next. In particular, a node crash may
    /// result in partial command application or data loss, which must be
    /// handled appropriately.
    fn get_applied_index(&self) -> Index;

    /// Applies a log entry to the state machine, returning a client result.
    /// Errors are considered applied and propagated back to the client.
    ///
    /// This is executed on all nodes, so the result must be deterministic: it
    /// must yield the same state and result on all nodes, even if the command
    /// is reapplied following a node crash.
    ///
    /// Any non-deterministic apply error (e.g. an IO error) must panic and
    /// crash the node -- if it instead returns an error to the client, the
    /// command is considered applied and node states will diverge. The state
    /// machine is responsible for panicking when appropriate.
    ///
    /// The entry may contain a noop command, which is committed by Raft during
    /// leader changes. This still needs to be applied to the state machine to
    /// properly update the applied index, and should return an empty result.
    fn apply(&mut self, entry: Entry) -> Result<Vec<u8>>;

    /// Executes a read command in the state machine, returning a client result.
    /// Errors are also propagated back to the client.
    ///
    /// This is only executed on a single node, so it must not result in any
    /// state changes (i.e. it must not write).
    fn read(&self, command: Vec<u8>) -> Result<Vec<u8>>;
}
```

The state machine does not have to flush its state to durable storage after each transition; on node crashes, the state machine is allowed to regress, and will be caught up by replaying the unapplied log entries. It is also possible to implement a purely in-memory state machine (and in fact, toyDB allows running the state machine with a Memory storage engine).

The state machine must take care to be deterministic: the same commands applied in the same order must result in the same state across all nodes. This means that a command can't e.g. read the current time or generate a random number -- these values must be included in the command. It also means that non-deterministic errors, such as an IO error, must halt command application (in toyDB's case, we just panic and crash the node).

In toyDB, the state machine is an MVCC key/value store containing SQL tables and rows, as we'll see in the SQL Raft replication section.

Node Roles

In Raft, a node has one of three roles:

  • Leader: replicates writes to followers and serves client requests.
  • Follower: replicates writes from a leader.
  • Candidate: campaigns for leadership.

The Raft paper summarizes these roles and transitions in the following diagram (we'll discuss leader election in detail below):

Raft states

In toyDB, a node is represented by the raft::Node enum, with variants for each state:

toydb/src/raft/node.rs

Lines 47 to 66 in 8782c2b

/// A Raft node with a dynamic role. This implements the Raft distributed
/// consensus protocol, see the `raft` module documentation for more info.
///
/// The node is driven synchronously by processing inbound messages via `step()`
/// and by advancing time via `tick()`. These methods consume the node and
/// return a new one with a possibly different role. Outbound messages are sent
/// via the given `tx` channel, and must be delivered to peers or clients.
///
/// This enum is the public interface to the node, with a closed set of roles.
/// It wraps the `RawNode<Role>` types, which implement the actual node logic.
/// The enum allows ergonomic use across role transitions since it can represent
/// all roles, e.g.: `node = node.step()?`.
pub enum Node {
    /// A candidate campaigns for leadership.
    Candidate(RawNode<Candidate>),
    /// A follower replicates entries from a leader.
    Follower(RawNode<Follower>),
    /// A leader processes client requests and replicates entries to followers.
    Leader(RawNode<Leader>),
}

This wraps the raft::RawNode<Role> type which contains the inner node state. It is generic over the role, and uses the typestate pattern to provide methods and transitions depending on the node's current role. This enforces state transitions and invariants at compile time via Rust's type system -- for example, only RawNode<Candidate> has an into_leader() method, since only candidates can transition to leaders (when they win an election).

toydb/src/raft/node.rs

Lines 156 to 177 in 8782c2b

/// A Raft node with role R.
///
/// This implements the typestate pattern, where individual node states (roles)
/// are encoded as RawNode<Role>. See http://cliffle.com/blog/rust-typestate/.
pub struct RawNode<R: Role> {
    /// The node ID. Must be unique in the cluster.
    id: NodeID,
    /// The IDs of the other nodes in the cluster. Does not change while
    /// running. Can change on restart, but all nodes must have the same set of
    /// nodes, otherwise it can result in multiple leaders (split brain).
    peers: HashSet<NodeID>,
    /// The Raft log, which stores client commands to be executed.
    log: Log,
    /// The Raft state machine, which executes client commands from the log.
    state: Box<dyn State>,
    /// Channel for sending outbound messages to other nodes.
    tx: Sender<Envelope>,
    /// Node options.
    opts: Options,
    /// Role-specific state.
    role: R,
}

The RawNode::role field contains role-specific state as structs implementing the Role marker trait:

toydb/src/raft/node.rs

Lines 661 to 680 in 8782c2b

/// A leader serves client requests and replicates the log to followers.
/// If the leader loses leadership, all client requests are aborted.
pub struct Leader {
    /// Follower replication progress.
    progress: HashMap<NodeID, Progress>,
    /// Tracks pending write requests by log index. Added when the write is
    /// proposed and appended to the leader's log, and removed when the command
    /// is applied to the state machine, returning the result to the client.
    writes: HashMap<Index, Write>,
    /// Tracks pending read requests. For linearizability, read requests are
    /// assigned a sequence number and only executed once a quorum of nodes have
    /// confirmed that we're still the leader. Otherwise, an old leader could
    /// serve stale reads if a new leader has been elected elsewhere.
    reads: VecDeque<Read>,
    /// The read sequence number used for the last read. Initialized to 0 in
    /// this term, and incremented for every read command.
    read_seq: ReadSequence,
    /// Number of ticks since last heartbeat.
    since_heartbeat: Ticks,
}

toydb/src/raft/node.rs

Lines 242 to 255 in 8782c2b

/// A follower replicates log entries from a leader and forwards client requests
/// to it. Nodes start as leaderless followers, until they either discover a
/// leader or hold an election.
pub struct Follower {
    /// The leader, or None if we're a leaderless follower.
    leader: Option<NodeID>,
    /// The number of ticks since the last message from the leader.
    leader_seen: Ticks,
    /// The leader_seen timeout before triggering an election.
    election_timeout: Ticks,
    // Local client requests that have been forwarded to the leader. These are
    // aborted on leader/term changes.
    forwarded: HashSet<RequestID>,
}

toydb/src/raft/node.rs

Lines 523 to 531 in 8782c2b

/// A candidate is campaigning to become a leader.
pub struct Candidate {
    /// Votes received (including our own).
    votes: HashSet<NodeID>,
    /// Ticks elapsed since election start.
    election_duration: Ticks,
    /// Election timeout, in ticks.
    election_timeout: Ticks,
}

We'll see what the various fields are used for in the following sections.

Node Interface and Communication

The raft::Node enum has two main methods that drive the node: tick() and step(). These consume the current node and return a new node, possibly with a different role.

tick() advances time by a logical tick. This is used to measure the passage of time, e.g. to trigger election timeouts or periodic leader heartbeats. toyDB uses a tick interval of 100 milliseconds (see raft::TICK_INTERVAL), and will call tick() on the node at this rate.

toydb/src/raft/node.rs

Lines 125 to 132 in 8782c2b

/// Advances time by a tick.
pub fn tick(self) -> Result<Self> {
    match self {
        Self::Candidate(node) => node.tick(),
        Self::Follower(node) => node.tick(),
        Self::Leader(node) => node.tick(),
    }
}

step() processes an inbound message from a different node or client:

toydb/src/raft/node.rs

Lines 107 to 123 in 8782c2b

/// Processes an inbound message.
pub fn step(self, msg: Envelope) -> Result<Self> {
    let peers = match &self {
        Self::Candidate(node) => &node.peers,
        Self::Follower(node) => &node.peers,
        Self::Leader(node) => &node.peers,
    };
    assert_eq!(msg.to, self.id(), "message to other node: {msg:?}");
    assert!(peers.contains(&msg.from) || msg.from == self.id(), "unknown sender: {msg:?}");
    debug!("Stepping {msg:?}");
    match self {
        Self::Candidate(node) => node.step(msg),
        Self::Follower(node) => node.step(msg),
        Self::Leader(node) => node.step(msg),
    }
}

Outbound messages to other nodes are sent via the RawNode::tx channel:

toydb/src/raft/node.rs

Lines 171 to 172 in 8782c2b

/// Channel for sending outbound messages to other nodes.
tx: Sender<Envelope>,

Nodes are identified by a unique node ID, which is given at node startup:

toydb/src/raft/node.rs

Lines 17 to 18 in 90a6cae

/// A node ID, unique within a cluster. Assigned manually when started.
pub type NodeID = u8;

Messages are wrapped in a raft::Envelope specifying the sender and recipient:

/// A message envelope specifying the sender and receiver.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Envelope {
    /// The sender.
    pub from: NodeID,
    /// The sender's current term.
    pub term: Term,
    /// The recipient.
    pub to: NodeID,
    /// The message.
    pub message: Message,
}

The envelope contains a raft::Message, an enum which encodes the Raft message protocol. We won't dwell on the specific message types here, but discuss them individually in the following sections. Raft does not require reliable message delivery, so messages may be dropped or reordered at any time, although toyDB's use of TCP provides stronger delivery guarantees.

toydb/src/raft/message.rs

Lines 25 to 152 in d96c6dd

/// A message sent between Raft nodes. Messages are sent asynchronously (i.e.
/// they are not request/response) and may be dropped or reordered.
///
/// In practice, they are sent across a TCP connection and crossbeam channel
/// which ensures messages are not dropped or reordered as long as the
/// connection remains intact. A message and its response are sent across
/// separate TCP connections (outbound from the respective sender).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Message {
    /// Candidates campaign for leadership by soliciting votes from peers.
    /// Votes will only be granted if the candidate's log is at least as
    /// up-to-date as the voter.
    Campaign {
        /// The index of the candidate's last log entry.
        last_index: Index,
        /// The term of the candidate's last log entry.
        last_term: Term,
    },
    /// Followers may vote for a single candidate per term, but only if the
    /// candidate's log is at least as up-to-date as the follower. Candidates
    /// implicitly vote for themselves.
    CampaignResponse {
        /// If true, the follower granted the candidate a vote. A false response
        /// isn't necessary, but is emitted for clarity.
        vote: bool,
    },
    /// Leaders send periodic heartbeats. This serves several purposes:
    ///
    /// * Inform nodes about the leader, and prevent elections.
    /// * Detect lost appends and reads, as a retry mechanism.
    /// * Advance followers' commit indexes, so they can apply entries.
    ///
    /// The Raft paper does not have a distinct heartbeat message, and instead
    /// uses an empty AppendEntries RPC, but we choose to add one for better
    /// separation of concerns.
    Heartbeat {
        /// The index of the leader's last log entry. The term is the leader's
        /// current term, since it appends a noop entry on election win. The
        /// follower compares this to its own log to determine if it's
        /// up-to-date.
        last_index: Index,
        /// The index of the leader's last committed log entry. Followers use
        /// this to advance their commit index and apply entries. It's only safe
        /// to commit this if the local log matches last_index, such that the
        /// follower's log is identical to the leader at the commit index.
        commit_index: Index,
        /// The leader's latest read sequence number in this term.
        read_seq: ReadSequence,
    },
    /// Followers respond to leader heartbeats if they still consider it leader.
    HeartbeatResponse {
        /// If non-zero, the heartbeat's last_index which was matched in the
        /// follower's log. Otherwise, the follower is either divergent or
        /// lagging behind the leader.
        match_index: Index,
        /// The heartbeat's read sequence number.
        read_seq: ReadSequence,
    },
    /// Leaders replicate log entries to followers by appending to their logs
    /// after the given base entry.
    ///
    /// If the base entry matches the follower's log then their logs are
    /// identical up to it (see section 5.3 in the Raft paper), and the entries
    /// can be appended -- possibly replacing conflicting entries. Otherwise,
    /// the append is rejected and the leader must retry an earlier base index
    /// until a common base is found.
    ///
    /// Empty append messages (no entries) are used to probe follower logs for
    /// a common match index in the case of divergent logs, restarted nodes, or
    /// dropped messages. This is typically done by sending probes with a
    /// decrementing base index until a match is found, at which point the
    /// subsequent entries can be sent.
    Append {
        /// The index of the log entry to append after.
        base_index: Index,
        /// The term of the base entry.
        base_term: Term,
        /// Log entries to append. Must start at base_index + 1.
        entries: Vec<Entry>,
    },
    /// Followers accept or reject appends from the leader depending on whether
    /// the base entry matches their log.
    AppendResponse {
        /// If non-zero, the follower appended entries up to this index. The
        /// entire log up to this index is consistent with the leader. If no
        /// entries were sent (a probe), this will be the matching base index.
        match_index: Index,
        /// If non-zero, the follower rejected an append at this base index
        /// because the base index/term did not match its log. If the follower's
        /// log is shorter than the base index, the reject index will be lowered
        /// to the index after its last local index, to avoid probing each
        /// missing index.
        reject_index: Index,
    },
    /// Leaders need to confirm they are still the leader before serving reads,
    /// to guarantee linearizability in case a different leader has been
    /// established elsewhere. Read requests are served once the sequence number
    /// has been confirmed by a quorum.
    Read { seq: ReadSequence },
    /// Followers confirm leadership at the read sequence numbers.
    ReadResponse { seq: ReadSequence },
    /// A client request. This can be submitted to the leader, or to a follower
    /// which will forward it to its leader. If there is no leader, or the
    /// leader or term changes, the request is aborted with an Error::Abort
    /// ClientResponse and the client must retry.
    ClientRequest {
        /// The request ID. Must be globally unique for the request duration.
        id: RequestID,
        /// The request itself.
        request: Request,
    },
    /// A client response.
    ClientResponse {
        /// The ID of the original ClientRequest.
        id: RequestID,
        /// The response, or an error.
        response: Result<Response>,
    },
}

This is an entirely synchronous and deterministic model -- the same sequence of calls on a given node in a given initial state will always produce the same result. This is very convenient for testing and understandability. We will see in the server section how toyDB drives the node on a separate thread, provides a network transport for messages, and ticks it at regular intervals.

Leader Election and Terms

In the steady state, Raft simply has a leader which replicates writes to followers. But to reach this steady state, we must elect a leader, which is where much of the subtle complexity lies. See the Raft paper for comprehensive details and safety arguments; we'll summarize the process briefly below.

Raft divides time into terms. The term is a monotonically increasing number starting at 1. There can only be one leader in a term (or none if an election fails), and the term can never regress. Replicated commands belong to the specific term under which they were proposed.

toydb/src/raft/node.rs

Lines 20 to 21 in 8782c2b

/// A leader term number. Increases monotonically on elections.
pub type Term = u64;

Let's walk through an election, where we bootstrap a brand new, empty toyDB cluster with 3 nodes.

Nodes are initialized by calling Node::new(). Since this is a new cluster, they are given an empty raft::Log and raft::State, at term 0. Nodes start with role Follower, but without a leader.

toydb/src/raft/node.rs

Lines 68 to 87 in 8782c2b

impl Node {
    /// Creates a new Raft node. It starts as a leaderless follower, waiting to
    /// hear from a leader or otherwise transitioning to candidate and
    /// campaigning for leadership. In the case of a single-node cluster (no
    /// peers), the node immediately transitions to leader when created.
    pub fn new(
        id: NodeID,
        peers: HashSet<NodeID>,
        log: Log,
        state: Box<dyn State>,
        tx: Sender<Envelope>,
        opts: Options,
    ) -> Result<Self> {
        let node = RawNode::new(id, peers, log, state, tx, opts)?;
        // If this is a single-node cluster, become leader immediately.
        if node.cluster_size() == 1 {
            return Ok(node.into_candidate()?.into_leader()?.into());
        }
        Ok(node.into())
    }

toydb/src/raft/node.rs

Lines 266 to 290 in 8782c2b

impl RawNode<Follower> {
    /// Creates a new node as a leaderless follower.
    fn new(
        id: NodeID,
        peers: HashSet<NodeID>,
        log: Log,
        state: Box<dyn State>,
        tx: Sender<Envelope>,
        opts: Options,
    ) -> Result<Self> {
        if peers.contains(&id) {
            return errinput!("node ID {id} can't be in peers");
        }
        let role = Follower::new(None, 0);
        let mut node = Self { id, peers, log, state, tx, opts, role };
        node.role.election_timeout = node.random_election_timeout();
        // Apply any pending entries following restart. State machine writes are
        // not flushed to durable storage, so a tail of writes may be lost if
        // the host crashes or restarts. The Raft log is durable, so we can
        // always recover the state from it. We reapply any missing entries here
        // if that should happen.
        node.maybe_apply()?;
        Ok(node)
    }

Now, nothing happens for a while: the nodes wait to hear from an existing leader, but there is none. Every 100 ms we call tick(), until election_timeout is reached:

toydb/src/raft/node.rs

Lines 489 to 497 in 8782c2b

/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
    // Campaign if we haven't heard from the leader in a while.
    self.role.leader_seen += 1;
    if self.role.leader_seen >= self.role.election_timeout {
        return Ok(self.into_candidate()?.into());
    }
    Ok(self.into())
}

Notice how new() sets election_timeout to a random value (in the range ELECTION_TIMEOUT_RANGE of 10-20 ticks, i.e. 1-2 seconds). If all nodes had the same timeout, they would likely campaign for leadership simultaneously, resulting in an election tie -- Raft uses randomized election timeouts to avoid such ties.

Once a node reaches election_timeout it transitions to role Candidate:

toydb/src/raft/node.rs

Lines 292 to 312 in 8782c2b

/// Transitions the follower into a candidate, by campaigning for
/// leadership in a new term.
fn into_candidate(mut self) -> Result<RawNode<Candidate>> {
    // Abort any forwarded requests. These must be retried with new leader.
    self.abort_forwarded()?;
    // Apply any pending log entries, so that we're caught up if we win.
    self.maybe_apply()?;
    // Become candidate and campaign.
    let election_timeout = self.random_election_timeout();
    let mut node = self.into_role(Candidate::new(election_timeout));
    node.campaign()?;
    let (term, vote) = node.log.get_term_vote();
    assert!(node.role.votes.contains(&node.id), "candidate did not vote for self");
    assert_ne!(term, 0, "candidate can't have term 0");
    assert_eq!(vote, Some(node.id), "log vote does not match self");
    Ok(node)
}

When it becomes a candidate it campaigns for leadership by increasing its term to 1, voting for itself, and sending Message::Campaign to all peers asking for their vote:

toydb/src/raft/node.rs

Lines 647 to 658 in 8782c2b

/// Hold a new election by increasing the term, voting for ourself, and
/// soliciting votes from all peers.
fn campaign(&mut self) -> Result<()> {
    let term = self.term() + 1;
    info!("Starting new election for term {term}");
    self.role = Candidate::new(self.random_election_timeout());
    self.role.votes.insert(self.id); // vote for ourself
    self.log.set_term_vote(term, Some(self.id))?;
    let (last_index, last_term) = self.log.get_last_index();
    self.broadcast(Message::Campaign { last_index, last_term })
}

In Raft, the term can't regress, and a node can only cast a single vote in each term (even across restarts), so both of these are persisted to disk via Log::set_term_vote().

When the two other nodes (still in state Follower) receive the Message::Campaign asking for a vote, they will first increase their term to 1 (since this is a newer term than their local term 0):

toydb/src/raft/node.rs

Lines 347 to 351 in 8782c2b

// Future term: newer leader or candidate, become leaderless follower
// and step the message.
if msg.term > self.term() {
    return self.into_follower(msg.term, None)?.step(msg);
}

They then grant the vote since they haven't yet voted for anyone else in term 1. They persist the vote to disk via Log::set_term_vote() and return a Message::CampaignResponse { vote: true } to the candidate:

toydb/src/raft/node.rs

Lines 424 to 449 in 8782c2b

// A candidate is requesting our vote. We only grant one per term.
Message::Campaign { last_index, last_term } => {
    // Don't vote if we already voted for someone else in this term.
    // We can repeat our vote for the same node though.
    if let (_, Some(vote)) = self.log.get_term_vote() {
        if msg.from != vote {
            self.send(msg.from, Message::CampaignResponse { vote: false })?;
            return Ok(self.into());
        }
    }
    // Only vote if the candidate's log is at least as long as ours.
    // At least one node in any quorum must have all committed
    // entries, and this ensures we'll only elect a leader that has
    // all committed entries. See section 5.4.1 in the Raft paper.
    let (log_index, log_term) = self.log.get_last_index();
    if log_term > last_term || log_term == last_term && log_index > last_index {
        self.send(msg.from, Message::CampaignResponse { vote: false })?;
        return Ok(self.into());
    }
    // Grant the vote.
    info!("Voting for {} in term {} election", msg.from, msg.term);
    self.log.set_term_vote(msg.term, Some(msg.from))?;
    self.send(msg.from, Message::CampaignResponse { vote: true })?;
}

They also check that the candidate's log is at least as up-to-date as their own, which is trivially true in this case since all logs are empty. This is necessary to ensure that a leader has all committed entries (see section 5.4.1 in the Raft paper).
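The up-to-date check can be expressed as a small pure function. This sketch uses a hypothetical name, but implements the same comparison as the snippet above: grant the vote only if the candidate's last entry has a higher term, or the same term and at least the same index:

```rust
type Index = u64;
type Term = u64;

/// Returns true if a candidate log with the given last entry is at least
/// as up-to-date as a voter log with the given last entry (Raft §5.4.1).
fn candidate_log_up_to_date(
    candidate_last: (Index, Term),
    voter_last: (Index, Term),
) -> bool {
    let ((c_index, c_term), (v_index, v_term)) = (candidate_last, voter_last);
    c_term > v_term || (c_term == v_term && c_index >= v_index)
}

fn main() {
    // Empty logs on both sides: trivially up-to-date.
    assert!(candidate_log_up_to_date((0, 0), (0, 0)));
    // A higher last term wins regardless of index.
    assert!(candidate_log_up_to_date((1, 3), (5, 2)));
    // Same term: the candidate's index must be at least as large.
    assert!(candidate_log_up_to_date((5, 2), (5, 2)));
    assert!(!candidate_log_up_to_date((4, 2), (5, 2)));
}
```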

When the candidate receives the Message::CampaignResponse it records the vote from each node. Once it has a quorum (in this case 2 out of 3 votes including its own vote) it becomes leader in term 1:

toydb/src/raft/node.rs

Lines 599 to 606 in 8782c2b

// If we received a vote, record it. If the vote gives us quorum,
// assume leadership.
Message::CampaignResponse { vote: true } => {
    self.role.votes.insert(msg.from);
    if self.role.votes.len() >= self.quorum_size() {
        return Ok(self.into_leader()?.into());
    }
}
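The quorum check relies on the cluster's quorum size, which is a strict majority of nodes. A sketch with a hypothetical free function (toyDB computes this as a method on the node):

```rust
/// The number of votes (or confirmations) needed for a strict majority
/// of a cluster with `cluster_size` nodes: floor(n / 2) + 1.
fn quorum_size(cluster_size: usize) -> usize {
    cluster_size / 2 + 1
}

fn main() {
    assert_eq!(quorum_size(1), 1);
    assert_eq!(quorum_size(3), 2); // tolerates 1 failure
    assert_eq!(quorum_size(4), 3); // still only tolerates 1 failure
    assert_eq!(quorum_size(5), 3); // tolerates 2 failures
}
```

Note how an even-sized cluster needs one more vote without tolerating more failures, which is why clusters typically have an odd number of nodes.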

When it becomes leader, it sends a Message::Heartbeat to all peers to tell them it is now the leader in term 1. It also appends an empty entry to its log and replicates it, but we will ignore this for now (see section 5.4.2 in the Raft paper for why).

toydb/src/raft/node.rs

Lines 563 to 583 in 8782c2b

/// Transitions the candidate to a leader. We won the election.
fn into_leader(self) -> Result<RawNode<Leader>> {
    let (term, vote) = self.log.get_term_vote();
    assert_ne!(term, 0, "leaders can't have term 0");
    assert_eq!(vote, Some(self.id), "leader did not vote for self");
    info!("Won election for term {term}, becoming leader");
    let peers = self.peers.clone();
    let (last_index, _) = self.log.get_last_index();
    let mut node = self.into_role(Leader::new(peers, last_index));
    // Propose an empty command when assuming leadership, to disambiguate
    // previous entries in the log. See section 5.4.2 in the Raft paper.
    // We do this prior to the heartbeat, to avoid a wasted replication
    // roundtrip if the heartbeat response indicates the peer is behind.
    node.propose(None)?;
    node.maybe_commit_and_apply()?;
    node.heartbeat()?;
    Ok(node)
}

When the other nodes receive the heartbeat, they become followers of the new leader in its term:

toydb/src/raft/node.rs

Lines 359 to 384 in 8782c2b

// The leader sends periodic heartbeats. If we don't have a leader
// yet, follow it. If the commit_index advances, apply commands.
Message::Heartbeat { last_index, commit_index, read_seq } => {
    assert!(commit_index <= last_index, "commit_index after last_index");
    // Make sure the heartbeat is from our leader, or follow it.
    match self.role.leader {
        Some(leader) => assert_eq!(msg.from, leader, "multiple leaders in term"),
        None => self = self.into_follower(msg.term, Some(msg.from))?,
    }
    // Check if our log matches the leader's log up to last_index,
    // and respond to the heartbeat. last_index always has the
    // leader's term, since it only appends entries in its term.
    let match_index = if self.log.has(last_index, msg.term)? { last_index } else { 0 };
    self.send(msg.from, Message::HeartbeatResponse { match_index, read_seq })?;
    // Advance the commit index and apply entries. We can only do
    // this if we matched the leader's last_index, which implies
    // that the logs are identical up to match_index. This also
    // implies that the commit_index is present in our log.
    if match_index != 0 && commit_index > self.log.get_commit_index().0 {
        self.log.commit(commit_index)?;
        self.maybe_apply()?;
    }
}

From now on, the leader will send periodic Message::Heartbeat every 4 ticks (see HEARTBEAT_INTERVAL) to assert its leadership:

toydb/src/raft/node.rs

Lines 945 to 953 in 8782c2b

/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
    // Send periodic heartbeats.
    self.role.since_heartbeat += 1;
    if self.role.since_heartbeat >= self.opts.heartbeat_interval {
        self.heartbeat()?;
    }
    Ok(self.into())
}

The followers record when they last received any message from the leader (including heartbeats), and will hold a new election if they haven't heard from the leader in an election timeout (e.g. due to a leader crash or network partition):

toydb/src/raft/node.rs

Lines 353 to 356 in 8782c2b

// Record when we last saw a message from the leader (if any).
if Some(msg.from) == self.role.leader {
    self.role.leader_seen = 0
}

toydb/src/raft/node.rs

Lines 489 to 497 in 8782c2b

/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
    // Campaign if we haven't heard from the leader in a while.
    self.role.leader_seen += 1;
    if self.role.leader_seen >= self.role.election_timeout {
        return Ok(self.into_candidate()?.into());
    }
    Ok(self.into())
}

This entire process is illustrated in the test script election, along with several other test scripts that show e.g. election ties, contested elections, and other scenarios:

# A node campaigns and wins leadership once the election timeout passes. Uses
# ticks directly to also test tick handling.
cluster nodes=3 heartbeat_interval=1 election_timeout=2
---
n1@0 follower() last=0@0 commit=0@0 applied=0
n2@0 follower() last=0@0 commit=0@0 applied=0
n3@0 follower() last=0@0 commit=0@0 applied=0
# Tick all nodes. Then tick n1 again to make it campaign.
tick
---
ok
tick 1
---
n1@0 follower() ⇨ n1@1 candidate
n1@1 → n2 Campaign last=0@0
n1@1 → n3 Campaign last=0@0
# n2,n3 grant n1 their votes.
deliver
---
n2@0 follower() ⇨ n2@1 follower()
n2@1 → n1 CampaignResponse vote=true
n3@0 follower() ⇨ n3@1 follower()
n3@1 → n1 CampaignResponse vote=true
# n1 wins the election and becomes leader.
deliver
---
n1@1 candidate ⇨ n1@1 leader
n1@1 append 1@1 None
n1@1 → n2 Append base=0@0 [1@1]
n1@1 → n3 Append base=0@0 [1@1]
n1@1 → n2 Heartbeat last_index=1 commit_index=0 read_seq=0
n1@1 → n3 Heartbeat last_index=1 commit_index=0 read_seq=0
# All nodes become n1 followers.
stabilize
---
n2@1 follower() ⇨ n2@1 follower(n1)
n2@1 append 1@1 None
n2@1 → n1 AppendResponse match_index=1
n2@1 → n1 HeartbeatResponse match_index=1 read_seq=0
n3@1 follower() ⇨ n3@1 follower(n1)
n3@1 append 1@1 None
n3@1 → n1 AppendResponse match_index=1
n3@1 → n1 HeartbeatResponse match_index=1 read_seq=0
n1@1 commit 1@1
n1@1 apply 1@1 None
# n1's heartbeats are accepted by followers, who commit and apply the entry.
tick 1
---
n1@1 → n2 Heartbeat last_index=1 commit_index=1 read_seq=0
n1@1 → n3 Heartbeat last_index=1 commit_index=1 read_seq=0
stabilize
---
n2@1 commit 1@1
n2@1 apply 1@1 None
n2@1 → n1 HeartbeatResponse match_index=1 read_seq=0
n3@1 commit 1@1
n3@1 apply 1@1 None
n3@1 → n1 HeartbeatResponse match_index=1 read_seq=0
status
---
n1@1 leader last=1@1 commit=1@1 applied=1 progress={2:1→2 3:1→2}
n2@1 follower(n1) last=1@1 commit=1@1 applied=1
n3@1 follower(n1) last=1@1 commit=1@1 applied=1

Client Requests and Forwarding

Once a leader has been elected, we can submit read and write requests to it. This is done by stepping a Message::ClientRequest into the node using the local node ID, with a unique request ID (toyDB uses UUIDv4), and waiting for an outbound response message with the same ID:

toydb/src/raft/message.rs

Lines 134 to 151 in d96c6dd

/// A client request. This can be submitted to the leader, or to a follower
/// which will forward it to its leader. If there is no leader, or the
/// leader or term changes, the request is aborted with an Error::Abort
/// ClientResponse and the client must retry.
ClientRequest {
    /// The request ID. Must be globally unique for the request duration.
    id: RequestID,
    /// The request itself.
    request: Request,
},
/// A client response.
ClientResponse {
    /// The ID of the original ClientRequest.
    id: RequestID,
    /// The response, or an error.
    response: Result<Response>,
},

toydb/src/raft/message.rs

Lines 164 to 188 in d96c6dd

/// A client request, typically passed through to the state machine.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Request {
    /// A state machine read command, executed via `State::read`. This is not
    /// replicated, and only evaluated on the leader.
    Read(Vec<u8>),
    /// A state machine write command, executed via `State::apply`. This is
    /// replicated across all nodes, and must produce a deterministic result.
    Write(Vec<u8>),
    /// Requests Raft cluster status from the leader.
    Status,
}

impl encoding::Value for Request {}

/// A client response. This will be wrapped in a Result for error handling.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Response {
    /// A state machine read result.
    Read(Vec<u8>),
    /// A state machine write result.
    Write(Vec<u8>),
    /// The current Raft leader status.
    Status(Status),
}

The requests and responses themselves are arbitrary binary data which is interpreted by the state machine. For our purposes here, let's pretend the requests are:

  • Request::Write("key=value") → Response::Write("ok")
  • Request::Read("key") → Response::Read("value")

The fundamental difference between read and write requests is that write requests are replicated through Raft and executed on all nodes, while read requests are only executed on the leader without being appended to the log. It would be possible to execute reads on followers too, for load balancing, but such reads would only be eventually consistent and thus violate linearizability, so toyDB only executes reads on the leader.

If a request is submitted to a follower, it will be forwarded to the leader and the response forwarded back to the client (distinguished by the sender/recipient node ID -- a local client always uses the local node ID):

toydb/src/raft/node.rs

Lines 451 to 474 in 8782c2b

// Forward client requests to the leader, or abort them if there is
// none. These will not be retried, the client should use timeouts
// instead. Local client requests use our node ID as the sender.
Message::ClientRequest { id, request: _ } => {
    assert_eq!(msg.from, self.id, "client request from other node");
    if let Some(leader) = self.role.leader {
        debug!("Forwarding request to leader {leader}: {msg:?}");
        self.role.forwarded.insert(id);
        self.send(leader, msg.message)?
    } else {
        let response = Err(Error::Abort);
        self.send(msg.from, Message::ClientResponse { id, response })?
    }
}
// Client responses from the leader are passed on to the client.
Message::ClientResponse { id, response } => {
    assert_eq!(Some(msg.from), self.role.leader, "client response from non-leader");
    if self.role.forwarded.remove(&id) {
        self.send(self.id, Message::ClientResponse { id, response })?;
    }
}

For simplicity, we cancel a request with Error::Abort if it is submitted to a candidate, and similarly if a follower with in-flight requests becomes a candidate or discovers a new leader. We could have held on to these requests and redirected them to the new leader, but we keep it simple and ask the client to retry.

We'll look at the actual read and write request processing next.

Write Replication and Application

When the leader receives a write request, it proposes the command for replication to followers. It keeps track of the in-flight write and its log entry index in writes, such that it can respond to the client with the command result once the entry has been committed and applied.

toydb/src/raft/node.rs

Lines 895 to 904 in 8782c2b

// A client submitted a write request. Propose it, and wait until
// it's replicated and applied to the state machine before returning
// the response to the client.
Message::ClientRequest { id, request: Request::Write(command) } => {
let index = self.propose(Some(command))?;
self.role.writes.insert(index, Write { from: msg.from, id });
if self.cluster_size() == 1 {
self.maybe_commit_and_apply()?;
}
}

To propose the command, the leader appends it to its log and sends a Message::Append to each follower to replicate it to their logs:

toydb/src/raft/node.rs

Lines 966 to 980 in 8782c2b

/// Proposes a command for consensus by appending it to our log and
/// replicating it to peers. If successful, it will eventually be committed
/// and applied to the state machine.
fn propose(&mut self, command: Option<Vec<u8>>) -> Result<Index> {
let index = self.log.append(command)?;
for peer in self.peers.iter().copied().sorted() {
// Eagerly send the entry to the peer if it's in steady state and
// we've sent all previous entries. Otherwise, the peer is lagging
// and we're probing past entries for a match.
if index == self.progress(peer).next_index {
self.maybe_send_append(peer, false)?;
}
}
Ok(index)
}

In steady state, Message::Append just contains the single log entry we appended above:

toydb/src/raft/message.rs

Lines 87 to 108 in d96c6dd

/// Leaders replicate log entries to followers by appending to their logs
/// after the given base entry.
///
/// If the base entry matches the follower's log then their logs are
/// identical up to it (see section 5.3 in the Raft paper), and the entries
/// can be appended -- possibly replacing conflicting entries. Otherwise,
/// the append is rejected and the leader must retry an earlier base index
/// until a common base is found.
///
/// Empty appends messages (no entries) are used to probe follower logs for
/// a common match index in the case of divergent logs, restarted nodes, or
/// dropped messages. This is typically done by sending probes with a
/// decrementing base index until a match is found, at which point the
/// subsequent entries can be sent.
Append {
/// The index of the log entry to append after.
base_index: Index,
/// The term of the base entry.
base_term: Term,
/// Log entries to append. Must start at base_index + 1.
entries: Vec<Entry>,
},

However, sometimes followers may be lagging behind the leader (e.g. after a crash), or their log may have diverged from the leader (e.g. unsuccessful proposals from a stale leader after a network partition). To handle these cases, the leader tracks the replication progress of each follower as raft::Progress:

toydb/src/raft/node.rs

Lines 682 to 698 in 8782c2b

/// Per-follower replication progress (in this term).
struct Progress {
/// The highest index where the follower's log is known to match the leader.
/// Initialized to 0, increases monotonically.
match_index: Index,
/// The next index to replicate to the follower. Initialized to
/// last_index+1, decreased when probing log mismatches. Always in
/// the range [match_index+1, last_index+1].
///
/// Entries not yet sent are in the range [next_index, last_index].
/// Entries not acknowledged are in the range [match_index+1, next_index).
next_index: Index,
/// The last read sequence number confirmed by this follower. To avoid stale
/// reads on leader changes, a read is only served once its sequence number
/// is confirmed by a quorum.
read_seq: ReadSequence,
}

We'll gloss over these cases here (see the Raft paper and the code in raft::Progress and maybe_send_append() for details). In the steady state, where each entry is successfully appended and replicated one at a time, maybe_send_append() will fall through to the bottom and send a single entry:

toydb/src/raft/node.rs

Lines 1068 to 1128 in 8782c2b

/// Sends a batch of pending log entries to a follower, in the
/// [next_index,last_index] range. Limited by max_append_entries.
///
/// If probe is true, we're trying to find a log index on the follower where
/// it matches our log. To do this, we send an empty append probe with
/// base_index of next_index-1. If the follower confirms the base_index
/// matches its log, the actual entries are sent next -- otherwise,
/// next_index is decremented and another probe is sent until a match is
/// found. See section 5.3 in the Raft paper.
///
/// The probe is skipped if the follower is up-to-date (according to
/// match_index and last_index). If the probe's base_index has already been
/// confirmed via match_index, an actual append is sent instead.
fn maybe_send_append(&mut self, peer: NodeID, mut probe: bool) -> Result<()> {
let (last_index, _) = self.log.get_last_index();
let progress = self.role.progress.get_mut(&peer).expect("unknown node");
assert_ne!(progress.next_index, 0, "invalid next_index");
assert!(progress.next_index > progress.match_index, "invalid next_index <= match_index");
assert!(progress.match_index <= last_index, "invalid match_index > last_index");
assert!(progress.next_index <= last_index + 1, "invalid next_index > last_index + 1");
// If the peer is caught up, there's no point sending an append.
if progress.match_index == last_index {
return Ok(());
}
// If a probe was requested, but the base_index has already been
// confirmed via match_index, there is no point in probing. Just send
// the entries instead.
probe = probe && progress.next_index > progress.match_index + 1;
// If there are no pending entries, and this is not a probe, there's
// nothing more to send until we get a response from the follower.
if progress.next_index > last_index && !probe {
return Ok(());
}
// Fetch the base and entries.
let (base_index, base_term) = match progress.next_index {
0 => panic!("next_index=0 for node {peer}"),
1 => (0, 0), // first entry, there is no base
next => self.log.get(next - 1)?.map(|e| (e.index, e.term)).expect("missing base entry"),
};
let entries = match probe {
false => self
.log
.scan(progress.next_index..)
.take(self.opts.max_append_entries)
.try_collect()?,
true => Vec::new(),
};
// Optimistically assume the entries will be accepted by the follower,
// and bump next_index to avoid resending them until a response.
if let Some(last) = entries.last() {
progress.next_index = last.index + 1;
}
debug!("Replicating {} entries with base {base_index} to {peer}", entries.len());
self.send(peer, Message::Append { base_index, base_term, entries })
}

The Message::Append contains the index/term of the entry immediately before the new entry as base_index and base_term. If the follower's log also contains an entry with this index and term then its log is guaranteed to match (be equal to) the leader's log up to this entry (see section 5.3 in the Raft paper). The follower can then append the new log entry and return a Message::AppendResponse confirming that the entry was appended and that its log matches the leader's log up to match_index:

toydb/src/raft/node.rs

Lines 386 to 410 in 8782c2b

// Append log entries from the leader to the local log.
Message::Append { base_index, base_term, entries } => {
if let Some(first) = entries.first() {
assert_eq!(base_index, first.index - 1, "base index mismatch");
}
// Make sure the append is from our leader, or follow it.
match self.role.leader {
Some(leader) => assert_eq!(msg.from, leader, "multiple leaders in term"),
None => self = self.into_follower(msg.term, Some(msg.from))?,
}
// If the base entry matches our log, append the entries.
if base_index == 0 || self.log.has(base_index, base_term)? {
let match_index = entries.last().map(|e| e.index).unwrap_or(base_index);
self.log.splice(entries)?;
self.send(msg.from, Message::AppendResponse { match_index, reject_index: 0 })?;
} else {
// Otherwise, reject the base index. If the local log is
// shorter than the base index, lower the reject index to
// skip all missing entries.
let reject_index = min(base_index, self.log.get_last_index().0 + 1);
self.send(msg.from, Message::AppendResponse { reject_index, match_index: 0 })?;
}
}
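The follower's acceptance above hinges on log.splice(), which appends the entries while replacing any conflicting suffix left over from a stale leader. A condensed sketch (the `Entry` type and this `splice` are simplified stand-ins; unlike the real implementation, this version also rewrites entries that already match):

```rust
/// A log entry, identified by index and term.
#[derive(Clone, Copy, PartialEq, Debug)]
struct Entry {
    index: u64,
    term: u64,
}

/// Splices entries into a 1-indexed log: everything from the first new
/// entry's index onwards is replaced. Simplified -- the real splice
/// keeps entries that already match instead of rewriting them.
fn splice(log: &mut Vec<Entry>, entries: Vec<Entry>) {
    if let Some(first) = entries.first() {
        log.truncate(first.index as usize - 1);
        log.extend(entries);
    }
}

fn main() {
    // The follower has a conflicting entry 2@1 from a stale leader.
    let mut log = vec![Entry { index: 1, term: 1 }, Entry { index: 2, term: 1 }];
    // The current leader (term 2) replicates 2@2 after base 1@1.
    splice(&mut log, vec![Entry { index: 2, term: 2 }]);
    assert_eq!(log, vec![Entry { index: 1, term: 1 }, Entry { index: 2, term: 2 }]);
}
```

This is safe because a conflicting entry can never have been committed: commitment requires a quorum, and the quorum's logs match the leader's.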

When the leader receives the Message::AppendResponse, it will update its view of the follower's match_index.

toydb/src/raft/node.rs

Lines 844 to 858 in 8782c2b

// A follower appended our log entries (or a probe found a match).
// Record its progress and attempt to commit and apply.
Message::AppendResponse { match_index, reject_index: 0 } if match_index > 0 => {
let (last_index, _) = self.log.get_last_index();
assert!(match_index <= last_index, "future match index");
if self.progress(msg.from).advance(match_index) {
self.maybe_commit_and_apply()?;
}
// Eagerly send any further pending entries. This may be a
// successful probe response, or the peer may be lagging and
// we're catching it up one MAX_APPEND_ENTRIES batch at a time.
self.maybe_send_append(msg.from, false)?;
}

toydb/src/raft/node.rs

Lines 701 to 710 in 8782c2b

/// Attempts to advance a follower's match index, returning true if it did.
/// If next_index is below it, it is advanced to the following index.
fn advance(&mut self, match_index: Index) -> bool {
if match_index <= self.match_index {
return false;
}
self.match_index = match_index;
self.next_index = max(self.next_index, match_index + 1);
true
}

Once a quorum of nodes (in our case 2 out of 3 including the leader) have the entry in their log, the leader can commit the entry and apply it to the state machine. It also looks up the in-flight write request from writes and sends the command result back to the client as Message::ClientResponse:

toydb/src/raft/node.rs

Lines 982 to 1032 in 8782c2b

/// Commits new entries that have been replicated to a quorum and applies
/// them to the state machine, returning results to clients.
fn maybe_commit_and_apply(&mut self) -> Result<Index> {
// Determine the new commit index by quorum.
let (last_index, _) = self.log.get_last_index();
let commit_index = self.quorum_value(
self.role.progress.values().map(|p| p.match_index).chain([last_index]).collect(),
);
// If the commit index doesn't advance, do nothing. We don't assert on
// this, since the quorum value may regress e.g. following a restart or
// leader change where followers are initialized with match index 0.
let (old_index, old_term) = self.log.get_commit_index();
if commit_index <= old_index {
return Ok(old_index);
}
// We can only safely commit an entry from our own term (see section
// 5.4.2 in Raft paper).
match self.log.get(commit_index)? {
Some(entry) if entry.term == self.term() => {}
Some(_) => return Ok(old_index),
None => panic!("commit index {commit_index} missing"),
}
// Commit entries.
self.log.commit(commit_index)?;
// Apply entries and respond to clients.
let term = self.term();
let mut iter = self.log.scan_apply(self.state.get_applied_index());
while let Some(entry) = iter.next().transpose()? {
debug!("Applying {entry:?}");
let write = self.role.writes.remove(&entry.index);
let result = self.state.apply(entry);
if let Some(Write { id, from: to }) = write {
let message = Message::ClientResponse { id, response: result.map(Response::Write) };
Self::send_via(&self.tx, Envelope { from: self.id, term, to, message })?;
}
}
drop(iter);
// If the commit term changed, there may be pending reads waiting for us
// to commit and apply an entry from our own term. Execute them.
if old_term != self.term() {
self.maybe_read()?;
}
Ok(commit_index)
}
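The quorum computation itself boils down to taking a median: sort the match indexes of all nodes (including the leader's own last index) in descending order and pick the middle value -- that is the highest index that a strict majority has reached. A standalone sketch of this, modeled on quorum_value() in the code above:

```rust
/// Returns the highest value that a quorum (strict majority) of nodes is
/// at or above: sort descending and take the middle element. Assumes an
/// odd number of nodes, as in a typical Raft cluster.
fn quorum_value(mut values: Vec<u64>) -> u64 {
    values.sort_unstable_by(|a, b| b.cmp(a)); // descending
    values[values.len() / 2]
}

fn main() {
    // 3 nodes: leader at index 3, followers at 3 and 1.
    // A quorum (2 of 3) is at index >= 3, so index 3 can be committed.
    assert_eq!(quorum_value(vec![3, 3, 1]), 3);
    // Followers at 2 and 1: only index 2 is on a quorum (leader + one).
    assert_eq!(quorum_value(vec![3, 2, 1]), 2);
    // 5 nodes: the middle of [5, 4, 3, 2, 1] is 3.
    assert_eq!(quorum_value(vec![5, 4, 3, 2, 1]), 3);
}
```

Everything at or below this index is durable: any future leader must win votes from a majority, which necessarily overlaps with the majority holding the entry.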

The leader will also propagate the new commit index to followers via the next heartbeat, so that they can also apply any pending log entries to their state machine. This isn't strictly necessary, since reads are executed on the leader and nodes have to apply pending entries before becoming leaders, but we do it anyway so that they don't fall too far behind on application.

toydb/src/raft/node.rs

Lines 359 to 384 in 8782c2b

// The leader sends periodic heartbeats. If we don't have a leader
// yet, follow it. If the commit_index advances, apply commands.
Message::Heartbeat { last_index, commit_index, read_seq } => {
assert!(commit_index <= last_index, "commit_index after last_index");
// Make sure the heartbeat is from our leader, or follow it.
match self.role.leader {
Some(leader) => assert_eq!(msg.from, leader, "multiple leaders in term"),
None => self = self.into_follower(msg.term, Some(msg.from))?,
}
// Check if our log matches the leader's log up to last_index,
// and respond to the heartbeat. last_index always has the
// leader's term, since it only appends entries in its term.
let match_index = if self.log.has(last_index, msg.term)? { last_index } else { 0 };
self.send(msg.from, Message::HeartbeatResponse { match_index, read_seq })?;
// Advance the commit index and apply entries. We can only do
// this if we matched the leader's last_index, which implies
// that the logs are identical up to match_index. This also
// implies that the commit_index is present in our log.
if match_index != 0 && commit_index > self.log.get_commit_index().0 {
self.log.commit(commit_index)?;
self.maybe_apply()?;
}
}

This process is illustrated in the test scripts append and heartbeat_commits_follower (along with many other scenarios):

# Can append single entries in steady state.
cluster nodes=3 leader=1
---
n1@1 leader last=1@1 commit=1@1 applied=1 progress={2:1→2 3:1→2}
n2@1 follower(n1) last=1@1 commit=1@1 applied=1
n3@1 follower(n1) last=1@1 commit=1@1 applied=1
# Propose a single write.
put 1 foo=bar
---
c1@1 → n1 ClientRequest id=0x01 write 0x0103666f6f03626172
n1@1 append 2@1 put foo=bar
n1@1 → n2 Append base=1@1 [2@1]
n1@1 → n3 Append base=1@1 [2@1]
status
---
n1@1 leader last=2@1 commit=1@1 applied=1 progress={2:1→3 3:1→3}
n2@1 follower(n1) last=1@1 commit=1@1 applied=1
n3@1 follower(n1) last=1@1 commit=1@1 applied=1
# Append it to both followers.
deliver
---
n2@1 append 2@1 put foo=bar
n2@1 → n1 AppendResponse match_index=2
n3@1 append 2@1 put foo=bar
n3@1 → n1 AppendResponse match_index=2
# The leader commits and applies the write.
stabilize
---
n1@1 commit 2@1
n1@1 apply 2@1 put foo=bar
n1@1 → c1 ClientResponse id=0x01 write 0x0102
c1@1 put foo=bar ⇒ 2
status
---
n1@1 leader last=2@1 commit=2@1 applied=2 progress={2:2→3 3:2→3}
n2@1 follower(n1) last=2@1 commit=1@1 applied=1
n3@1 follower(n1) last=2@1 commit=1@1 applied=1

# A heartbeat will commit and apply an entry on a follower.
cluster nodes=3 leader=1
---
n1@1 leader last=1@1 commit=1@1 applied=1 progress={2:1→2 3:1→2}
n2@1 follower(n1) last=1@1 commit=1@1 applied=1
n3@1 follower(n1) last=1@1 commit=1@1 applied=1
# Write on the leader, which replicates then commits and applies locally.
put 1 foo=bar
stabilize
---
c1@1 → n1 ClientRequest id=0x01 write 0x0103666f6f03626172
n1@1 append 2@1 put foo=bar
n1@1 → n2 Append base=1@1 [2@1]
n1@1 → n3 Append base=1@1 [2@1]
n2@1 append 2@1 put foo=bar
n2@1 → n1 AppendResponse match_index=2
n3@1 append 2@1 put foo=bar
n3@1 → n1 AppendResponse match_index=2
n1@1 commit 2@1
n1@1 apply 2@1 put foo=bar
n1@1 → c1 ClientResponse id=0x01 write 0x0102
c1@1 put foo=bar ⇒ 2
# The write has been replicated, but not yet committed and applied on followers.
status
---
n1@1 leader last=2@1 commit=2@1 applied=2 progress={2:2→3 3:2→3}
n2@1 follower(n1) last=2@1 commit=1@1 applied=1
n3@1 follower(n1) last=2@1 commit=1@1 applied=1
# A heartbeat commits and applies on followers.
heartbeat 1
stabilize
---
n1@1 → n2 Heartbeat last_index=2 commit_index=2 read_seq=0
n1@1 → n3 Heartbeat last_index=2 commit_index=2 read_seq=0
n2@1 commit 2@1
n2@1 apply 2@1 put foo=bar
n2@1 → n1 HeartbeatResponse match_index=2 read_seq=0
n3@1 commit 2@1
n3@1 apply 2@1 put foo=bar
n3@1 → n1 HeartbeatResponse match_index=2 read_seq=0
status
---
n1@1 leader last=2@1 commit=2@1 applied=2 progress={2:2→3 3:2→3}
n2@1 follower(n1) last=2@1 commit=2@1 applied=2
n3@1 follower(n1) last=2@1 commit=2@1 applied=2

Read Processing

For linearizable (aka strongly consistent) reads, we must execute read requests on the leader, as mentioned above. However, this is not sufficient: under e.g. a network partition, a node may think it's still the leader while in fact a different leader has been elected elsewhere (in a later term) and executed writes there.

To handle this case, the leader must confirm that it is still the leader for each read, by sending a Message::Read to its followers containing a read sequence number. Only if a quorum confirms that it is still the leader can the read be executed. This incurs an additional network roundtrip, which is clearly inefficient, so real-world systems often use leader leases instead (see section 6.4.1 of the Raft thesis, not the paper) -- but it's fine for toyDB.

toydb/src/raft/message.rs

Lines 125 to 132 in d96c6dd

/// Leaders need to confirm they are still the leader before serving reads,
/// to guarantee linearizability in case a different leader has been
/// established elsewhere. Read requests are served once the sequence number
/// has been confirmed by a quorum.
Read { seq: ReadSequence },
/// Followers confirm leadership at the read sequence numbers.
ReadResponse { seq: ReadSequence },

When the leader receives the read request, it increments the read sequence number, stores the pending read request in reads, and sends a Message::Read to all followers:

toydb/src/raft/node.rs

Lines 906 to 917 in 8782c2b

// A client submitted a read request. To ensure linearizability, we
// must confirm that we are still the leader by sending the read's
// sequence number and wait for quorum confirmation.
Message::ClientRequest { id, request: Request::Read(command) } => {
self.role.read_seq += 1;
let read = Read { seq: self.role.read_seq, from: msg.from, id, command };
self.role.reads.push_back(read);
self.broadcast(Message::Read { seq: self.role.read_seq })?;
if self.cluster_size() == 1 {
self.maybe_read()?;
}
}

When the followers receive the Message::Read, they simply respond with a Message::ReadResponse if it's from their current leader (messages from stale terms are ignored):

toydb/src/raft/node.rs

Lines 342 to 346 in 8782c2b

// Past term: outdated peer, drop the message.
if msg.term < self.term() {
debug!("Dropping message from past term: {msg:?}");
return Ok(self.into());
}

toydb/src/raft/node.rs

Lines 412 to 422 in 8782c2b

// Confirm the leader's read sequence number.
Message::Read { seq } => {
// Make sure the read is from our leader, or follow it.
match self.role.leader {
Some(leader) => assert_eq!(msg.from, leader, "multiple leaders in term"),
None => self = self.into_follower(msg.term, Some(msg.from))?,
}
// Confirm the read.
self.send(msg.from, Message::ReadResponse { seq })?;
}

When the leader receives the Message::ReadResponse, it records it in the peer's Progress, and executes the read once a quorum has confirmed the sequence number:

toydb/src/raft/node.rs

Lines 860 to 866 in 8782c2b

// A follower confirmed our read sequence number. If it advances,
// try to execute reads.
Message::ReadResponse { seq } => {
if self.progress(msg.from).advance_read(seq) {
self.maybe_read()?;
}
}

toydb/src/raft/node.rs

Lines 1034 to 1066 in 8782c2b

/// Executes any ready read requests, where a quorum have confirmed that
/// we're still the leader for the read sequences.
fn maybe_read(&mut self) -> Result<()> {
if self.role.reads.is_empty() {
return Ok(());
}
// It's only safe to read if we've committed and applied an entry from
// our own term (the leader appends an entry when elected). Otherwise we
// may be behind on application and serve stale reads.
let (commit_index, commit_term) = self.log.get_commit_index();
let applied_index = self.state.get_applied_index();
if commit_term < self.term() || applied_index < commit_index {
return Ok(());
}
// Determine the maximum read sequence confirmed by quorum.
let quorum_read_seq = self.quorum_value(
self.role.progress.values().map(|p| p.read_seq).chain([self.role.read_seq]).collect(),
);
// Execute ready reads. The VecDeque is ordered by read_seq, so we
// can keep pulling until we hit quorum_read_seq.
while let Some(read) = self.role.reads.front() {
if read.seq > quorum_read_seq {
break;
}
let read = self.role.reads.pop_front().unwrap();
let response = self.state.read(read.command).map(Response::Read);
self.send(read.from, Message::ClientResponse { id: read.id, response })?;
}
Ok(())
}
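The read-quorum computation mirrors the commit-quorum computation: take the median of the confirmed read sequence numbers (including the leader's own) and serve every pending read at or below it. A condensed model of this drain loop (names are illustrative, and reads are represented by their sequence numbers only):

```rust
use std::collections::VecDeque;

/// Drains and returns the pending read sequence numbers that a quorum of
/// nodes has confirmed. `confirmed` holds each node's confirmed read
/// sequence, including the leader's own. `reads` is ordered by sequence.
fn ready_reads(reads: &mut VecDeque<u64>, mut confirmed: Vec<u64>) -> Vec<u64> {
    // The quorum-confirmed sequence is the descending median.
    confirmed.sort_unstable_by(|a, b| b.cmp(a));
    let quorum_seq = confirmed[confirmed.len() / 2];
    let mut ready = Vec::new();
    // Pull from the front until we pass the quorum sequence.
    while reads.front().map_or(false, |&seq| seq <= quorum_seq) {
        ready.push(reads.pop_front().unwrap());
    }
    ready
}

fn main() {
    let mut reads: VecDeque<u64> = VecDeque::from([1, 2, 3]);
    // Leader at seq 3, followers confirmed seqs 2 and 1: quorum seq is 2.
    assert_eq!(ready_reads(&mut reads, vec![3, 2, 1]), vec![1, 2]);
    // The read at seq 3 stays queued until another follower confirms it.
    assert_eq!(reads, VecDeque::from([3]));
}
```

If a different leader had been established in a later term, the followers would reject this leader's messages and the quorum sequence would never advance, so the stale leader can never serve an outdated read.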

We now have a Raft-managed state machine with replicated writes and linearizable reads.


MVCC Transactions   |   SQL Engine