
SQL Execution

Now that the planner and optimizer have done all the hard work of figuring out how to execute a query, it's time to actually execute it.

Plan Executor

Plan execution is done by sql::execution::Executor in the sql::execution module, using a sql::engine::Transaction to access the SQL storage engine.

/// Executes statement plans.
///
/// The plan root specifies the action to take (e.g. SELECT, INSERT, UPDATE,
/// etc). It has a nested tree of child nodes that process rows.
///
/// Nodes are executed recursively, and return row iterators. Parent nodes
/// recursively pull input rows from their child nodes, process them, and pass
/// them on to their parent node.
///
/// Below is an example of an (unoptimized) query plan:
///
/// SELECT title, released, genres.name AS genre
/// FROM movies INNER JOIN genres ON movies.genre_id = genres.id
/// WHERE released >= 2000
/// ORDER BY released DESC
///
/// Select
/// └─ Order: movies.released desc
///    └─ Projection: movies.title, movies.released, genres.name as genre
///       └─ Filter: movies.released >= 2000
///          └─ NestedLoopJoin: inner on movies.genre_id = genres.id
///             ├─ Scan: movies
///             └─ Scan: genres
///
/// Rows flow from the tree leaves to the root:
///
/// 1. Scan nodes read rows from movies and genres.
/// 2. NestedLoopJoin joins the rows from movies and genres.
/// 3. Filter discards rows with release dates older than 2000.
/// 4. Projection picks out the requested column values from the rows.
/// 5. Order sorts the rows by release date.
/// 6. Select returns the final rows to the client.
pub struct Executor<'a, T: Transaction> {
    /// The transaction used to execute the plan.
    txn: &'a T,
}

The executor takes a sql::planner::Plan as input, and will return an ExecutionResult depending on the statement type.

/// A plan execution result.
pub enum ExecutionResult {
    CreateTable { name: String },
    DropTable { name: String, existed: bool },
    Delete { count: u64 },
    Insert { count: u64 },
    Update { count: u64 },
    Select { columns: Vec<Label>, rows: Rows },
}

When executing the plan, the executor will branch off depending on the statement type:

/// Executes a plan, returning an execution result.
pub fn execute(&mut self, plan: Plan) -> Result<ExecutionResult> {
    Ok(match plan {
        // CREATE TABLE
        Plan::CreateTable { schema } => {
            let name = schema.name.clone();
            self.txn.create_table(schema)?;
            ExecutionResult::CreateTable { name }
        }
        // DROP TABLE
        Plan::DropTable { name, if_exists } => {
            let existed = self.txn.drop_table(&name, if_exists)?;
            ExecutionResult::DropTable { name, existed }
        }
        // DELETE
        Plan::Delete { table, primary_key, source } => {
            let source = self.execute_node(source)?;
            let count = self.delete(&table, primary_key, source)?;
            ExecutionResult::Delete { count }
        }
        // INSERT
        Plan::Insert { table, column_map, source } => {
            let source = self.execute_node(source)?;
            let count = self.insert(table, column_map, source)?;
            ExecutionResult::Insert { count }
        }
        // SELECT
        Plan::Select(root) => {
            let columns = (0..root.columns()).map(|i| root.column_label(i)).collect();
            let rows = self.execute_node(root)?;
            ExecutionResult::Select { columns, rows }
        }
        // UPDATE
        Plan::Update { table, primary_key, source, expressions } => {
            let source = self.execute_node(source)?;
            let count = self.update(&table.name, primary_key, source, expressions)?;
            ExecutionResult::Update { count }
        }
    })
}

We'll focus on SELECT queries here, which are the most interesting.

toyDB uses the iterator model (also known as the volcano model) for query execution. In the case of a SELECT query, the result is a row iterator, and pulling from this iterator by calling next() will drive the entire execution pipeline by recursively calling next() on the child nodes' row iterators. This maps very naturally onto Rust's iterators, and we leverage these to construct the execution pipeline as nested iterators.
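A standalone sketch of this model (with made-up rows and node types, not toyDB's actual ones) might look like this, where each plan node is just an iterator adapter wrapping its child:

```rust
// A toy pipeline in the volcano style: each node wraps its child's
// iterator, and calling next() on the root pulls rows through the tree.
type Row = Vec<i64>;

fn main() {
    // Leaf node: scans a static "table" of (id, released) rows.
    let scan = vec![vec![1, 1990], vec![2, 2005], vec![3, 2010]].into_iter();
    // Filter node: discards rows released before 2000.
    let filter = scan.filter(|row: &Row| row[1] >= 2000);
    // Projection node: keeps only the id column.
    let projection = filter.map(|row| vec![row[0]]);
    // Pulling from the root drives the entire pipeline lazily.
    let rows: Vec<Row> = projection.collect();
    assert_eq!(rows, vec![vec![2], vec![3]]);
    println!("{rows:?}");
}
```

Nothing happens until the root iterator is pulled: `collect()` drives `projection`, which pulls from `filter`, which pulls from `scan`, just like pulling from the iterator of toyDB's root plan node.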

Execution itself is fairly straightforward, since we're just doing exactly what the planner tells us to do in the plan. We call Executor::execute_node recursively on each sql::planner::Node, starting with the root node. Each node returns a row iterator that its parent node can pull input rows from, process, and pass on via its own row iterator (the root node's iterator is returned to the caller):

/// Recursively executes a query plan node, returning a row iterator.
fn execute_node(&mut self, node: Node) -> Result<Rows> {

Executor::execute_node() will simply look at the type of Node, recursively call Executor::execute_node() on any child nodes, and then process the rows accordingly.

/// Recursively executes a query plan node, returning a row iterator.
fn execute_node(&mut self, node: Node) -> Result<Rows> {
    Ok(match node {
        // GROUP BY and aggregate functions.
        Node::Aggregate { source, group_by, aggregates } => {
            let source = self.execute_node(*source)?;
            let mut aggregator = Aggregator::new(group_by, aggregates);
            aggregator.add_rows(source)?;
            aggregator.into_rows()
        }
        // WHERE and similar filtering.
        Node::Filter { source, predicate } => {
            let source = self.execute_node(*source)?;
            Box::new(source.filter_map(move |result| {
                result
                    .and_then(|row| match predicate.evaluate(Some(&row))? {
                        Value::Boolean(true) => Ok(Some(row)),
                        Value::Boolean(false) | Value::Null => Ok(None),
                        value => errinput!("filter returned {value}, expected boolean"),
                    })
                    .transpose()
            }))
        }
        // JOIN using a hash join.
        Node::HashJoin { left, left_column, right, right_column, outer } => {
            let right_columns = right.columns();
            let left = self.execute_node(*left)?;
            let right = self.execute_node(*right)?;
            Box::new(HashJoiner::new(
                left,
                left_column,
                right,
                right_column,
                right_columns,
                outer,
            )?)
        }
        // Looks up primary keys by secondary index values.
        Node::IndexLookup { table, column, values, alias: _ } => {
            let column = table.columns.into_iter().nth(column).expect("invalid column").name;
            let ids =
                self.txn.lookup_index(&table.name, &column, &values)?.into_iter().collect_vec();
            Box::new(self.txn.get(&table.name, &ids)?.into_iter().map(Ok))
        }
        // Looks up rows by primary key.
        Node::KeyLookup { table, keys, alias: _ } => {
            Box::new(self.txn.get(&table.name, &keys)?.into_iter().map(Ok))
        }
        // LIMIT
        Node::Limit { source, limit } => Box::new(self.execute_node(*source)?.take(limit)),
        // JOIN using a nested loop join.
        Node::NestedLoopJoin { left, right, predicate, outer } => {
            let right_columns = right.columns();
            let left = self.execute_node(*left)?;
            let right = self.execute_node(*right)?;
            Box::new(NestedLoopJoiner::new(left, right, right_columns, predicate, outer))
        }
        // An empty row iterator.
        Node::Nothing { .. } => Box::new(std::iter::empty()),
        // OFFSET
        Node::Offset { source, offset } => Box::new(self.execute_node(*source)?.skip(offset)),
        // ORDER BY
        Node::Order { source, key } => {
            let source = self.execute_node(*source)?;
            Box::new(Self::order(source, key)?)
        }
        // Projects columns from the source, and evaluates expressions.
        Node::Projection { source, expressions, aliases: _ } => {
            let source = self.execute_node(*source)?;
            Box::new(source.map(move |result| {
                let row = result?;
                expressions.iter().map(|expr| expr.evaluate(Some(&row))).collect()
            }))
        }
        // Remaps source column indexes to new target column indexes.
        Node::Remap { source, targets } => {
            let source = self.execute_node(*source)?;
            let size = targets.iter().copied().flatten().map(|i| i + 1).max().unwrap_or(0);
            Box::new(source.map_ok(move |row| {
                let mut remapped = vec![Value::Null; size];
                for (target, value) in targets.iter().copied().zip_eq(row) {
                    if let Some(target) = target {
                        remapped[target] = value;
                    }
                }
                remapped
            }))
        }
        // Scans a table, optionally filtering rows.
        Node::Scan { table, filter, alias: _ } => Box::new(self.txn.scan(&table.name, filter)?),
        // Emits constant values.
        Node::Values { rows } => Box::new(
            rows.into_iter()
                .map(|row| row.into_iter().map(|expr| expr.evaluate(None)).collect()),
        ),
    })
}

We won't discuss every plan node in detail, but let's consider the movie plan we've looked at previously:

Select
└─ Order: movies.released desc
   └─ Projection: movies.title, movies.released, genres.name as genre
      └─ HashJoin: inner on movies.genre_id = genres.id
         ├─ Scan: movies (released >= 2000)
         └─ Scan: genres

We'll recursively call execute_node() until we end up in the two Scan nodes. These simply call through to the SQL engine (either using Raft or local disk) via Transaction::scan(), passing in the scan predicate if any, and return the resulting row iterator:

// Scans a table, optionally filtering rows.
Node::Scan { table, filter, alias: _ } => Box::new(self.txn.scan(&table.name, filter)?),

HashJoin will then join the output rows from the movies and genres iterators by using a hash join. This builds an in-memory table for genres and then iterates over movies, joining the rows:

// JOIN using a hash join.
Node::HashJoin { left, left_column, right, right_column, outer } => {
    let right_columns = right.columns();
    let left = self.execute_node(*left)?;
    let right = self.execute_node(*right)?;
    Box::new(HashJoiner::new(
        left,
        left_column,
        right,
        right_column,
        right_columns,
        outer,
    )?)
}

/// HashJoiner implements hash joins.
///
/// This builds a hash table of rows from the right source keyed on the join
/// value, then iterates over the left source and looks up matching rows in the
/// hash table.
///
/// If outer is true, and there is no match in the right source for a row in the
/// left source, a row with NULL values for the right source is emitted instead.
#[derive(Clone)]
pub struct HashJoiner {
    /// The left source.
    left: Rows,
    /// The left column to join on.
    left_column: usize,
    /// The right hash map to join on.
    right: HashMap<Value, Vec<Row>>,
    /// The number of columns in the right source.
    right_columns: usize,
    /// If true, emit a row when there is no match in the right source.
    outer: bool,
    /// Any pending matches to emit.
    pending: Rows,
}

impl HashJoiner {
    /// Creates a new hash joiner.
    pub fn new(
        left: Rows,
        left_column: usize,
        mut right: Rows,
        right_column: usize,
        right_columns: usize,
        outer: bool,
    ) -> Result<Self> {
        // Build a hash map from the right source.
        let mut right_map: HashMap<Value, Vec<Row>> = HashMap::new();
        while let Some(row) = right.next().transpose()? {
            let value = row[right_column].clone();
            if value.is_undefined() {
                continue; // undefined will never match anything
            }
            right_map.entry(value).or_default().push(row);
        }
        let pending = Box::new(std::iter::empty());
        Ok(Self { left, left_column, right: right_map, right_columns, outer, pending })
    }

    /// Returns the next joined row, if any.
    fn try_next(&mut self) -> Result<Option<Row>> {
        // If there's a pending row stashed from a previous call, return it.
        if let Some(row) = self.pending.next().transpose()? {
            return Ok(Some(row));
        }
        // Find the next left row to join with.
        while let Some(left) = self.left.next().transpose()? {
            if let Some(right) = self.right.get(&left[self.left_column]).cloned() {
                // Join with all right matches and stash them in pending.
                self.pending = Box::new(
                    right
                        .into_iter()
                        .map(move |right| left.iter().cloned().chain(right).collect())
                        .map(Ok),
                );
                return self.pending.next().transpose();
            } else if self.outer {
                // If there is no match for the left row, but it's an outer
                // join, emit a row with right NULLs.
                return Ok(Some(
                    left.into_iter()
                        .chain(std::iter::repeat(Value::Null).take(self.right_columns))
                        .collect(),
                ));
            }
        }
        Ok(None)
    }
}
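Stripped of toyDB's Row, Value, and error types, the build/probe pattern at the heart of HashJoiner can be sketched as follows (the table data here is made up for illustration):

```rust
use std::collections::HashMap;

fn main() {
    // Build phase: hash the right source (genres) on its join column (id).
    let genres = vec![(1, "Science Fiction"), (2, "Action")];
    let mut right: HashMap<i64, Vec<(i64, &str)>> = HashMap::new();
    for row in genres {
        right.entry(row.0).or_default().push(row);
    }

    // Probe phase: iterate over the left source (movies), look up each
    // row's join value (genre_id), and emit one joined row per match.
    let movies = vec![("Sicario", 2), ("Interstellar", 1), ("Heat", 2)];
    let mut joined = Vec::new();
    for (title, genre_id) in movies {
        for &(id, name) in right.get(&genre_id).into_iter().flatten() {
            joined.push((title, genre_id, id, name));
        }
    }
    assert_eq!(joined[0], ("Sicario", 2, 2, "Action"));
    println!("{joined:?}");
}
```

The hash table makes each probe O(1) on average, instead of rescanning the entire right source for every left row as a nested loop join does.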

The Projection node will simply evaluate the (trivial) column expressions using each joined row as input:

// Projects columns from the source, and evaluates expressions.
Node::Projection { source, expressions, aliases: _ } => {
    let source = self.execute_node(*source)?;
    Box::new(source.map(move |result| {
        let row = result?;
        expressions.iter().map(|expr| expr.evaluate(Some(&row))).collect()
    }))
}

And finally the Order node will sort the results (which requires buffering them all in memory):

// ORDER BY
Node::Order { source, key } => {
    let source = self.execute_node(*source)?;
    Box::new(Self::order(source, key)?)
}

/// Sorts the input rows.
fn order(source: Rows, order: Vec<(Expression, Direction)>) -> Result<Rows> {
    // We can't use sorted_by_cached_key(), since expression evaluation is
    // fallible, and since we may have to vary the sort direction of each
    // expression. Collect the rows and pre-computed sort keys into a vec.
    let mut rows: Vec<(Row, Vec<Value>)> = source
        .map(|result| {
            result.and_then(|row| {
                let sort_keys =
                    order.iter().map(|(expr, _)| expr.evaluate(Some(&row))).try_collect()?;
                Ok((row, sort_keys))
            })
        })
        .try_collect()?;
    rows.sort_by(|(_, a_keys), (_, b_keys)| {
        let dirs = order.iter().map(|(_, dir)| dir).copied();
        for (a_key, b_key, dir) in izip!(a_keys, b_keys, dirs) {
            let mut ordering = a_key.cmp(b_key);
            if dir == Direction::Descending {
                ordering = ordering.reverse();
            }
            if ordering != Ordering::Equal {
                return ordering;
            }
        }
        Ordering::Equal
    });
    Ok(Box::new(rows.into_iter().map(|(row, _)| Ok(row))))
}
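The same column-by-column comparison can be reduced to a self-contained sketch, with made-up rows and simplified stand-ins for toyDB's Value and Direction types:

```rust
use std::cmp::Ordering;

// Simplified stand-ins for toyDB's Direction and Value types.
#[derive(Clone, Copy, PartialEq)]
enum Direction {
    Ascending,
    Descending,
}

#[derive(PartialEq, Eq, PartialOrd, Ord)]
enum Value {
    Integer(i64),
    String(String),
}

fn main() {
    // Rows of (title, released): sort by released desc, then title asc.
    let mut rows = vec![("Heat", 1995), ("Arrival", 2016), ("Sicario", 2015)];
    let order = [(1, Direction::Descending), (0, Direction::Ascending)];

    // Evaluate the sort key for a given row and column index.
    let key = |row: &(&str, i64), col: usize| match col {
        0 => Value::String(row.0.to_string()),
        _ => Value::Integer(row.1),
    };

    // Compare column by column, reversing the ordering for descending
    // columns, until some column breaks the tie.
    rows.sort_by(|a, b| {
        for &(col, dir) in &order {
            let mut ordering = key(a, col).cmp(&key(b, col));
            if dir == Direction::Descending {
                ordering = ordering.reverse();
            }
            if ordering != Ordering::Equal {
                return ordering;
            }
        }
        Ordering::Equal
    });

    assert_eq!(rows, vec![("Arrival", 2016), ("Sicario", 2015), ("Heat", 1995)]);
    println!("{rows:?}");
}
```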

The output row iterator of Order is returned via ExecutionResult::Select, and the caller can now go ahead and pull the resulting rows from it.

Session Management

The entry point to the SQL engine is the sql::execution::Session, which represents a single user session. It is obtained via sql::engine::Engine::session().

/// A SQL client session. Parses and executes raw SQL statements and handles
/// transaction control.
pub struct Session<'a, E: Engine<'a>> {
    /// The SQL engine.
    engine: &'a E,
    /// The current transaction, if any.
    txn: Option<E::Transaction>,
}

The session takes a series of raw SQL statement strings as input and parses them:

/// Executes a client statement.
pub fn execute(&mut self, statement: &str) -> Result<StatementResult> {
    // Parse and execute the statement. Transaction control is handled here,
    // other statements are handled by the SQL executor.
    Ok(match Parser::parse(statement)? {

For each statement, it returns a result depending on the kind of statement:

/// A session statement result, returned over the network to SQL clients.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum StatementResult {
    Begin(mvcc::TransactionState),
    Commit { version: mvcc::Version },
    Rollback { version: mvcc::Version },
    Explain(Plan),
    CreateTable { name: String },
    DropTable { name: String, existed: bool },
    Delete { count: u64 },
    Insert { count: u64 },
    Update { count: u64 },
    // For simplicity, we buffer and send the entire set of rows as a vector
    // instead of streaming them to the client. Streaming reads haven't been
    // implemented from Raft either, so they're buffered all the way through.
    Select { columns: Vec<Label>, rows: Vec<Row> },
}

The session itself performs transaction control. It handles BEGIN, COMMIT, and ROLLBACK statements, and modifies the transaction accordingly.

// BEGIN: starts a new transaction and returns its state.
ast::Statement::Begin { read_only, as_of } => {
    if self.txn.is_some() {
        return errinput!("already in a transaction");
    }
    let txn = match (read_only, as_of) {
        (false, None) => self.engine.begin()?,
        (true, None) => self.engine.begin_read_only()?,
        (true, Some(as_of)) => self.engine.begin_as_of(as_of)?,
        (false, Some(_)) => {
            return errinput!("can't start read-write transaction in a given version");
        }
    };
    let state = txn.state().clone();
    self.txn = Some(txn);
    StatementResult::Begin(state)
}
// COMMIT: commits the currently open transaction, if any.
ast::Statement::Commit => {
    let Some(txn) = self.txn.take() else {
        return errinput!("not in a transaction");
    };
    let version = txn.state().version;
    txn.commit()?;
    StatementResult::Commit { version }
}
// ROLLBACK: rolls back the currently open transaction, if any.
ast::Statement::Rollback => {
    let Some(txn) = self.txn.take() else {
        return errinput!("not in a transaction");
    };
    let version = txn.state().version;
    txn.rollback()?;
    StatementResult::Rollback { version }
}

Any other statements are processed by the SQL planner, optimizer, and executor as we've seen in previous sections.

// Other statements (SELECT etc.) are handled by the SQL executor.
statement => {
    let read_only = matches!(statement, ast::Statement::Select { .. });
    self.with_txn(read_only, |txn| {
        Plan::build(statement, txn)?.optimize()?.execute(txn)?.try_into()
    })?
}

These statements are always executed using the session's current transaction. If there is no active transaction, the session will create a new, implicit transaction for each statement.

/// Runs a closure in the session's transaction, if there is one, otherwise
/// a temporary implicit transaction. If read_only is true, uses a read-only
/// implicit transaction. Does not automatically retry errors.
pub fn with_txn<F, T>(&mut self, read_only: bool, f: F) -> Result<T>
where
    F: FnOnce(&mut E::Transaction) -> Result<T>,
{
    // Use the current explicit transaction, if there is one.
    if let Some(ref mut txn) = self.txn {
        return f(txn);
    }
    // Otherwise, use an implicit transaction. Doing this session-side
    // results in additional Raft roundtrips to begin and complete the
    // transaction -- we could avoid this if the Raft SQL state machine
    // supported implicit transactions, but we keep it simple.
    let mut txn = match read_only {
        true => self.engine.begin_read_only()?,
        false => self.engine.begin()?,
    };
    let result = f(&mut txn);
    match result {
        Ok(_) => txn.commit()?,
        Err(_) => txn.rollback()?,
    }
    result
}

And with that, we have a fully functional SQL engine!

