Skip to content

Commit d804c71

Browse files
authored
feat(storage): add memtable (#6)
* feat(storage): add memtable Signed-off-by: Alex Chi <[email protected]> * in memory to disk Signed-off-by: Alex Chi <[email protected]>
1 parent 55ecd3b commit d804c71

File tree

6 files changed

+154
-33
lines changed

6 files changed

+154
-33
lines changed

code/03-00/src/db.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::executor::{ExecuteError, ExecutorBuilder};
1212
use crate::logical_planner::{LogicalPlanError, LogicalPlanner};
1313
use crate::parser::{parse, ParserError};
1414
use crate::physical_planner::{PhysicalPlanError, PhysicalPlanner};
15-
use crate::storage::InMemoryStorage;
15+
use crate::storage::DiskStorage;
1616

1717
/// The database instance.
1818
pub struct Database {
@@ -31,7 +31,7 @@ impl Database {
3131
/// Create a new database instance.
3232
pub fn new() -> Self {
3333
let catalog = Arc::new(DatabaseCatalog::new());
34-
let storage = Arc::new(InMemoryStorage::new());
34+
let storage = Arc::new(DiskStorage::new());
3535
let parallel = matches!(std::env::var("LIGHT_PARALLEL"), Ok(s) if s == "1");
3636
let runtime = if parallel {
3737
tokio::runtime::Builder::new_multi_thread()

code/03-00/src/storage/mod.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,37 +20,37 @@ pub enum StorageError {
2020
/// A specialized `Result` type for storage operations.
2121
pub type StorageResult<T> = std::result::Result<T, StorageError>;
2222

23-
pub type StorageRef = Arc<InMemoryStorage>;
24-
pub type InMemoryTableRef = Arc<InMemoryTable>;
23+
pub type StorageRef = Arc<DiskStorage>;
24+
pub type DiskTableRef = Arc<DiskTable>;
2525

2626
/// In-memory storage.
27-
pub struct InMemoryStorage {
28-
tables: Mutex<HashMap<TableRefId, InMemoryTableRef>>,
27+
pub struct DiskStorage {
28+
tables: Mutex<HashMap<TableRefId, DiskTableRef>>,
2929
}
3030

31-
impl Default for InMemoryStorage {
31+
impl Default for DiskStorage {
3232
fn default() -> Self {
3333
Self::new()
3434
}
3535
}
3636

37-
impl InMemoryStorage {
37+
impl DiskStorage {
3838
/// Create a new in-memory storage.
3939
pub fn new() -> Self {
40-
InMemoryStorage {
40+
DiskStorage {
4141
tables: Mutex::new(HashMap::new()),
4242
}
4343
}
4444

4545
/// Add a table.
4646
pub fn add_table(&self, id: TableRefId) -> StorageResult<()> {
47-
let table = Arc::new(InMemoryTable::new(id));
47+
let table = Arc::new(DiskTable::new(id));
4848
self.tables.lock().unwrap().insert(id, table);
4949
Ok(())
5050
}
5151

5252
/// Get a table.
53-
pub fn get_table(&self, id: TableRefId) -> StorageResult<InMemoryTableRef> {
53+
pub fn get_table(&self, id: TableRefId) -> StorageResult<DiskTableRef> {
5454
self.tables
5555
.lock()
5656
.unwrap()
@@ -61,22 +61,22 @@ impl InMemoryStorage {
6161
}
6262

6363
/// A table in in-memory engine.
64-
pub struct InMemoryTable {
64+
pub struct DiskTable {
6565
#[allow(dead_code)]
6666
id: TableRefId,
67-
inner: RwLock<InMemoryTableInner>,
67+
inner: RwLock<DiskTableInner>,
6868
}
6969

7070
#[derive(Default)]
71-
struct InMemoryTableInner {
71+
struct DiskTableInner {
7272
chunks: Vec<DataChunk>,
7373
}
7474

75-
impl InMemoryTable {
75+
impl DiskTable {
7676
fn new(id: TableRefId) -> Self {
7777
Self {
7878
id,
79-
inner: RwLock::new(InMemoryTableInner::default()),
79+
inner: RwLock::new(DiskTableInner::default()),
8080
}
8181
}
8282

code/03-01/src/storage/mod.rs

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
//! In-memory storage.
1+
//! Persistent storage on disk.
22
//!
33
//! RisingLight's in-memory representation of data is very simple. Currently,
44
//! it is simple a vector of `DataChunk`. Upon insertion, users' data are
55
//! simply appended to the end of the vector.
66
7+
mod rowset;
8+
mod table_transaction;
9+
710
use std::collections::HashMap;
811
use std::sync::{Arc, Mutex, RwLock};
912

13+
use self::table_transaction::TableTransaction;
1014
use crate::array::DataChunk;
1115
use crate::catalog::TableRefId;
1216

@@ -20,37 +24,37 @@ pub enum StorageError {
2024
/// A specialized `Result` type for storage operations.
2125
pub type StorageResult<T> = std::result::Result<T, StorageError>;
2226

23-
pub type StorageRef = Arc<InMemoryStorage>;
24-
pub type InMemoryTableRef = Arc<InMemoryTable>;
27+
pub type StorageRef = Arc<DiskStorage>;
28+
pub type DiskTableRef = Arc<DiskTable>;
2529

26-
/// In-memory storage.
27-
pub struct InMemoryStorage {
28-
tables: Mutex<HashMap<TableRefId, InMemoryTableRef>>,
30+
/// Persistent storage on disk.
31+
pub struct DiskStorage {
32+
tables: Mutex<HashMap<TableRefId, DiskTableRef>>,
2933
}
3034

31-
impl Default for InMemoryStorage {
35+
impl Default for DiskStorage {
3236
fn default() -> Self {
3337
Self::new()
3438
}
3539
}
3640

37-
impl InMemoryStorage {
38-
/// Create a new in-memory storage.
41+
impl DiskStorage {
42+
/// Create a new persistent storage on disk.
3943
pub fn new() -> Self {
40-
InMemoryStorage {
44+
DiskStorage {
4145
tables: Mutex::new(HashMap::new()),
4246
}
4347
}
4448

4549
/// Add a table.
4650
pub fn add_table(&self, id: TableRefId) -> StorageResult<()> {
47-
let table = Arc::new(InMemoryTable::new(id));
51+
let table = Arc::new(DiskTable::new(id));
4852
self.tables.lock().unwrap().insert(id, table);
4953
Ok(())
5054
}
5155

5256
/// Get a table.
53-
pub fn get_table(&self, id: TableRefId) -> StorageResult<InMemoryTableRef> {
57+
pub fn get_table(&self, id: TableRefId) -> StorageResult<DiskTableRef> {
5458
self.tables
5559
.lock()
5660
.unwrap()
@@ -61,33 +65,52 @@ impl InMemoryStorage {
6165
}
6266

6367
/// A table in in-memory engine.
64-
pub struct InMemoryTable {
68+
pub struct DiskTable {
6569
#[allow(dead_code)]
6670
id: TableRefId,
67-
inner: RwLock<InMemoryTableInner>,
71+
inner: RwLock<DiskTableInner>,
6872
}
6973

7074
#[derive(Default)]
71-
struct InMemoryTableInner {
75+
struct DiskTableInner {
7276
chunks: Vec<DataChunk>,
7377
}
7478

75-
impl InMemoryTable {
79+
impl DiskTable {
7680
fn new(id: TableRefId) -> Self {
7781
Self {
7882
id,
79-
inner: RwLock::new(InMemoryTableInner::default()),
83+
inner: RwLock::new(DiskTableInner::default()),
8084
}
8185
}
8286

87+
#[allow(dead_code)]
88+
async fn write(self: &Arc<Self>) -> StorageResult<TableTransaction> {
89+
Ok(TableTransaction::start(self.clone(), false, false).await?)
90+
}
91+
92+
#[allow(dead_code)]
93+
async fn read(self: &Arc<Self>) -> StorageResult<TableTransaction> {
94+
Ok(TableTransaction::start(self.clone(), true, false).await?)
95+
}
96+
97+
#[allow(dead_code)]
98+
async fn update(self: &Arc<Self>) -> StorageResult<TableTransaction> {
99+
Ok(TableTransaction::start(self.clone(), false, true).await?)
100+
}
101+
83102
/// Append a chunk to the table.
103+
///
104+
/// This interface will be deprecated soon in this tutorial.
84105
pub fn append(&self, chunk: DataChunk) -> StorageResult<()> {
85106
let mut inner = self.inner.write().unwrap();
86107
inner.chunks.push(chunk);
87108
Ok(())
88109
}
89110

90111
/// Get all chunks of the table.
112+
///
113+
/// This interface will be deprecated soon in this tutorial.
91114
pub fn all_chunks(&self) -> StorageResult<Vec<DataChunk>> {
92115
let inner = self.inner.read().unwrap();
93116
Ok(inner.chunks.clone())
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#![allow(dead_code)]
2+
3+
use std::sync::Arc;
4+
5+
use itertools::Itertools;
6+
7+
use crate::array::{ArrayBuilderImpl, DataChunk};
8+
use crate::catalog::ColumnCatalog;
9+
use crate::storage::StorageResult;
10+
11+
pub struct MemRowset {
12+
builders: Vec<ArrayBuilderImpl>,
13+
}
14+
15+
impl MemRowset {
16+
pub fn new(columns: Arc<[ColumnCatalog]>) -> Self {
17+
Self {
18+
builders: columns
19+
.iter()
20+
.map(|column| ArrayBuilderImpl::with_capacity(0, column.desc().datatype()))
21+
.collect_vec(),
22+
}
23+
}
24+
25+
fn append(&mut self, columns: DataChunk) -> StorageResult<()> {
26+
for (idx, column) in columns.arrays().iter().enumerate() {
27+
self.builders[idx].append(column);
28+
}
29+
Ok(())
30+
}
31+
32+
fn flush(self) -> StorageResult<DataChunk> {
33+
Ok(self
34+
.builders
35+
.into_iter()
36+
.map(|builder| builder.finish())
37+
.collect::<DataChunk>())
38+
}
39+
}

code/03-01/src/storage/rowset/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
mod mem_rowset;
2+
3+
pub use mem_rowset::*;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#![allow(dead_code)]
2+
3+
use super::rowset::MemRowset;
4+
use super::{DiskTableRef, StorageResult};
5+
use crate::array::DataChunk;
6+
7+
/// [`TableTransaction`] records the state of a single table. All operations (insert, update,
8+
/// delete) should go through [`TableTransaction`].
9+
pub struct TableTransaction {
10+
mem_rowset: Option<MemRowset>,
11+
read_only: bool,
12+
update: bool,
13+
table: DiskTableRef,
14+
}
15+
16+
impl TableTransaction {
17+
/// Start a [`WriteBatch`]
18+
pub async fn start(table: DiskTableRef, read_only: bool, update: bool) -> StorageResult<Self> {
19+
Ok(Self {
20+
mem_rowset: None,
21+
table,
22+
update,
23+
read_only,
24+
})
25+
}
26+
27+
/// Flush [`WriteBatch`] to some on-disk RowSets.
28+
pub async fn flush(self) {
29+
todo!()
30+
}
31+
32+
/// Add a [`DataChunk`] to the mem rowset
33+
pub fn append(&self, _chunk: DataChunk) -> StorageResult<()> {
34+
todo!()
35+
}
36+
37+
/// Delete a row from the table.
38+
async fn delete(&mut self, _row_id: u64) -> StorageResult<()> {
39+
todo!()
40+
}
41+
42+
/// Commit all changes in this transaction.
43+
pub fn commit(self) -> StorageResult<()> {
44+
todo!()
45+
}
46+
47+
/// Abort all changes in this transaction.
48+
pub fn abort(self) -> StorageResult<()> {
49+
todo!()
50+
}
51+
52+
/// Create an iterator on this table.
53+
pub async fn scan(&self) {
54+
todo!()
55+
}
56+
}

0 commit comments

Comments
 (0)