
Commit 7c66dca

feat(storage): restart and finish part 1, 2 (#16)
Signed-off-by: Alex Chi <[email protected]>
1 parent ec4e082 commit 7c66dca

File tree: 128 files changed (+6158 additions, -739 deletions)


code/.gitignore

Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
 target
 Cargo.lock
+risinglight.db/

code/03-00/src/executor/create.rs

Lines changed: 0 additions & 1 deletion
@@ -1,7 +1,6 @@
 use super::*;
 use crate::catalog::TableRefId;
 use crate::physical_planner::PhysicalCreateTable;
-use crate::storage::StorageRef;
 
 /// The executor of `CREATE TABLE` statement.
 pub struct CreateTableExecutor {

code/03-00/src/executor/insert.rs

Lines changed: 1 addition & 2 deletions
@@ -3,7 +3,6 @@ use itertools::Itertools;
 use super::*;
 use crate::array::{ArrayBuilderImpl, DataChunk};
 use crate::catalog::{ColumnId, TableRefId};
-use crate::storage::StorageRef;
 use crate::types::{DataType, DataValue};
 
 /// The executor of `INSERT` statement.
@@ -42,7 +41,7 @@ impl InsertExecutor {
         for chunk in self.child {
             let chunk = transform_chunk(chunk?, &output_columns);
             count += chunk.cardinality();
-            table.append(chunk)?;
+            table.append(chunk).await?;
         }
         yield DataChunk::single(count as i32);
     }

code/03-00/src/executor/seq_scan.rs

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@ impl SeqScanExecutor {
     #[try_stream(boxed, ok = DataChunk, error = ExecuteError)]
     pub async fn execute(self) {
         let table = self.storage.get_table(self.table_ref_id)?;
-        for chunk in table.all_chunks()? {
+        for chunk in table.all_chunks().await? {
            yield chunk;
        }
    }

code/03-00/src/storage/mod.rs

Lines changed: 22 additions & 55 deletions
@@ -1,31 +1,29 @@
-//! In-memory storage.
-//!
-//! RisingLight's in-memory representation of data is very simple. Currently,
-//! it is simple a vector of `DataChunk`. Upon insertion, users' data are
-//! simply appended to the end of the vector.
+//! On-disk storage
 
-use std::collections::HashMap;
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::Arc;
 
 use crate::array::DataChunk;
 use crate::catalog::TableRefId;
 
 /// The error type of storage operations.
 #[derive(thiserror::Error, Debug)]
-pub enum StorageError {
-    #[error("table not found: {0:?}")]
-    NotFound(TableRefId),
-}
+#[error("{0:?}")]
+pub struct StorageError(#[from] anyhow::Error);
 
 /// A specialized `Result` type for storage operations.
 pub type StorageResult<T> = std::result::Result<T, StorageError>;
 
 pub type StorageRef = Arc<DiskStorage>;
-pub type DiskTableRef = Arc<DiskTable>;
+pub type StorageTableRef = Arc<DiskTable>;
+
+/// On-disk storage.
+#[derive(Clone)]
+pub struct DiskStorage;
 
-/// In-memory storage.
-pub struct DiskStorage {
-    tables: Mutex<HashMap<TableRefId, DiskTableRef>>,
+/// An on-disk table.
+pub struct DiskTable {
+    #[allow(dead_code)]
+    id: TableRefId,
 }
 
 impl Default for DiskStorage {
@@ -37,59 +35,28 @@ impl Default for DiskStorage {
 impl DiskStorage {
     /// Create a new in-memory storage.
     pub fn new() -> Self {
-        DiskStorage {
-            tables: Mutex::new(HashMap::new()),
-        }
+        DiskStorage
     }
 
     /// Add a table.
-    pub fn add_table(&self, id: TableRefId) -> StorageResult<()> {
-        let table = Arc::new(DiskTable::new(id));
-        self.tables.lock().unwrap().insert(id, table);
-        Ok(())
+    pub fn add_table(&self, _id: TableRefId) -> StorageResult<()> {
+        todo!()
     }
 
     /// Get a table.
-    pub fn get_table(&self, id: TableRefId) -> StorageResult<DiskTableRef> {
-        self.tables
-            .lock()
-            .unwrap()
-            .get(&id)
-            .cloned()
-            .ok_or(StorageError::NotFound(id))
+    pub fn get_table(&self, _id: TableRefId) -> StorageResult<StorageTableRef> {
+        todo!()
     }
 }
 
-/// A table in in-memory engine.
-pub struct DiskTable {
-    #[allow(dead_code)]
-    id: TableRefId,
-    inner: RwLock<DiskTableInner>,
-}
-
-#[derive(Default)]
-struct DiskTableInner {
-    chunks: Vec<DataChunk>,
-}
-
 impl DiskTable {
-    fn new(id: TableRefId) -> Self {
-        Self {
-            id,
-            inner: RwLock::new(DiskTableInner::default()),
-        }
-    }
-
     /// Append a chunk to the table.
-    pub fn append(&self, chunk: DataChunk) -> StorageResult<()> {
-        let mut inner = self.inner.write().unwrap();
-        inner.chunks.push(chunk);
-        Ok(())
+    pub async fn append(&self, _chunk: DataChunk) -> StorageResult<()> {
+        todo!()
     }
 
     /// Get all chunks of the table.
-    pub fn all_chunks(&self) -> StorageResult<Vec<DataChunk>> {
-        let inner = self.inner.read().unwrap();
-        Ok(inner.chunks.clone())
+    pub async fn all_chunks(&self) -> StorageResult<Vec<DataChunk>> {
+        todo!()
    }
 }
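
The diff above strips the in-memory implementation down to `todo!()` stubs and makes `append` / `all_chunks` async, which is why the executors above now call `.await` and why the tokio `fs` feature is enabled in Cargo.toml below. As a rough sketch of one possible direction only (not the tutorial's solution; `DiskTableSketch`, `flush`, and `serialize_chunk` are hypothetical names), an async table could buffer chunks behind a `tokio::sync::Mutex` and persist them under `risinglight.db/` with `tokio::fs`:

// Hypothetical sketch, not part of this commit. It only assumes the
// `DataChunk` and `StorageResult` types visible in this diff.
use std::path::PathBuf;

use tokio::fs;
use tokio::sync::Mutex;

use crate::array::DataChunk;
use crate::storage::StorageResult;

pub struct DiskTableSketch {
    /// Directory holding this table's files, e.g. `risinglight.db/<table_id>`.
    dir: PathBuf,
    /// Chunks buffered in memory until they are flushed to disk.
    mem_buffer: Mutex<Vec<DataChunk>>,
}

impl DiskTableSketch {
    /// Buffer a chunk; a later `flush` persists it.
    pub async fn append(&self, chunk: DataChunk) -> StorageResult<()> {
        self.mem_buffer.lock().await.push(chunk);
        Ok(())
    }

    /// Write every buffered chunk to its own file via `tokio::fs`. Errors are
    /// routed through `anyhow::Error`, matching the new
    /// `StorageError(#[from] anyhow::Error)` definition.
    pub async fn flush(&self) -> StorageResult<()> {
        fs::create_dir_all(&self.dir).await.map_err(anyhow::Error::from)?;
        let chunks = std::mem::take(&mut *self.mem_buffer.lock().await);
        for (i, chunk) in chunks.iter().enumerate() {
            let bytes = serialize_chunk(chunk);
            fs::write(self.dir.join(format!("{i}.col")), bytes)
                .await
                .map_err(anyhow::Error::from)?;
        }
        Ok(())
    }
}

/// Placeholder encoder (hypothetical): a real version would write each array
/// in a columnar format.
fn serialize_chunk(_chunk: &DataChunk) -> Vec<u8> {
    Vec::new()
}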

code/03-00/src/test.rs

Lines changed: 6 additions & 6 deletions
@@ -1,3 +1,6 @@
+#![allow(unused_imports)]
+#![allow(dead_code)]
+
 use std::path::Path;
 
 use test_case::test_case;
@@ -6,16 +9,13 @@ use crate::array::DataChunk;
 use crate::types::DataValue;
 use crate::{Database, Error};
 
-#[test_case("01-01.slt")]
-#[test_case("01-03.slt")]
-#[test_case("01-05.slt")]
-#[test_case("01-06.slt")]
-#[test_case("01-07.slt")]
 fn test(name: &str) {
     init_logger();
     let script = std::fs::read_to_string(Path::new("../sql").join(name)).unwrap();
     let mut tester = sqllogictest::Runner::new(Database::new());
-    tester.run_script(&script).unwrap();
+    if let Err(err) = tester.run_script(&script) {
+        panic!("{}", err);
+    }
 }
 
 impl sqllogictest::DB for Database {

code/03-01/Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -7,6 +7,7 @@ edition = "2021"
 [dependencies]
 anyhow = "1"
 bitvec = "1.0"
+bytes = "1"
 enum_dispatch = "0.3"
 env_logger = "0.9"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
@@ -17,7 +18,7 @@ prettytable-rs = { version = "0.8", default-features = false }
 rustyline = "9"
 sqlparser = "0.13"
 thiserror = "1"
-tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "macros"] }
+tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "macros", "fs"] }
 tokio-stream = "0.1"
 
 [dev-dependencies]
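
The new `fs` feature gates tokio's async filesystem module, which the storage layer will need. A minimal, self-contained illustration of the calls it unlocks (the directory layout is hypothetical; only the `risinglight.db/` name comes from the `.gitignore` change in this commit):

use tokio::fs;

// Read back every file in a (hypothetical) table directory.
async fn read_all_files(dir: &str) -> std::io::Result<Vec<Vec<u8>>> {
    let mut files = Vec::new();
    let mut entries = fs::read_dir(dir).await?;
    while let Some(entry) = entries.next_entry().await? {
        files.push(fs::read(entry.path()).await?);
    }
    Ok(files)
}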

code/03-01/src/array

Lines changed: 0 additions & 1 deletion
This file was deleted.

code/03-01/src/array/data_chunk.rs

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+use std::fmt;
+use std::sync::Arc;
+
+use itertools::Itertools;
+
+use super::*;
+
+/// A collection of arrays.
+///
+/// A chunk is a horizontal subset of a query result.
+#[derive(PartialEq, Clone)]
+pub struct DataChunk {
+    arrays: Arc<[ArrayImpl]>,
+}
+
+/// Create [`DataChunk`] from a list of column arrays.
+impl FromIterator<ArrayImpl> for DataChunk {
+    fn from_iter<I: IntoIterator<Item = ArrayImpl>>(iter: I) -> Self {
+        let arrays = iter.into_iter().collect::<Arc<[ArrayImpl]>>();
+        assert!(!arrays.is_empty());
+        let cardinality = arrays[0].len();
+        assert!(
+            arrays.iter().map(|a| a.len()).all(|l| l == cardinality),
+            "all arrays must have the same length"
+        );
+        DataChunk { arrays }
+    }
+}
+
+impl DataChunk {
+    /// Return a [`DataChunk`] with 1 `item` in 1 array.
+    pub fn single(item: i32) -> Self {
+        DataChunk {
+            arrays: [ArrayImpl::Int32([item].into_iter().collect())]
+                .into_iter()
+                .collect(),
+        }
+    }
+
+    /// Return the number of rows in the chunk.
+    pub fn cardinality(&self) -> usize {
+        self.arrays[0].len()
+    }
+
+    /// Get all arrays.
+    pub fn arrays(&self) -> &[ArrayImpl] {
+        &self.arrays
+    }
+
+    /// Concatenate multiple chunks into one.
+    pub fn concat(chunks: &[DataChunk]) -> Self {
+        assert!(!chunks.is_empty(), "must concat at least one chunk");
+        let mut builders = chunks[0]
+            .arrays()
+            .iter()
+            .map(ArrayBuilderImpl::from_type_of_array)
+            .collect_vec();
+        for chunk in chunks {
+            for (array, builder) in chunk.arrays.iter().zip(builders.iter_mut()) {
+                builder.append(array);
+            }
+        }
+        builders.into_iter().map(|b| b.finish()).collect()
+    }
+}
+
+/// Print the chunk as a pretty table.
+impl fmt::Display for DataChunk {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        use prettytable::{format, Table};
+        let mut table = Table::new();
+        table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
+        for i in 0..self.cardinality() {
+            let row = self.arrays.iter().map(|a| a.get(i).to_string()).collect();
+            table.add_row(row);
+        }
+        write!(f, "{}", table)
+    }
+}
+
+impl fmt::Debug for DataChunk {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self)
+    }
+}
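
A brief usage sketch of the new type (the `demo` function and the `crate::array` import path are assumptions; the `ArrayImpl::Int32` construction mirrors what `DataChunk::single` does above):

use crate::array::{ArrayImpl, DataChunk};

fn demo() {
    // Two single-column chunks, built the same way `DataChunk::single` builds one.
    let a: DataChunk = [ArrayImpl::Int32([1, 2].into_iter().collect())]
        .into_iter()
        .collect();
    let b: DataChunk = [ArrayImpl::Int32([3].into_iter().collect())]
        .into_iter()
        .collect();
    // `concat` stitches the chunks column by column: 2 + 1 = 3 rows.
    let merged = DataChunk::concat(&[a, b]);
    assert_eq!(merged.cardinality(), 3);
    println!("{}", merged); // pretty-printed by the Display impl above
}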

code/03-01/src/array/iter.rs

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+use std::iter::Iterator;
+use std::marker::PhantomData;
+
+use super::Array;
+
+/// An iterator over the elements of an [`Array`].
+#[derive(Clone)]
+pub struct ArrayIter<'a, A: Array> {
+    array: &'a A,
+    index: usize,
+    _phantom: PhantomData<&'a usize>,
+}
+
+impl<'a, A: Array> ArrayIter<'a, A> {
+    /// Create an iterator over array.
+    pub fn new(array: &'a A) -> Self {
+        Self {
+            array,
+            index: 0,
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<'a, A: Array> Iterator for ArrayIter<'a, A> {
+    type Item = Option<&'a A::Item>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index >= self.array.len() {
+            None
+        } else {
+            let item = self.array.get(self.index);
+            self.index += 1;
+            Some(item)
+        }
+    }
+}
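
Each call to `next` yields `Option<&A::Item>`, where a `None` item presumably marks a NULL value, mirroring `Array::get`. A small hypothetical helper built on top of it (the import path and `count_non_null` are assumptions, not part of the commit):

use crate::array::{Array, ArrayIter};

/// Count the non-NULL values in an array by walking the iterator.
fn count_non_null<A: Array>(array: &A) -> usize {
    ArrayIter::new(array).filter(|item| item.is_some()).count()
}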
