Skip to content

feat(storage): restart and finish part 1, 2 #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions code/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
target
Cargo.lock
risinglight.db/
1 change: 0 additions & 1 deletion code/03-00/src/executor/create.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::*;
use crate::catalog::TableRefId;
use crate::physical_planner::PhysicalCreateTable;
use crate::storage::StorageRef;

/// The executor of `CREATE TABLE` statement.
pub struct CreateTableExecutor {
Expand Down
3 changes: 1 addition & 2 deletions code/03-00/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use itertools::Itertools;
use super::*;
use crate::array::{ArrayBuilderImpl, DataChunk};
use crate::catalog::{ColumnId, TableRefId};
use crate::storage::StorageRef;
use crate::types::{DataType, DataValue};

/// The executor of `INSERT` statement.
Expand Down Expand Up @@ -42,7 +41,7 @@ impl InsertExecutor {
for chunk in self.child {
let chunk = transform_chunk(chunk?, &output_columns);
count += chunk.cardinality();
table.append(chunk)?;
table.append(chunk).await?;
}
yield DataChunk::single(count as i32);
}
Expand Down
2 changes: 1 addition & 1 deletion code/03-00/src/executor/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl SeqScanExecutor {
#[try_stream(boxed, ok = DataChunk, error = ExecuteError)]
pub async fn execute(self) {
let table = self.storage.get_table(self.table_ref_id)?;
for chunk in table.all_chunks()? {
for chunk in table.all_chunks().await? {
yield chunk;
}
}
Expand Down
77 changes: 22 additions & 55 deletions code/03-00/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
//! In-memory storage.
//!
//! RisingLight's in-memory representation of data is very simple. Currently,
//! it is simply a vector of `DataChunk`. Upon insertion, users' data are
//! simply appended to the end of the vector.
//! On-disk storage
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::Arc;

use crate::array::DataChunk;
use crate::catalog::TableRefId;

/// The error type of storage operations.
#[derive(thiserror::Error, Debug)]
pub enum StorageError {
#[error("table not found: {0:?}")]
NotFound(TableRefId),
}
#[error("{0:?}")]
pub struct StorageError(#[from] anyhow::Error);

/// A specialized `Result` type for storage operations.
pub type StorageResult<T> = std::result::Result<T, StorageError>;

pub type StorageRef = Arc<DiskStorage>;
pub type DiskTableRef = Arc<DiskTable>;
pub type StorageTableRef = Arc<DiskTable>;

/// On-disk storage.
#[derive(Clone)]
pub struct DiskStorage;

/// In-memory storage.
pub struct DiskStorage {
tables: Mutex<HashMap<TableRefId, DiskTableRef>>,
/// An on-disk table.
pub struct DiskTable {
#[allow(dead_code)]
id: TableRefId,
}

impl Default for DiskStorage {
Expand All @@ -37,59 +35,28 @@ impl Default for DiskStorage {
impl DiskStorage {
/// Create a new in-memory storage.
pub fn new() -> Self {
DiskStorage {
tables: Mutex::new(HashMap::new()),
}
DiskStorage
}

/// Add a table.
pub fn add_table(&self, id: TableRefId) -> StorageResult<()> {
let table = Arc::new(DiskTable::new(id));
self.tables.lock().unwrap().insert(id, table);
Ok(())
pub fn add_table(&self, _id: TableRefId) -> StorageResult<()> {
todo!()
}

/// Get a table.
pub fn get_table(&self, id: TableRefId) -> StorageResult<DiskTableRef> {
self.tables
.lock()
.unwrap()
.get(&id)
.cloned()
.ok_or(StorageError::NotFound(id))
pub fn get_table(&self, _id: TableRefId) -> StorageResult<StorageTableRef> {
todo!()
}
}

/// A table in in-memory engine.
pub struct DiskTable {
#[allow(dead_code)]
id: TableRefId,
inner: RwLock<DiskTableInner>,
}

#[derive(Default)]
struct DiskTableInner {
chunks: Vec<DataChunk>,
}

impl DiskTable {
fn new(id: TableRefId) -> Self {
Self {
id,
inner: RwLock::new(DiskTableInner::default()),
}
}

/// Append a chunk to the table.
pub fn append(&self, chunk: DataChunk) -> StorageResult<()> {
let mut inner = self.inner.write().unwrap();
inner.chunks.push(chunk);
Ok(())
pub async fn append(&self, _chunk: DataChunk) -> StorageResult<()> {
todo!()
}

/// Get all chunks of the table.
pub fn all_chunks(&self) -> StorageResult<Vec<DataChunk>> {
let inner = self.inner.read().unwrap();
Ok(inner.chunks.clone())
pub async fn all_chunks(&self) -> StorageResult<Vec<DataChunk>> {
todo!()
}
}
12 changes: 6 additions & 6 deletions code/03-00/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#![allow(unused_imports)]
#![allow(dead_code)]

use std::path::Path;

use test_case::test_case;
Expand All @@ -6,16 +9,13 @@ use crate::array::DataChunk;
use crate::types::DataValue;
use crate::{Database, Error};

#[test_case("01-01.slt")]
#[test_case("01-03.slt")]
#[test_case("01-05.slt")]
#[test_case("01-06.slt")]
#[test_case("01-07.slt")]
fn test(name: &str) {
init_logger();
let script = std::fs::read_to_string(Path::new("../sql").join(name)).unwrap();
let mut tester = sqllogictest::Runner::new(Database::new());
tester.run_script(&script).unwrap();
if let Err(err) = tester.run_script(&script) {
panic!("{}", err);
}
}

impl sqllogictest::DB for Database {
Expand Down
3 changes: 2 additions & 1 deletion code/03-01/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
anyhow = "1"
bitvec = "1.0"
bytes = "1"
enum_dispatch = "0.3"
env_logger = "0.9"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand All @@ -17,7 +18,7 @@ prettytable-rs = { version = "0.8", default-features = false }
rustyline = "9"
sqlparser = "0.13"
thiserror = "1"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "macros"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "macros", "fs"] }
tokio-stream = "0.1"

[dev-dependencies]
Expand Down
1 change: 0 additions & 1 deletion code/03-01/src/array

This file was deleted.

85 changes: 85 additions & 0 deletions code/03-01/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::fmt;
use std::sync::Arc;

use itertools::Itertools;

use super::*;

/// A collection of arrays.
///
/// A chunk is a horizontal subset of a query result: each element of
/// `arrays` is one column, and all columns have the same length (the
/// `FromIterator` constructor below enforces this invariant).
/// Columns are stored behind an `Arc`, so cloning a chunk is cheap —
/// it bumps a refcount rather than copying the column data.
#[derive(PartialEq, Clone)]
pub struct DataChunk {
    // Invariant: non-empty, and all arrays have equal length.
    arrays: Arc<[ArrayImpl]>,
}

/// Create [`DataChunk`] from a list of column arrays.
///
/// # Panics
///
/// Panics if the iterator yields no arrays, or if the arrays do not all
/// have the same length.
impl FromIterator<ArrayImpl> for DataChunk {
    fn from_iter<I: IntoIterator<Item = ArrayImpl>>(iter: I) -> Self {
        let arrays: Arc<[ArrayImpl]> = iter.into_iter().collect();
        // A chunk with zero columns has no well-defined cardinality.
        assert!(!arrays.is_empty());
        // Every column must match the first column's length.
        let cardinality = arrays[0].len();
        for array in arrays.iter() {
            assert!(
                array.len() == cardinality,
                "all arrays must have the same length"
            );
        }
        DataChunk { arrays }
    }
}

impl DataChunk {
/// Return a [`DataChunk`] with 1 `item` in 1 array.
pub fn single(item: i32) -> Self {
DataChunk {
arrays: [ArrayImpl::Int32([item].into_iter().collect())]
.into_iter()
.collect(),
}
}

/// Return the number of rows in the chunk.
pub fn cardinality(&self) -> usize {
self.arrays[0].len()
}

/// Get all arrays.
pub fn arrays(&self) -> &[ArrayImpl] {
&self.arrays
}

/// Concatenate multiple chunks into one.
pub fn concat(chunks: &[DataChunk]) -> Self {
assert!(!chunks.is_empty(), "must concat at least one chunk");
let mut builders = chunks[0]
.arrays()
.iter()
.map(ArrayBuilderImpl::from_type_of_array)
.collect_vec();
for chunk in chunks {
for (array, builder) in chunk.arrays.iter().zip(builders.iter_mut()) {
builder.append(array);
}
}
builders.into_iter().map(|b| b.finish()).collect()
}
}

/// Print the chunk as a pretty table.
impl fmt::Display for DataChunk {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use prettytable::{format, Table};
        let mut table = Table::new();
        table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
        // Emit one table row per chunk row; cells are stringified values.
        for row_idx in 0..self.cardinality() {
            let cells = self
                .arrays
                .iter()
                .map(|column| column.get(row_idx).to_string())
                .collect();
            table.add_row(cells);
        }
        write!(f, "{}", table)
    }
}

/// Debug output reuses the pretty-table `Display` rendering.
impl fmt::Debug for DataChunk {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}
37 changes: 37 additions & 0 deletions code/03-01/src/array/iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::iter::Iterator;
use std::marker::PhantomData;

use super::Array;

/// An iterator over the elements of an [`Array`].
#[derive(Clone)]
pub struct ArrayIter<'a, A: Array> {
array: &'a A,
index: usize,
_phantom: PhantomData<&'a usize>,
}

impl<'a, A: Array> ArrayIter<'a, A> {
/// Create an iterator over array.
pub fn new(array: &'a A) -> Self {
Self {
array,
index: 0,
_phantom: PhantomData,
}
}
}

impl<'a, A: Array> Iterator for ArrayIter<'a, A> {
type Item = Option<&'a A::Item>;

fn next(&mut self) -> Option<Self::Item> {
if self.index >= self.array.len() {
None
} else {
let item = self.array.get(self.index);
self.index += 1;
Some(item)
}
}
}
Loading