toyDB uses an embedded key/value store for data
storage, located in the storage
module. This stores arbitrary keys and values as binary byte strings. The storage engine doesn't
know or care what the keys and values contain -- we'll see later how the SQL data model, with tables
and rows, is mapped onto this key/value structure.
The storage engine supports simple set/get/delete operations on individual keys. It does not itself
support transactions -- this is built on top, and we'll get back to it shortly.
Keys are stored in sorted order. This allows range scans, where we can iterate over all key/value
pairs between two specific keys, or with a specific key prefix. This will be needed by other
components in the system, e.g. to scan all rows in a specific SQL table, to scan all versions of an
MVCC key, to scan the tail of the Raft log, etc.
The storage engine is pluggable: there are multiple implementations, and the user can choose which
one to use in the config file. These implement the storage::Engine trait:
```rust
/// A key/value storage engine, which stores arbitrary byte strings. Keys are
/// maintained in lexicographical order, which allows for range scans. This is
/// needed e.g. to scan all rows in a specific SQL table (where all table rows
/// have a common key prefix), or to scan the tail of the Raft log (after a
/// given log entry index).
///
/// Keys should use the Keycode order-preserving encoding, see
/// [`crate::encoding::keycode`].
///
/// Writes are only guaranteed durable after calling [`Engine::flush()`].
///
/// For simplicity, this only supports a single user at a time, so all methods
/// (including reads) take a mutable reference. This isn't that big of a deal
/// since Raft execution is serial anyway.
pub trait Engine: Send {
    /// The iterator returned by [`Engine::scan`].
    type ScanIterator<'a>: ScanIterator + 'a
    where
        Self: Sized + 'a; // omit in trait objects, for dyn compatibility

    /// Deletes a key, or does nothing if it does not exist.
    fn delete(&mut self, key: &[u8]) -> Result<()>;

    /// Flushes any buffered data to disk.
    fn flush(&mut self) -> Result<()>;

    /// Gets a value for a key, if it exists.
    fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Iterates over an ordered range of key/value pairs.
    fn scan(&mut self, range: impl RangeBounds<Vec<u8>>) -> Self::ScanIterator<'_>
    where
        Self: Sized; // omit in trait objects, for dyn compatibility

    /// Like scan, but can be used from trait objects (with dynamic dispatch).
    fn scan_dyn(&mut self, range: (Bound<Vec<u8>>, Bound<Vec<u8>>)) -> Box<dyn ScanIterator + '_>;

    /// Iterates over all key/value pairs starting with the given prefix.
    fn scan_prefix(&mut self, prefix: &[u8]) -> Self::ScanIterator<'_>
    where
        Self: Sized, // omit in trait objects, for dyn compatibility
    {
        self.scan(keycode::prefix_range(prefix))
    }

    /// Sets a value for a key, replacing the existing value if any.
    fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()>;

    /// Returns the engine status.
    fn status(&mut self) -> Result<Status>;
}
```
Let's look at the existing storage engine implementations.
The simplest storage engine is the storage::Memory engine. This is a trivial implementation which
stores data in memory using the Rust standard library's
BTreeMap, without persisting
it to disk. It is primarily used for testing.
Since this is just a wrapper around the BTreeMap, we can include it in its entirety here:
```rust
/// An in-memory key-value storage engine using the Rust standard library's
/// B-tree implementation. Data is not persisted. Primarily for testing.
#[derive(Default)]
pub struct Memory(BTreeMap<Vec<u8>, Vec<u8>>);

impl Memory {
    /// Creates a new Memory key-value storage engine.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Engine for Memory {
    type ScanIterator<'a> = ScanIterator<'a>;

    fn delete(&mut self, key: &[u8]) -> Result<()> {
        self.0.remove(key);
        Ok(())
    }

    fn flush(&mut self) -> Result<()> {
        Ok(())
    }

    fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        Ok(self.0.get(key).cloned())
    }

    fn scan(&mut self, range: impl RangeBounds<Vec<u8>>) -> Self::ScanIterator<'_> {
        ScanIterator(self.0.range(range))
    }

    fn scan_dyn(
        &mut self,
        range: (Bound<Vec<u8>>, Bound<Vec<u8>>),
    ) -> Box<dyn super::ScanIterator + '_> {
        Box::new(self.scan(range))
    }

    fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()> {
        self.0.insert(key.to_vec(), value);
        Ok(())
    }

    fn status(&mut self) -> Result<Status> {
        Ok(Status {
            name: "memory".to_string(),
            keys: self.0.len() as u64,
            size: self.0.iter().map(|(k, v)| (k.len() + v.len()) as u64).sum(),
            disk_size: 0,
            live_disk_size: 0,
        })
    }
}

pub struct ScanIterator<'a>(Range<'a, Vec<u8>, Vec<u8>>);

impl Iterator for ScanIterator<'_> {
    type Item = Result<(Vec<u8>, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next().map(|(k, v)| Ok((k.clone(), v.clone())))
    }
}

impl DoubleEndedIterator for ScanIterator<'_> {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.0.next_back().map(|(k, v)| Ok((k.clone(), v.clone())))
    }
}
```
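To get a feel for the range and prefix scan semantics the engine exposes, here is a hypothetical stand-alone sketch using a plain `BTreeMap` in place of a storage engine. The `scan_keys` helper is illustrative only, and not part of toyDB; it plays the role of `Engine::scan`.

```rust
use std::collections::BTreeMap;
use std::ops::{Bound, RangeBounds};

/// Returns the keys of `map` within `range`, in sorted order. A stand-in for
/// Engine::scan, operating on a plain BTreeMap rather than a storage engine.
fn scan_keys(map: &BTreeMap<Vec<u8>, Vec<u8>>, range: impl RangeBounds<Vec<u8>>) -> Vec<String> {
    map.range(range).map(|(k, _)| String::from_utf8(k.clone()).unwrap()).collect()
}

fn main() {
    let mut map = BTreeMap::new();
    for key in ["a", "ab", "b", "ba", "bb", "c"] {
        map.insert(key.as_bytes().to_vec(), Vec::new());
    }

    // Range scan: all keys in ["ab", "bb").
    assert_eq!(scan_keys(&map, b"ab".to_vec()..b"bb".to_vec()), ["ab", "b", "ba"]);

    // A prefix scan for "b" is just the range ["b", "c"): keycode::prefix_range
    // computes such a range by incrementing the prefix's last byte.
    let range = (Bound::Included(b"b".to_vec()), Bound::Excluded(b"c".to_vec()));
    assert_eq!(scan_keys(&map, range), ["b", "ba", "bb"]);
}
```

Since the keys are sorted, a prefix scan reduces to a regular range scan, which is why the trait can provide `scan_prefix` as a default method on top of `scan`.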
The main storage engine is storage::BitCask. This is a very simple variant of
BitCask, used in the Riak
database. It is kind of like the LSM-tree's
baby cousin.
```rust
/// A very simple variant of BitCask, itself a simple log-structured key-value
/// engine used e.g. by the Riak database. This is not compatible with BitCask
/// databases generated by other implementations. See:
/// <https://riak.com/assets/bitcask-intro.pdf>
///
/// BitCask writes key-value pairs to an append-only log file, and keeps a
/// mapping of keys to file offsets in memory. All live keys must fit in memory.
/// Deletes write a tombstone value to the log file. To remove old garbage
/// (deleted or replaced keys), logs can be compacted by writing new logs
/// containing only live data, dropping replaced values and tombstones.
///
/// This implementation is significantly simpler than standard BitCask:
///
/// * Instead of writing multiple fixed-size log files, it uses a single
///   append-only log file of arbitrary size. This increases the compaction
///   volume, since the entire log file must be rewritten on every compaction.
///   It can also exceed the filesystem's file size limit. However, toyDB
///   databases are expected to be small.
///
/// * Compactions lock the database for reads and writes. This is ok since toyDB
///   only compacts during node startup and files are expected to be small.
///
/// * Hint files are not used, the log itself is scanned when opened to
///   build the keydir. Hint files only omit values, and toyDB values are
///   expected to be small, so the hint files would be nearly as large as
///   the compacted log files themselves.
///
/// * Log entries don't contain timestamps or checksums.
///
/// The structure of an encoded log entry is:
///
/// 1. Key length as big-endian u32 [4 bytes].
/// 2. Value length as big-endian i32, or -1 for tombstones [4 bytes].
/// 3. Key as raw bytes [<= 2 GB].
/// 4. Value as raw bytes [<= 2 GB].
pub struct BitCask {
    /// The current append-only log file.
    log: Log,
    /// Maps keys to a value's offset and length in [`BitCask::log`].
    keydir: KeyDir,
}
```
toyDB's BitCask implementation uses a single append-only log file for storage. To write a key/value
pair, we simply append it to the file. To delete a key, we append a special tombstone value. When
reading a key, the last entry for that key in the file is used.
The file format for a key/value pair is simply:
- The key length, as a big-endian u32 (4 bytes).
- The value length, as a big-endian i32 (4 bytes); -1 if tombstone.
- The binary key (n bytes).
- The binary value (n bytes).

For example, the key/value pair foo=bar would be written as follows (in hexadecimal):

```
keylen   valuelen key    value
00000003 00000003 666f6f 626172
```
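This encoding is simple enough to sketch in a few lines. The `encode_entry` helper below is illustrative only, not part of toyDB's API; it just assembles the bytes described above, and confirms the foo=bar example:

```rust
/// Encodes a BitCask log entry per the format above: big-endian u32 key
/// length, big-endian i32 value length (-1 for tombstones), then the raw key
/// and value bytes. (`encode_entry` is an illustrative helper, not toyDB API.)
fn encode_entry(key: &[u8], value: Option<&[u8]>) -> Vec<u8> {
    let mut entry = Vec::new();
    entry.extend_from_slice(&(key.len() as u32).to_be_bytes());
    entry.extend_from_slice(&value.map_or(-1_i32, |v| v.len() as i32).to_be_bytes());
    entry.extend_from_slice(key);
    entry.extend_from_slice(value.unwrap_or_default());
    entry
}

fn main() {
    let hex = |b: &[u8]| b.iter().map(|b| format!("{b:02x}")).collect::<String>();

    // foo=bar yields exactly the hexadecimal shown above.
    assert_eq!(hex(&encode_entry(b"foo", Some(b"bar"))), "0000000300000003666f6f626172");

    // A tombstone for foo encodes the value length as -1 (ffffffff), with no
    // value bytes following the key.
    assert_eq!(hex(&encode_entry(b"foo", None)), "00000003ffffffff666f6f");
}
```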
Because the data file is a simple log, we don't need a separate write-ahead log
for crash recovery -- the data file is the write-ahead log.
To quickly look up key/value pairs when reading, we maintain an in-memory KeyDir index which maps
a key to the latest value's position in the file. All keys must therefore fit in memory.
```rust
/// Maps keys to a value's location in the log file.
type KeyDir = BTreeMap<Vec<u8>, ValueLocation>;

/// The location of a value in the log file.
#[derive(Clone, Copy)]
struct ValueLocation {
    offset: u64,
    length: usize,
}
```
We initially generate this index by scanning through the entire file when it is opened:
```rust
/// Builds a keydir by scanning the log file. If an incomplete entry is
/// encountered, it is assumed to be caused by an incomplete write operation
/// and the remainder of the file is truncated.
fn build_keydir(&mut self) -> Result<KeyDir> {
    let mut len_buf = [0u8; 4];
    let mut keydir = KeyDir::new();
    let file_len = self.file.metadata()?.len();
    let mut r = BufReader::new(&mut self.file);
    let mut offset = r.seek(SeekFrom::Start(0))?;

    while offset < file_len {
        // Read the next entry from the file, returning the key and value
        // location, or None for tombstones.
        let result = || -> StdResult<(Vec<u8>, Option<ValueLocation>), std::io::Error> {
            // Read the key length: 4-byte u32.
            r.read_exact(&mut len_buf)?;
            let key_len = u32::from_be_bytes(len_buf);

            // Read the value length: 4-byte i32, -1 for tombstones.
            r.read_exact(&mut len_buf)?;
            let value_loc = match i32::from_be_bytes(len_buf) {
                ..0 => None, // tombstone
                len => Some(ValueLocation {
                    offset: offset + 8 + key_len as u64,
                    length: len as usize,
                }),
            };

            // Read the key.
            let mut key = vec![0; key_len as usize];
            r.read_exact(&mut key)?;

            // Skip past the value.
            if let Some(value_loc) = value_loc {
                if value_loc.end() > file_len {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "value extends beyond end of file",
                    ));
                }
                r.seek_relative(value_loc.length as i64)?;
            }

            // Update the file offset.
            offset += 8 + key_len as u64 + value_loc.map_or(0, |v| v.length) as u64;

            Ok((key, value_loc))
        }();

        // Update the keydir with the entry.
        match result {
            Ok((key, Some(value_loc))) => keydir.insert(key, value_loc),
            Ok((key, None)) => keydir.remove(&key),
            // If an incomplete entry was found at the end of the file, assume an
            // incomplete write and truncate the file.
            Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
                error!("Found incomplete entry at offset {offset}, truncating file");
                self.file.set_len(offset)?;
                break;
            }
            Err(err) => return Err(err.into()),
        };
    }

    Ok(keydir)
}
```
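The truncation logic is what gives us crash recovery: a write torn by a crash leaves an incomplete entry at the end of the log, which the scan detects and discards. Here is a hypothetical, self-contained sketch of that scan over an in-memory byte buffer instead of a file; `scan_log` and its simplified `(offset, length)` value locations are illustrative only, not toyDB code:

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

/// Scans an in-memory log, returning the parsed entries (key plus optional
/// value offset/length, None for tombstones) and the offset at which to
/// truncate: the end of the last complete entry.
fn scan_log(log: &[u8]) -> (Vec<(Vec<u8>, Option<(u64, usize)>)>, u64) {
    let mut entries = Vec::new();
    let mut r = Cursor::new(log);
    let file_len = log.len() as u64;
    let mut offset = 0u64;
    while offset < file_len {
        let result = (|| -> std::io::Result<(Vec<u8>, Option<(u64, usize)>)> {
            // Key length (u32) and value length (i32, negative for tombstones).
            let mut len_buf = [0u8; 4];
            r.read_exact(&mut len_buf)?;
            let key_len = u32::from_be_bytes(len_buf);
            r.read_exact(&mut len_buf)?;
            let value_len = i32::from_be_bytes(len_buf);

            // Read the key, then skip past the value.
            let mut key = vec![0; key_len as usize];
            r.read_exact(&mut key)?;
            let loc = if value_len >= 0 {
                let start = offset + 8 + key_len as u64;
                if start + value_len as u64 > file_len {
                    return Err(std::io::ErrorKind::UnexpectedEof.into());
                }
                r.seek(SeekFrom::Current(value_len as i64))?;
                Some((start, value_len as usize))
            } else {
                None // tombstone
            };
            Ok((key, loc))
        })();
        match result {
            Ok((key, loc)) => {
                offset += 8 + key.len() as u64 + loc.map_or(0, |(_, len)| len as u64);
                entries.push((key, loc));
            }
            // Incomplete trailing entry: truncate the log here. (The real code
            // only does this for ErrorKind::UnexpectedEof.)
            Err(_) => break,
        }
    }
    (entries, offset)
}

fn main() {
    // One complete entry foo=bar, followed by a torn write: only 5 bytes of a
    // second entry made it to disk before the crash.
    let mut log = vec![0, 0, 0, 3, 0, 0, 0, 3];
    log.extend_from_slice(b"foobar");
    log.extend_from_slice(&[0, 0, 0, 2, 0]);

    let (entries, truncate_at) = scan_log(&log);
    assert_eq!(entries.len(), 1);
    assert_eq!(entries[0].0, b"foo");
    assert_eq!(truncate_at, 14); // keep only the first complete entry
}
```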
To write a key, we append it to the file and update the KeyDir:
```rust
fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()> {
    let value_location = self.log.write_entry(key, Some(&*value))?;
    self.keydir.insert(key.to_vec(), value_location);
    Ok(())
}
```

```rust
/// Appends a key/value entry to the log file, using a None value for
/// tombstones. It returns the location of the entry's value in the log, for
/// use with the [`KeyDir`].
fn write_entry(&mut self, key: &[u8], value: Option<&[u8]>) -> Result<ValueLocation> {
    let length = 8 + key.len() + value.map_or(0, |v| v.len());
    let offset = self.file.seek(SeekFrom::End(0))?;
    let mut w = BufWriter::with_capacity(length, &mut self.file);

    // Key length: 4-byte u32.
    w.write_all(&(key.len() as u32).to_be_bytes())?;

    // Value length: 4-byte i32, -1 for tombstones.
    w.write_all(&value.map_or(-1, |v| v.len() as i32).to_be_bytes())?;

    // The actual key and value.
    w.write_all(key)?;
    w.write_all(value.unwrap_or_default())?;
    w.flush()?;

    // Translate the entry location into a value location.
    Ok(ValueLocation {
        offset: offset + 8 + key.len() as u64,
        length: value.map_or(0, |v| v.len()),
    })
}
```
To delete a key, we append a tombstone value instead:
```rust
fn delete(&mut self, key: &[u8]) -> Result<()> {
    self.log.write_entry(key, None)?;
    self.keydir.remove(key);
    Ok(())
}
```
To read a value for a key, we look up the key's file location in the KeyDir index (if the key
exists), and then read it from the file:
```rust
/// Reads a value from the log file at the given location.
fn read_value(&mut self, location: ValueLocation) -> Result<Vec<u8>> {
    let mut value = vec![0; location.length];
    self.file.seek(SeekFrom::Start(location.offset))?;
    self.file.read_exact(&mut value)?;
    Ok(value)
}
```
The KeyDir uses an inner stdlib BTreeMap to keep track of keys. This allows range scans, where
we iterate over a sorted set of keys between the range bounds, loading each value from the file:
```rust
fn scan(&mut self, range: impl RangeBounds<Vec<u8>>) -> Self::ScanIterator<'_> {
    ScanIterator { inner: self.keydir.range(range), log: &mut self.log }
}
```

```rust
pub struct ScanIterator<'a> {
    inner: Range<'a, Vec<u8>, ValueLocation>,
    log: &'a mut Log,
}

impl ScanIterator<'_> {
    fn map(&mut self, item: (&Vec<u8>, &ValueLocation)) -> <Self as Iterator>::Item {
        let (key, value_loc) = item;
        Ok((key.clone(), self.log.read_value(*value_loc)?))
    }
}

impl Iterator for ScanIterator<'_> {
    type Item = Result<(Vec<u8>, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next().map(|item| self.map(item))
    }
}
```
As keys are updated and deleted, we'll keep accumulating old versions in the log file. To remove
these, the log file is compacted on startup. This writes out the latest value of every live
key/value pair to a new file, and replaces the old file. The keys are written in sorted order, to
make later scans faster.
```rust
/// Compacts the current log file by writing out a new log file containing
/// only live keys and replacing the current file with it.
pub fn compact(&mut self) -> Result<()> {
    // Create a new temporary log file, or truncate it if it already exists.
    let new_path = self.log.path.with_extension("new");
    let mut new_log = Log::new(new_path)?;
    new_log.file.set_len(0)?;

    // Write all live entries into the new log, and generate a new KeyDir.
    let mut new_keydir = KeyDir::new();
    for (key, value_loc) in &self.keydir {
        let value = self.log.read_value(*value_loc)?;
        let value_loc = new_log.write_entry(key, Some(&value))?;
        new_keydir.insert(key.clone(), value_loc);
    }

    // Replace the current log with the new one.
    std::fs::rename(&new_log.path, &self.log.path)?;
    new_log.path = self.log.path.clone();

    self.log = new_log;
    self.keydir = new_keydir;
    Ok(())
}
```
← Overview | Key/Value Encoding →