
SQL Storage

The SQL storage engine, in the sql::engine module, stores tables and rows. toyDB has two SQL storage implementations:

  • sql::engine::Local: local storage using a storage::Engine key/value store.
  • sql::engine::Raft: Raft-replicated storage, using Local on each node below Raft.

These implement the sql::engine::Engine trait, which specifies the SQL storage API. SQL execution can use either simple local storage or Raft-replicated storage -- toyDB itself always uses the Raft-replicated engine, but many tests use a local in-memory engine.

The sql::engine::Engine trait is fully transactional, based on the storage::MVCC transaction engine discussed previously. As such, the trait just has a few methods that begin transactions -- the storage logic itself is implemented in the transaction, which we'll cover next. The trait also has a session() method to start SQL sessions for query execution, which we'll revisit in the execution section.

/// A SQL engine. This provides low-level CRUD (create, read, update, delete)
/// operations for table rows, a schema catalog for accessing and modifying
/// table schemas, and interactive SQL sessions that execute client SQL
/// statements. All engine access is transactional with snapshot isolation.
pub trait Engine<'a>: Sized {
    /// The engine's transaction type. This provides both row-level CRUD
    /// operations and transactional access to the schema catalog.
    type Transaction: Transaction + 'a;

    /// Begins a read-write transaction.
    fn begin(&'a self) -> Result<Self::Transaction>;

    /// Begins a read-only transaction.
    fn begin_read_only(&'a self) -> Result<Self::Transaction>;

    /// Begins a read-only transaction as of a historical version.
    fn begin_as_of(&'a self, version: mvcc::Version) -> Result<Self::Transaction>;

    /// Creates a client session for executing SQL statements.
    fn session(&'a self) -> Session<'a, Self> {
        Session::new(self)
    }
}

Here, we'll only look at the Local engine, and we'll discuss Raft replication afterwards. Local itself is just a thin wrapper around a storage::MVCC<storage::Engine> to create transactions:

/// A SQL engine using local storage. This provides the main SQL storage logic.
/// The Raft SQL engine dispatches to this for node-local SQL storage, executing
/// the same writes across each node's instance of `Local`.
pub struct Local<E: storage::Engine + 'static> {
    /// The local MVCC storage engine.
    pub mvcc: mvcc::MVCC<E>,
}

impl<E: storage::Engine> Local<E> {
    /// Creates a new local SQL engine using the given storage engine.
    pub fn new(engine: E) -> Self {
        Self { mvcc: mvcc::MVCC::new(engine) }
    }

    /// Resumes a transaction from the given state. This is usually kept within
    /// `mvcc::Transaction`, but the Raft-based engine can't retain the MVCC
    /// transaction across requests since it may be executed on different leader
    /// nodes, so it instead keeps the state client-side in the session.
    pub fn resume(&self, state: mvcc::TransactionState) -> Result<Transaction<E>> {
        Ok(Transaction::new(self.mvcc.resume(state)?))
    }

    /// Gets an unversioned key, or None if it doesn't exist.
    pub fn get_unversioned(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.mvcc.get_unversioned(key)
    }

    /// Sets an unversioned key.
    pub fn set_unversioned(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
        self.mvcc.set_unversioned(key, value)
    }
}

impl<E: storage::Engine> super::Engine<'_> for Local<E> {
    type Transaction = Transaction<E>;

    fn begin(&self) -> Result<Self::Transaction> {
        Ok(Self::Transaction::new(self.mvcc.begin()?))
    }

    fn begin_read_only(&self) -> Result<Self::Transaction> {
        Ok(Self::Transaction::new(self.mvcc.begin_read_only()?))
    }

    fn begin_as_of(&self, version: mvcc::Version) -> Result<Self::Transaction> {
        Ok(Self::Transaction::new(self.mvcc.begin_as_of(version)?))
    }
}

Key/Value Representation

Local uses a storage::Engine key/value store to store SQL table schemas, table rows, and secondary index entries. But how do we represent these as keys and values?

The keys are represented by the sql::engine::Key enum, and encoded using the Keycode encoding that we've discussed in the encoding section:
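As a brief refresher on why the encoding matters: an order-preserving encoding guarantees that byte-wise comparison of encoded keys agrees with logical comparison of the values, so a byte-ordered key/value store keeps them sorted. A minimal self-contained illustration (using big-endian integers, not the actual Keycode format):

```rust
fn main() {
    // Big-endian byte encodings compare the same way as the unsigned
    // integers themselves, so they are order-preserving.
    let a = 3u64.to_be_bytes();
    let b = 10u64.to_be_bytes();
    let c = 200u64.to_be_bytes();
    assert!(a < b && b < c);

    // Little-endian encodings are not order-preserving: 256 encodes as
    // [0, 1, 0, ...], which sorts byte-wise before 200's [200, 0, ...].
    let x = 200u64.to_le_bytes();
    let y = 256u64.to_le_bytes();
    assert!(y < x);
    println!("order-preserving check passed");
}
```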

/// SQL engine keys, using the Keycode order-preserving encoding. For
/// simplicity, table and column names are used directly as identifiers, instead
/// of e.g. numeric IDs. It is not possible to change table/column names, so
/// this is fine, if somewhat inefficient.
///
/// Uses Cow to allow encoding borrowed values but decoding owned values.
#[derive(Debug, Deserialize, Serialize)]
pub enum Key<'a> {
    /// A table schema, keyed by table name. The value is a `sql::types::Table`.
    Table(Cow<'a, str>),
    /// A column index entry, keyed by table name, column name, and index value.
    /// The value is a `BTreeSet` of `sql::types::Value` primary key values.
    Index(Cow<'a, str>, Cow<'a, str>, Cow<'a, Value>),
    /// A table row, keyed by table name and primary key value. The value is a
    /// `sql::types::Row`.
    Row(Cow<'a, str>, Cow<'a, Value>),
}

The values are encoded using the Bincode encoding, where the value type is given by the key:

  • Key::Table → sql::types::Table (table schemas)
  • Key::Index → BTreeSet<sql::types::Value> (indexed primary keys)
  • Key::Row → sql::types::Row (table rows)

Recall that the Keycode encoding will store keys in sorted order. This means that all Key::Table entries come first, then all Key::Index, then all Key::Row. These are further grouped and sorted by their fields.
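This grouping can be illustrated with a self-contained sketch (not toyDB code) that uses Rust's derived Ord, which sorts enum variants by declaration order and then by their fields, just like the Keycode encoding sorts encoded keys:

```rust
use std::collections::BTreeSet;

// A simplified stand-in for sql::engine::Key, with String names and
// i64 primary keys instead of Cow and Value.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Key {
    Table(String),
    Index(String, String, i64),
    Row(String, i64),
}

fn main() {
    // Insert keys in arbitrary order; the BTreeSet keeps them sorted.
    let keys: BTreeSet<Key> = [
        Key::Row("movies".into(), 1),
        Key::Table("movies".into()),
        Key::Index("movies".into(), "genre_id".into(), 2),
        Key::Table("genres".into()),
        Key::Row("genres".into(), 1),
    ]
    .into_iter()
    .collect();

    // All Table keys sort first, then Index, then Row, each group
    // further ordered by its fields.
    let order: Vec<&Key> = keys.iter().collect();
    assert!(matches!(order[0], Key::Table(t) if t.as_str() == "genres"));
    assert!(matches!(order[1], Key::Table(t) if t.as_str() == "movies"));
    assert!(matches!(order[2], Key::Index(..)));
    assert!(matches!(order[3], Key::Row(t, _) if t.as_str() == "genres"));
    assert!(matches!(order[4], Key::Row(t, _) if t.as_str() == "movies"));
}
```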

For example, consider these SQL tables containing movies and genres, with a secondary index on movies.genre_id for fast lookups of movies with a given genre:

CREATE TABLE genres (
    id INTEGER PRIMARY KEY,
    name STRING NOT NULL
);

CREATE TABLE movies (
    id INTEGER PRIMARY KEY,
    title STRING NOT NULL,
    released INTEGER NOT NULL,
    genre_id INTEGER NOT NULL INDEX REFERENCES genres
);

INSERT INTO genres VALUES (1, 'Drama'), (2, 'Action');

INSERT INTO movies VALUES
    (1, 'Sicario', 2015, 2),
    (2, '21 Grams', 2003, 1),
    (3, 'Heat', 1995, 2);

This would result in the following illustrated keys and values, in the given order:

/Table/genres → Table { name: "genres", primary_key: 0, columns: ... }
/Table/movies → Table { name: "movies", primary_key: 0, columns: ... }
/Index/movies/genre_id/Integer(1) → BTreeSet { Integer(2) }
/Index/movies/genre_id/Integer(2) → BTreeSet { Integer(1), Integer(3) }
/Row/genres/Integer(1) → Row { Integer(1), String("Drama") }
/Row/genres/Integer(2) → Row { Integer(2), String("Action") }
/Row/movies/Integer(1) → Row { Integer(1), String("Sicario"), Integer(2015), Integer(2) }
/Row/movies/Integer(2) → Row { Integer(2), String("21 Grams"), Integer(2003), Integer(1) }
/Row/movies/Integer(3) → Row { Integer(3), String("Heat"), Integer(1995), Integer(2) }

Thus, if we want to do a full table scan of the movies table, we just do a prefix scan of /Row/movies/. If we want to do a secondary index lookup of all movies with genre_id = 2, we fetch /Index/movies/genre_id/Integer(2) and find that movies with id = {1,3} have this genre.

To help with prefix scans, the valid key prefixes are represented as sql::engine::KeyPrefix:

/// Key prefixes, allowing prefix scans of specific parts of the keyspace. These
/// must match the keys -- in particular, the enum variant indexes must match,
/// since it's part of the encoded key.
#[derive(Deserialize, Serialize)]
enum KeyPrefix<'a> {
    /// All table schemas.
    Table,
    /// All column index entries, keyed by table and column name.
    Index(Cow<'a, str>, Cow<'a, str>),
    /// All table rows, keyed by table name.
    Row(Cow<'a, str>),
}

impl<'a> encoding::Key<'a> for KeyPrefix<'a> {}

For a look at the actual on-disk binary storage format, see the test scripts under src/sql/testscripts/writes, which output the logical and raw binary representation of write operations.

Schema Catalog

The sql::engine::Catalog trait is used to store table schemas, i.e. sql::types::Table. It has a handful of methods for creating, dropping and fetching tables (recall that toyDB does not support schema changes). The Table::name field is used as a unique table identifier throughout.

/// The catalog stores table schema information. It must be implemented for
/// Transaction, and is thus fully transactional. For simplicity, it only
/// supports creating and dropping tables -- there are no ALTER TABLE schema
/// changes, nor CREATE INDEX.
pub trait Catalog {
    /// Creates a new table. Errors if it already exists.
    fn create_table(&self, table: Table) -> Result<()>;

    /// Drops a table. Errors if it does not exist, unless if_exists is true.
    /// Returns true if the table existed and was deleted.
    fn drop_table(&self, table: &str, if_exists: bool) -> Result<bool>;

    /// Fetches a table schema, or None if it doesn't exist.
    fn get_table(&self, table: &str) -> Result<Option<Table>>;

    /// Returns a list of all table schemas.
    fn list_tables(&self) -> Result<Vec<Table>>;

    /// Fetches a table schema, or errors if it does not exist.
    fn must_get_table(&self, table: &str) -> Result<Table> {
        self.get_table(table)?.ok_or_else(|| errinput!("table {table} does not exist"))
    }
}

The Catalog trait is also fully transactional, as it must be implemented on a transaction via the type Transaction: Transaction + Catalog trait bound on sql::engine::Engine.

Creating a table is straightforward: insert a key/value pair with a Keycode-encoded Key::Table for the key and a Bincode-encoded sql::types::Table for the value. We first check that the table doesn't already exist, and validate the table schema using Table::validate().

impl<E: storage::Engine> Catalog for Transaction<E> {
    fn create_table(&self, table: Table) -> Result<()> {
        if self.get_table(&table.name)?.is_some() {
            return errinput!("table {} already exists", table.name);
        }
        table.validate(self)?;
        self.txn.set(&Key::Table((&table.name).into()).encode(), table.encode())
    }

Similarly, fetching and listing tables is straightforward: just key/value gets or scans using the appropriate keys.

    fn get_table(&self, table: &str) -> Result<Option<Table>> {
        self.txn.get(&Key::Table(table.into()).encode())?.map(|v| Table::decode(&v)).transpose()
    }

    fn list_tables(&self) -> Result<Vec<Table>> {
        self.txn
            .scan_prefix(&KeyPrefix::Table.encode())
            .map(|r| r.and_then(|(_, v)| Table::decode(&v)))
            .collect()
    }

Dropping tables is a bit more involved, since we have to perform some validation and also delete the actual table rows and any secondary index entries, but it's not terribly complicated:

    fn drop_table(&self, table: &str, if_exists: bool) -> Result<bool> {
        let Some(table) = self.get_table(table)? else {
            if if_exists {
                return Ok(false);
            }
            return errinput!("table {table} does not exist");
        };
        // Check for foreign key references.
        if let Some((source, refs)) =
            self.table_references(&table.name)?.iter().find(|(t, _)| t.name != table.name)
        {
            return errinput!(
                "table {} is referenced from {}.{}",
                table.name,
                source.name,
                source.columns[refs[0]].name
            );
        }
        // Delete the table schema entry.
        self.txn.delete(&Key::Table((&table.name).into()).encode())?;
        // Delete the table rows.
        let prefix = &KeyPrefix::Row((&table.name).into()).encode();
        let mut keys = self.txn.scan_prefix(prefix).map_ok(|(key, _)| key);
        while let Some(key) = keys.next().transpose()? {
            self.txn.delete(&key)?;
        }
        // Delete any secondary index entries.
        for column in table.columns.iter().filter(|c| c.index) {
            let prefix = &KeyPrefix::Index((&table.name).into(), (&column.name).into()).encode();
            let mut keys = self.txn.scan_prefix(prefix).map_ok(|(key, _)| key);
            while let Some(key) = keys.next().transpose()? {
                self.txn.delete(&key)?;
            }
        }
        Ok(true)
    }

Row Storage and Transactions

The workhorse of the SQL storage engine is the Transaction trait, which provides CRUD operations (create, read, update, delete) on table rows and secondary index entries. For performance (especially with Raft), it operates on row batches rather than individual rows.

/// A SQL transaction. Executes transactional CRUD operations on table rows.
/// Provides snapshot isolation (see `storage::mvcc` module for details).
///
/// All methods operate on row batches rather than single rows to amortize the
/// cost. With the Raft engine, each call results in a Raft roundtrip, and we'd
/// rather not have to do that for every single row that's modified.
pub trait Transaction: Catalog {
    /// The transaction's internal MVCC state.
    fn state(&self) -> &mvcc::TransactionState;

    /// Commits the transaction.
    fn commit(self) -> Result<()>;

    /// Rolls back the transaction.
    fn rollback(self) -> Result<()>;

    /// Deletes table rows by primary key, if they exist.
    fn delete(&self, table: &str, ids: &[Value]) -> Result<()>;

    /// Fetches table rows by primary key, if they exist.
    fn get(&self, table: &str, ids: &[Value]) -> Result<Vec<Row>>;

    /// Inserts new table rows.
    fn insert(&self, table: &str, rows: Vec<Row>) -> Result<()>;

    /// Looks up a set of primary keys by index values. BTreeSet for testing.
    fn lookup_index(&self, table: &str, column: &str, values: &[Value]) -> Result<BTreeSet<Value>>;

    /// Scans a table's rows, optionally applying the given filter.
    fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Rows>;

    /// Updates table rows by primary key. BTreeMap for testing.
    fn update(&self, table: &str, rows: BTreeMap<Value, Row>) -> Result<()>;
}

The Local::Transaction implementation is just a wrapper around an MVCC transaction, and the commit/rollback methods just call straight through to it:

/// A SQL transaction, wrapping an MVCC transaction.
pub struct Transaction<E: storage::Engine + 'static> {
    txn: mvcc::Transaction<E>,
}

    fn state(&self) -> &mvcc::TransactionState {
        self.txn.state()
    }

    fn commit(self) -> Result<()> {
        self.txn.commit()
    }

    fn rollback(self) -> Result<()> {
        self.txn.rollback()
    }

To insert new rows into a table, we first have to perform some validation: check that the table exists and validate the rows against the table schema (including checking for e.g. primary key conflicts and foreign key references). We then store each row as a key/value pair, using a Key::Row with the table name and primary key value. Finally, we update any secondary index entries.

    fn insert(&self, table: &str, rows: Vec<Row>) -> Result<()> {
        let table = self.must_get_table(table)?;
        for row in rows {
            // Insert the row.
            table.validate_row(&row, false, self)?;
            let id = &row[table.primary_key];
            self.txn.set(&Key::Row((&table.name).into(), id.into()).encode(), row.encode())?;
            // Update any secondary index entries.
            for (i, column) in table.columns.iter().enumerate().filter(|(_, c)| c.index) {
                let mut ids = self.get_index(&table.name, &column.name, &row[i])?;
                ids.insert(id.clone());
                self.set_index(&table.name, &column.name, &row[i], ids)?;
            }
        }
        Ok(())
    }

Row updates are similar to inserts, but in the case of a primary key change we instead delete the old row and insert a new one, for simplicity. Secondary index updates also have to update both the old and new entries.

    fn update(&self, table: &str, rows: BTreeMap<Value, Row>) -> Result<()> {
        let table = self.must_get_table(table)?;
        for (id, row) in rows {
            // If the primary key changes, we simply do a delete and insert.
            // This simplifies constraint validation.
            if id != row[table.primary_key] {
                self.delete(&table.name, &[id])?;
                self.insert(&table.name, vec![row])?;
                continue;
            }
            // Validate the row, but don't write it yet since we may need to
            // read the existing value to update secondary indexes.
            table.validate_row(&row, true, self)?;
            // Update indexes, knowing that the primary key has not changed.
            let indexes = table.columns.iter().enumerate().filter(|(_, c)| c.index).collect_vec();
            if !indexes.is_empty() {
                let old = self.get(&table.name, slice::from_ref(&id))?.remove(0);
                for (i, column) in indexes {
                    // If the value didn't change, we don't have to do anything.
                    if old[i] == row[i] {
                        continue;
                    }
                    // Remove the old value from the index entry.
                    let mut ids = self.get_index(&table.name, &column.name, &old[i])?;
                    ids.remove(&id);
                    self.set_index(&table.name, &column.name, &old[i], ids)?;
                    // Insert the new value into the index entry.
                    let mut ids = self.get_index(&table.name, &column.name, &row[i])?;
                    ids.insert(id.clone());
                    self.set_index(&table.name, &column.name, &row[i], ids)?;
                }
            }
            // Update the row.
            self.txn.set(&Key::Row((&table.name).into(), (&id).into()).encode(), row.encode())?;
        }
        Ok(())
    }

Row deletions are also similar: validate that the deletion is safe (e.g. check that there are no foreign key references to it), then delete the Key::Row keys and any secondary index entries:

    fn delete(&self, table: &str, ids: &[Value]) -> Result<()> {
        let table = self.must_get_table(table)?;
        let indexes = table.columns.iter().enumerate().filter(|(_, c)| c.index).collect_vec();
        // Check for foreign key references to the deleted rows.
        for (source, refs) in self.table_references(&table.name)? {
            let self_reference = source.name == table.name;
            for i in refs {
                let column = &source.columns[i];
                let mut source_ids = if i == source.primary_key {
                    // If the reference is from a primary key column, do a lookup.
                    self.get(&source.name, ids)?
                        .into_iter()
                        .map(|row| row.into_iter().nth(i).expect("short row"))
                        .collect()
                } else {
                    // Otherwise (commonly), do a secondary index lookup.
                    // All foreign keys have a secondary index.
                    self.lookup_index(&source.name, &column.name, ids)?
                };
                // We can ignore any references between the deleted rows,
                // including a row referencing itself.
                if self_reference {
                    for id in ids {
                        source_ids.remove(id);
                    }
                }
                // Error if the delete would violate referential integrity.
                if let Some(source_id) = source_ids.first() {
                    let table = source.name;
                    let column = &source.columns[source.primary_key].name;
                    return errinput!("row referenced by {table}.{column}={source_id}");
                }
            }
        }
        for id in ids {
            // Update any secondary index entries.
            if !indexes.is_empty() {
                if let Some(row) = self.get_row(&table.name, id)? {
                    for (i, column) in indexes.iter().copied() {
                        let mut ids = self.get_index(&table.name, &column.name, &row[i])?;
                        ids.remove(id);
                        self.set_index(&table.name, &column.name, &row[i], ids)?;
                    }
                }
            }
            // Delete the row.
            self.txn.delete(&Key::Row((&table.name).into(), id.into()).encode())?;
        }
        Ok(())
    }

To fetch rows by primary key, we simply call through to key/value gets using the appropriate Key::Row:

    fn get(&self, table: &str, ids: &[Value]) -> Result<Vec<Row>> {
        ids.iter().filter_map(|id| self.get_row(table, id).transpose()).collect()
    }

    /// Fetches a single row by primary key, or None if it doesn't exist.
    fn get_row(&self, table: &str, id: &Value) -> Result<Option<Row>> {
        self.txn
            .get(&Key::Row(table.into(), id.into()).encode())?
            .map(|v| Row::decode(&v))
            .transpose()
    }

Similarly, index lookups fetch a Key::Index for the indexed value, returning matching primary keys:

    fn lookup_index(&self, table: &str, column: &str, values: &[Value]) -> Result<BTreeSet<Value>> {
        debug_assert!(self.has_index(table, column)?, "no index on {table}.{column}");
        values.iter().map(|v| self.get_index(table, column, v)).flatten_ok().collect()
    }

    /// Fetches the matching primary keys for the given secondary index value,
    /// or an empty set if there is none.
    fn get_index(&self, table: &str, column: &str, value: &Value) -> Result<BTreeSet<Value>> {
        debug_assert!(self.has_index(table, column)?, "no index on {table}.{column}");
        Ok(self
            .txn
            .get(&Key::Index(table.into(), column.into(), value.into()).encode())?
            .map(|v| BTreeSet::decode(&v))
            .transpose()?
            .unwrap_or_default())
    }

Scanning table rows just performs a prefix scan with the appropriate KeyPrefix::Row, returning a row iterator. This can optionally also do row filtering via filter pushdowns, which we'll revisit when we look at the SQL optimizer.

    fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Rows> {
        // TODO: this could be simpler if process_results() implemented Clone.
        let rows = self
            .txn
            .scan_prefix(&KeyPrefix::Row(table.into()).encode())
            .map(|result| result.and_then(|(_, value)| Row::decode(&value)));
        let Some(filter) = filter else {
            return Ok(Box::new(rows));
        };
        let rows = rows.filter_map(move |result| {
            result
                .and_then(|row| match filter.evaluate(Some(&row))? {
                    Value::Boolean(true) => Ok(Some(row)),
                    Value::Boolean(false) | Value::Null => Ok(None),
                    value => errinput!("filter returned {value}, expected boolean"),
                })
                .transpose()
        });
        Ok(Box::new(rows))
    }

And with that, we can now store and retrieve SQL tables and rows on disk. Let's see how to replicate it across nodes via Raft.


SQL Data Model   |   SQL Raft Replication