Now that we've gone over the individual components, we'll tie them all together in the toyDB
server toydb::Server, located in the server module.
The server wraps an inner Raft node raft::Node, which manages the SQL state machine. The server is
responsible for routing network traffic between the Raft node, its Raft peers, and SQL clients.
|
/// A toyDB server. Routes messages to/from an inner Raft node. |
|
/// |
|
/// * Listens for inbound SQL connections from clients via TCP and passes |
|
/// requests to the local Raft node. |
|
/// |
|
/// * Listens for inbound Raft connections from other toyDB nodes via TCP and |
|
/// passes messages to the local Raft node. |
|
/// |
|
/// * Connects to other toyDB nodes via TCP and sends outbound Raft messages |
|
/// from the local Raft node. |
|
pub struct Server { |
|
/// The inner Raft node, which manages the SQL state machine. |
|
node: raft::Node, |
|
/// Outbound messages from the Raft node. Drained by raft_route(), which |
/// forwards them to peers or to waiting local clients. |
|
node_rx: Receiver<raft::Envelope>, |
|
/// Raft peer IDs and addresses. serve() spawns one outbound sender |
/// thread per entry. |
|
peers: HashMap<raft::NodeID, String>, |
|
} |
For the network protocol, the server uses the Bincode encoding that we've discussed in the encoding
section, sent over a TCP connection. There's no need for any further framing, since Bincode knows
how many bytes to expect for each message depending on the type it's decoding into.
The server does not use async Rust (e.g. Tokio), instead opting for regular OS threads. Async Rust
can significantly complicate the code, which would obscure the main concepts, and any efficiency
gains would be entirely irrelevant for toyDB.
Internally in the server, messages are passed around between threads using
Crossbeam channels.
The main server loop Server::serve() listens for inbound TCP connections on the given Raft and SQL
addresses (by default port 9705 for Raft peers and 9605 for SQL clients), and spawns threads to
process them. We'll look at Raft and SQL services separately.
|
/// Serves Raft and SQL requests indefinitely. Consumes the server. |
/// |
/// Binds TCP listeners on raft_addr and sql_addr, returning an error if |
/// either bind fails, then spawns the accept, peer sender, and routing |
/// threads inside a thread scope. |
|
pub fn serve(self, raft_addr: impl ToSocketAddrs, sql_addr: impl ToSocketAddrs) -> Result<()> { |
|
let raft_listener = TcpListener::bind(raft_addr)?; |
|
let sql_listener = TcpListener::bind(sql_addr)?; |
|
info!( |
|
"Listening on {} (SQL) and {} (Raft)", |
|
sql_listener.local_addr()?, |
|
raft_listener.local_addr()? |
|
); |
|
|
|
// The scope blocks until every thread spawned inside it has exited. |
std::thread::scope(move |s| { |
|
let id = self.node.id(); |
|
// Client requests from the SQL engine to raft_route(). |
let (raft_request_tx, raft_request_rx) = crossbeam::channel::unbounded(); |
|
// Inbound peer messages from raft_accept() to raft_route(). |
let (raft_step_tx, raft_step_rx) = crossbeam::channel::unbounded(); |
|
|
|
// Serve inbound Raft connections. |
|
s.spawn(move || Self::raft_accept(raft_listener, raft_step_tx)); |
|
|
|
// Establish outbound Raft connections to peers. |
|
let mut raft_peers_tx = HashMap::new(); |
|
for (id, addr) in self.peers.into_iter() { |
|
// Bounded: raft_route() uses try_send() and drops messages when the |
// channel is full. |
let (raft_peer_tx, raft_peer_rx) = |
|
crossbeam::channel::bounded(RAFT_PEER_CHANNEL_CAPACITY); |
|
raft_peers_tx.insert(id, raft_peer_tx); |
|
s.spawn(move || Self::raft_send_peer(addr, raft_peer_rx)); |
|
} |
|
|
|
// Route Raft messages between the local node, peers, and clients. |
|
s.spawn(move || { |
|
Self::raft_route( |
|
self.node, |
|
self.node_rx, |
|
raft_step_rx, |
|
raft_peers_tx, |
|
raft_request_rx, |
|
) |
|
}); |
|
|
|
// Serve inbound SQL connections. |
|
let sql_engine = sql::engine::Raft::new(raft_request_tx); |
|
s.spawn(move || Self::sql_accept(id, sql_listener, sql_engine)); |
|
}); |
|
|
|
Ok(()) |
|
} |
The heart of the server is the Raft processing thread Server::raft_route(). This is responsible
for periodically ticking the Raft node via raft::Node::tick(), stepping inbound messages from
Raft peers into the node via raft::Node::step(), and sending outbound messages to peers.
It also takes inbound Raft client requests from the sql::engine::Raft SQL engine, steps them
into the Raft node via raft::Node::step(), and passes responses back to the appropriate client
as the node emits them.
|
/// Routes Raft messages: |
|
/// |
|
/// * node_rx: outbound messages from the local Raft node. Routed to peers |
|
/// via TCP, or to local clients via a response channel. |
|
/// |
|
/// * request_rx: inbound requests from local SQL clients. Stepped into |
|
/// the local Raft node as ClientRequest messages. Responses are returned |
|
/// via the provided response channel. |
|
/// |
|
/// * peers_rx: inbound messages from remote Raft peers. Stepped into the |
|
/// local Raft node. |
|
/// |
|
/// * peers_tx: outbound per-peer channels, drained by the raft_send_peer() |
|
/// threads. Messages from the local node's node_rx are sent here. |
|
/// |
|
/// Panics on any errors, since the Raft node can't recover from failed |
|
/// state transitions. Also panics if any channel disconnects or if a |
/// message is addressed to an unknown peer. A full peer channel is not an |
/// error: the message is logged and dropped. |
|
fn raft_route( |
|
mut node: raft::Node, |
|
node_rx: Receiver<raft::Envelope>, |
|
peers_rx: Receiver<raft::Envelope>, |
|
mut peers_tx: HashMap<raft::NodeID, Sender<raft::Envelope>>, |
|
request_rx: Receiver<(raft::Request, Sender<Result<raft::Response>>)>, |
|
) { |
|
// Track response channels by request ID. The Raft node will emit |
|
// ClientResponse messages that we forward to the response channel. |
|
let mut response_txs = HashMap::<raft::RequestID, Sender<Result<raft::Response>>>::new(); |
|
|
|
let ticker = crossbeam::channel::tick(raft::TICK_INTERVAL); |
|
loop { |
|
crossbeam::select! { |
|
// Periodically tick the node. |
|
recv(ticker) -> _ => node = node.tick().expect("tick failed"), |
|
|
|
// Step messages from peers into the node. |
|
recv(peers_rx) -> result => { |
|
let msg = result.expect("peers_rx disconnected"); |
|
node = node.step(msg).expect("step failed"); |
|
}, |
|
|
|
// Send outbound messages from the node to the appropriate peer. |
|
// If we receive a client response addressed to the local node, |
|
// forward it to the waiting client via the response channel. |
|
recv(node_rx) -> result => { |
|
let msg = result.expect("node_rx disconnected"); |
|
// Only ClientResponse messages addressed to the local node are |
// handled here; anything else falls through to the peer lookup |
// below. |
if msg.to == node.id() { |
|
if let raft::Message::ClientResponse{ id, response } = msg.message { |
|
// A response without a registered channel is silently ignored. |
if let Some(response_tx) = response_txs.remove(&id) { |
|
response_tx.send(response).expect("response_tx disconnected"); |
|
} |
|
continue |
|
} |
|
} |
|
let peer_tx = peers_tx.get_mut(&msg.to).expect("unknown peer"); |
|
// try_send() doesn't block the routing loop on a slow peer: the |
// message is dropped if the peer's bounded channel is full. |
match peer_tx.try_send(msg) { |
|
Ok(()) => {}, |
|
Err(crossbeam::channel::TrySendError::Full(_)) => { |
|
error!("Raft peer channel full, dropping message"); |
|
}, |
|
Err(crossbeam::channel::TrySendError::Disconnected(_)) => { |
|
panic!("peer_tx disconnected"); |
|
}, |
|
}; |
|
} |
|
|
|
// Track inbound client requests and step them into the node. |
|
recv(request_rx) -> result => { |
|
let (request, response_tx) = result.expect("request_rx disconnected"); |
|
let id = Uuid::new_v4(); |
|
let msg = raft::Envelope{ |
|
from: node.id(), |
|
to: node.id(), |
|
term: node.term(), |
|
message: raft::Message::ClientRequest{id, request}, |
|
}; |
|
// The response channel is registered after stepping. Responses |
// are only picked up from node_rx by this same loop, i.e. after |
// the insert below has run. |
node = node.step(msg).expect("step failed"); |
|
response_txs.insert(id, response_tx); |
|
} |
|
} |
|
} |
|
} |
When the node starts up, it spawns a Server::raft_send_peer() thread for each Raft peer to send
outbound messages to them.
|
// Establish outbound Raft connections to peers. |
|
let mut raft_peers_tx = HashMap::new(); |
|
for (id, addr) in self.peers.into_iter() { |
|
let (raft_peer_tx, raft_peer_rx) = |
|
crossbeam::channel::bounded(RAFT_PEER_CHANNEL_CAPACITY); |
|
raft_peers_tx.insert(id, raft_peer_tx); |
|
s.spawn(move || Self::raft_send_peer(addr, raft_peer_rx)); |
|
} |
These threads continually attempt to connect to the peer via TCP, and then read any outbound
raft::Envelope(raft::Message) messages from Server::raft_route() via a channel and write the
messages into the TCP connection using Bincode:
|
/// Sends outbound messages to a peer via TCP. Retries indefinitely if the |
|
/// connection fails. |
/// |
/// * addr: the peer's TCP address. |
/// * raft_node_rx: outbound messages for this peer, sent by raft_route(). |
/// |
/// A message that fails to send is logged and not retried; the connection |
/// is then re-established before sending further messages. |
|
fn raft_send_peer(addr: String, raft_node_rx: Receiver<raft::Envelope>) { |
|
loop { |
|
// Connect, sleeping for RAFT_PEER_RETRY_INTERVAL between failed attempts. |
let mut socket = match TcpStream::connect(&addr) { |
|
Ok(socket) => BufWriter::new(socket), |
|
Err(err) => { |
|
error!("Failed connecting to Raft peer {addr}: {err}"); |
|
std::thread::sleep(RAFT_PEER_RETRY_INTERVAL); |
|
continue; |
|
} |
|
}; |
|
// Send messages until a write fails or the channel recv() errors. |
while let Ok(message) = raft_node_rx.recv() { |
|
// Flush after every message so it isn't left in the BufWriter. |
if let Err(err) = message.encode_into(&mut socket).and_then(|_| Ok(socket.flush()?)) |
|
{ |
|
error!("Failed sending to Raft peer {addr}: {err}"); |
|
break; |
|
} |
|
} |
|
debug!("Disconnected from Raft peer {addr}"); |
|
} |
|
} |
The server also continually listens for inbound Raft TCP connections from peers in
Server::raft_accept():
|
/// Accepts new inbound Raft connections from peers and spawns threads |
|
/// routing inbound messages to the local Raft node. |
/// |
/// * listener: the bound Raft TCP listener. |
/// * raft_step_tx: channel to raft_route(); each connection thread gets |
///   its own clone. |
/// |
/// Loops forever. Accept errors are logged and the loop continues. |
|
fn raft_accept(listener: TcpListener, raft_step_tx: Sender<raft::Envelope>) { |
|
std::thread::scope(|s| { |
|
loop { |
|
let (socket, peer) = match listener.accept() { |
|
Ok((socket, peer)) => (socket, peer), |
|
Err(err) => { |
|
error!("Raft peer accept failed: {err}"); |
|
continue; |
|
} |
|
}; |
|
let raft_step_tx = raft_step_tx.clone(); |
|
// One receiver thread per inbound connection. Its result is only logged. |
s.spawn(move || { |
|
debug!("Raft peer {peer} connected"); |
|
match Self::raft_receive_peer(socket, raft_step_tx) { |
|
Ok(()) => debug!("Raft peer {peer} disconnected"), |
|
Err(err) => error!("Raft peer {peer} error: {err}"), |
|
} |
|
}); |
|
} |
|
}); |
|
} |
When an inbound connection is accepted, a Server::raft_receive_peer() thread is spawned that reads
Bincode-encoded raft::Envelope(raft::Message) messages from the TCP connection and sends them to
Server::raft_route() via a channel.
|
/// Receives inbound messages from a peer via TCP, and queues them for |
|
/// stepping into the Raft node. |
/// |
/// Returns Ok(()) once maybe_decode_from() yields None (presumably when the |
/// peer closes the connection -- confirm against the encoding module), or |
/// an error if decoding fails or raft_step_tx can't be sent to. |
|
fn raft_receive_peer(socket: TcpStream, raft_step_tx: Sender<raft::Envelope>) -> Result<()> { |
|
let mut socket = BufReader::new(socket); |
|
while let Some(message) = raft::Envelope::maybe_decode_from(&mut socket)? { |
|
raft_step_tx.send(message)?; |
|
} |
|
Ok(()) |
|
} |
The Raft cluster is now fully connected, and the nodes can all talk to each other.
Next, let's serve some SQL clients. The SQL service uses the enums toydb::Request and
toydb::Response as a client protocol, again Bincode-encoded over TCP.
The primary request type is Request::Execute which executes a SQL statement against a
sql::execution::Session and returns a sql::execution::StatementResult, as we've seen previously.
|
/// A SQL client request. Sent by clients to the server's SQL listener and |
/// handled by Server::sql_session(). |
|
#[derive(Debug, Serialize, Deserialize)] |
|
pub enum Request { |
|
/// Executes a SQL statement. Answered with Response::Execute. |
|
Execute(String), |
|
/// Fetches the given table schema. Answered with Response::GetTable. |
|
GetTable(String), |
|
/// Lists all tables. Answered with Response::ListTables. |
|
ListTables, |
|
/// Returns server status. Answered with Response::Status. |
|
Status, |
|
} |
|
|
|
// Marks Request as an encodable value (encode_into/maybe_decode_from). |
impl encoding::Value for Request {} |
|
|
|
/// A SQL server response. Each variant except Row answers the Request |
/// variant of the same name in Server::sql_session(). |
|
#[derive(Debug, Serialize, Deserialize)] |
|
pub enum Response { |
|
/// The result of executing a SQL statement. |
Execute(StatementResult), |
|
/// A result row, if any. |
/// NOTE(review): not produced by sql_session() in this excerpt; confirm |
/// where this variant is used. |
Row(Option<Row>), |
|
/// The schema of the requested table. |
GetTable(Table), |
|
/// The names of all tables. |
ListTables(Vec<String>), |
|
/// The server status. |
Status(Status), |
|
} |
|
|
|
// Marks Response as an encodable value (encode_into/maybe_decode_from). |
impl encoding::Value for Response {} |
The server sets up a sql::engine::Raft SQL engine, with a Crossbeam channel that's used to send
raft::Request Raft client requests to Server::raft_route() and onwards to the local
raft::Node. It then spawns a Server::sql_accept() thread to listen for inbound SQL client
connections:
|
// Serve inbound SQL connections. |
|
let sql_engine = sql::engine::Raft::new(raft_request_tx); |
|
s.spawn(move || Self::sql_accept(id, sql_listener, sql_engine)); |
When a SQL client connection is accepted, a new client session sql::execution::Session is set up
for the client, and we spawn a Server::sql_session() thread to serve the connection:
|
/// Accepts new SQL client connections and spawns session threads for them. |
/// |
/// * id: the local node ID, passed on to each session. |
/// * listener: the bound SQL TCP listener. |
/// * sql_engine: the Raft-backed SQL engine that sessions are created from. |
/// |
/// Loops forever. Accept errors are logged and the loop continues. |
|
fn sql_accept(id: raft::NodeID, listener: TcpListener, sql_engine: sql::engine::Raft) { |
|
std::thread::scope(|s| { |
|
loop { |
|
let (socket, peer) = match listener.accept() { |
|
Ok((socket, peer)) => (socket, peer), |
|
Err(err) => { |
|
error!("Client accept failed: {err}"); |
|
continue; |
|
} |
|
}; |
|
// Each client connection gets its own SQL session and thread. |
let session = sql_engine.session(); |
|
s.spawn(move || { |
|
debug!("Client {peer} connected"); |
|
match Self::sql_session(id, socket, session) { |
|
Ok(()) => debug!("Client {peer} disconnected"), |
|
Err(err) => error!("Client {peer} error: {err}"), |
|
} |
|
}); |
|
} |
|
}) |
|
} |
These session threads continually read Request messages from the client, execute them against the
SQL session (and ultimately the Raft node), and send a Response back to the client.
|
/// Processes a client SQL session, executing SQL statements against the |
|
/// Raft node. |
/// |
/// Returns Ok(()) once maybe_decode_from() yields None, or an error if |
/// decoding a request, encoding a response, or flushing the socket fails. |
/// Errors from executing a request don't end the session: the Result is |
/// encoded and sent to the client as the response. |
|
fn sql_session( |
|
id: raft::NodeID, |
|
socket: TcpStream, |
|
mut session: sql::execution::Session<sql::engine::Raft>, |
|
) -> Result<()> { |
|
// Clone the socket handle to get separate buffered read and write halves. |
let mut reader = BufReader::new(socket.try_clone()?); |
|
let mut writer = BufWriter::new(socket); |
|
|
|
while let Some(request) = Request::maybe_decode_from(&mut reader)? { |
|
// Execute request. |
|
debug!("Received request {request:?}"); |
|
// NOTE(review): the `true` passed to with_txn() below presumably |
// requests a read-only transaction -- confirm against Session::with_txn. |
let response = match request { |
|
Request::Execute(query) => session.execute(&query).map(Response::Execute), |
|
Request::GetTable(table) => { |
|
session.with_txn(true, |txn| txn.must_get_table(&table)).map(Response::GetTable) |
|
} |
|
Request::ListTables => session |
|
.with_txn(true, |txn| { |
|
Ok(txn.list_tables()?.into_iter().map(|t| t.name).collect()) |
|
}) |
|
.map(Response::ListTables), |
|
Request::Status => session |
|
.status() |
|
.map(|s| Status { server: id, raft: s.raft, mvcc: s.mvcc }) |
|
.map(Response::Status), |
|
}; |
|
|
|
// Process response. |
|
debug!("Returning response {response:?}"); |
|
response.encode_into(&mut writer)?; |
|
// Flush so the buffered response is sent before reading the next request. |
writer.flush()?; |
|
} |
|
Ok(()) |
|
} |
The toydb binary in src/bin/toydb.rs launches the server, and is a thin wrapper around
toydb::Server. It is a tiny clap command:
|
/// The toyDB server command. Command-line arguments are parsed via clap. |
|
#[derive(clap::Parser)] |
|
#[command(about = "Starts a toyDB server.", version, propagate_version = true)] |
|
struct Command { |
|
/// The configuration file path. Set with -c or --config; defaults to |
/// config/toydb.yaml. |
|
#[arg(short = 'c', long, default_value = "config/toydb.yaml")] |
|
config: String, |
|
} |
It first parses a server configuration from the toydb.yaml file:
|
/// The toyDB server configuration. Can be provided via config file (default |
|
/// config/toydb.yaml) or TOYDB_ environment variables. |
|
#[derive(Debug, Deserialize)] |
|
struct Config { |
|
/// The node ID. Must be unique in the cluster. |
|
id: raft::NodeID, |
|
/// The other nodes in the cluster, and their Raft TCP addresses. |
|
peers: HashMap<raft::NodeID, String>, |
|
/// The Raft listen address. Passed to Server::serve() as raft_addr. |
|
listen_raft: String, |
|
/// The SQL listen address. Passed to Server::serve() as sql_addr. |
|
listen_sql: String, |
|
/// The log level. |
|
log_level: String, |
|
/// The path to this node's data directory. The Raft log is stored in |
|
/// the file "raft", and the SQL state machine in "sql". |
|
data_dir: String, |
|
/// The Raft storage engine: bitcask or memory. An empty string selects |
/// bitcask; any other value is rejected as invalid input. |
|
storage_raft: String, |
|
/// The SQL storage engine: bitcask or memory. An empty string selects |
/// bitcask; any other value is rejected as invalid input. |
|
storage_sql: String, |
|
/// If false, don't fsync Raft log writes to disk. Disabling this |
|
/// will yield much better write performance, but may lose data on |
|
/// host crashes which compromises Raft safety guarantees. |
|
fsync: bool, |
|
/// The garbage fraction threshold at which to trigger compaction. |
/// Passed to storage::BitCask::new_maybe_compact() for both engines. |
|
compact_threshold: f64, |
|
/// The minimum bytes of garbage before triggering compaction. |
/// Passed to storage::BitCask::new_maybe_compact() for both engines. |
|
compact_min_bytes: u64, |
|
} |
Then it initializes the Raft log storage and SQL state machine:
|
// Initialize the Raft log storage engine. |
|
let datadir = Path::new(&cfg.data_dir); |
|
let mut raft_log = match cfg.storage_raft.as_str() { |
|
"bitcask" | "" => { |
|
let engine = storage::BitCask::new_maybe_compact( |
|
datadir.join("raft"), |
|
cfg.compact_threshold, |
|
cfg.compact_min_bytes, |
|
)?; |
|
raft::Log::new(Box::new(engine))? |
|
} |
|
"memory" => raft::Log::new(Box::new(storage::Memory::new()))?, |
|
name => return errinput!("invalid Raft storage engine {name}"), |
|
}; |
|
raft_log.enable_fsync(cfg.fsync); |
|
|
|
// Initialize the SQL storage engine. |
|
let raft_state: Box<dyn raft::State> = match cfg.storage_sql.as_str() { |
|
"bitcask" | "" => { |
|
let engine = storage::BitCask::new_maybe_compact( |
|
datadir.join("sql"), |
|
cfg.compact_threshold, |
|
cfg.compact_min_bytes, |
|
)?; |
|
Box::new(sql::engine::Raft::new_state(engine)?) |
|
} |
|
"memory" => Box::new(sql::engine::Raft::new_state(storage::Memory::new())?), |
|
name => return errinput!("invalid SQL storage engine {name}"), |
|
}; |
And finally it launches the toydb::Server:
|
// Start the server. |
|
Server::new(cfg.id, cfg.peers, raft_log, raft_state)? |
|
.serve(&cfg.listen_raft, &cfg.listen_sql) |
toyDB is now up and running!
← SQL Execution | Client →