1
1
//! On-disk storage
2
2
3
- use std:: sync:: Arc ;
3
+ use std:: collections:: HashMap ;
4
+ use std:: path:: PathBuf ;
5
+ use std:: sync:: { Arc , RwLock } ;
4
6
5
- use crate :: array:: DataChunk ;
6
- use crate :: catalog:: TableRefId ;
7
+ use anyhow:: anyhow;
8
+ use bytes:: { Buf , BufMut } ;
9
+
10
+ use crate :: array:: { Array , ArrayBuilder , ArrayImpl , DataChunk , I32Array , I32ArrayBuilder } ;
11
+ use crate :: catalog:: { ColumnDesc , TableRefId } ;
7
12
8
13
/// The error type of storage operations.
9
14
#[ derive( thiserror:: Error , Debug ) ]
@@ -17,13 +22,33 @@ pub type StorageRef = Arc<DiskStorage>;
17
22
pub type StorageTableRef = Arc < DiskTable > ;
18
23
19
24
/// On-disk storage.
20
- #[ derive( Clone ) ]
21
- pub struct DiskStorage ;
25
+ pub struct DiskStorage {
26
+ /// All tables in the current storage engine.
27
+ tables : RwLock < HashMap < TableRefId , StorageTableRef > > ,
28
+
29
+ /// The storage options.
30
+ options : Arc < StorageOptions > ,
31
+ }
32
+
33
+ pub struct StorageOptions {
34
+ /// The directory of the storage
35
+ base_path : PathBuf ,
36
+ }
37
+
38
+ pub fn err ( error : impl Into < anyhow:: Error > ) -> StorageError {
39
+ StorageError ( error. into ( ) )
40
+ }
22
41
23
42
/// An on-disk table.
24
43
pub struct DiskTable {
25
- # [ allow ( dead_code ) ]
44
+ /// Id of the table.
26
45
id : TableRefId ,
46
+
47
+ /// Columns of the current table.
48
+ column_descs : Arc < [ ColumnDesc ] > ,
49
+
50
+ /// The storage options.
51
+ options : Arc < StorageOptions > ,
27
52
}
28
53
29
54
impl Default for DiskStorage {
@@ -35,28 +60,94 @@ impl Default for DiskStorage {
35
60
impl DiskStorage {
36
61
/// Create a new in-memory storage.
37
62
pub fn new ( ) -> Self {
38
- DiskStorage
63
+ DiskStorage {
64
+ tables : RwLock :: new ( HashMap :: new ( ) ) ,
65
+ options : Arc :: new ( StorageOptions {
66
+ base_path : "risinglight.db" . into ( ) ,
67
+ } ) ,
68
+ }
39
69
}
40
70
41
71
/// Add a table.
42
- pub fn add_table ( & self , _id : TableRefId ) -> StorageResult < ( ) > {
43
- todo ! ( )
72
+ pub fn add_table ( & self , id : TableRefId , column_descs : & [ ColumnDesc ] ) -> StorageResult < ( ) > {
73
+ let mut tables = self . tables . write ( ) . unwrap ( ) ;
74
+ let table = DiskTable {
75
+ id,
76
+ options : self . options . clone ( ) ,
77
+ column_descs : column_descs. into ( ) ,
78
+ } ;
79
+ let res = tables. insert ( id, table. into ( ) ) ;
80
+ if res. is_some ( ) {
81
+ return Err ( anyhow ! ( "table already exists: {:?}" , id) . into ( ) ) ;
82
+ }
83
+ Ok ( ( ) )
44
84
}
45
85
46
86
/// Get a table.
47
- pub fn get_table ( & self , _id : TableRefId ) -> StorageResult < StorageTableRef > {
48
- todo ! ( )
87
+ pub fn get_table ( & self , id : TableRefId ) -> StorageResult < StorageTableRef > {
88
+ let tables = self . tables . read ( ) . unwrap ( ) ;
89
+ tables
90
+ . get ( & id)
91
+ . ok_or_else ( || anyhow ! ( "table not found: {:?}" , id) . into ( ) )
92
+ . cloned ( )
49
93
}
50
94
}
51
95
96
+ /// Encode an `I32Array` into a `Vec<u8>`.
97
+ fn encode_int32_column ( a : & I32Array ) -> StorageResult < Vec < u8 > > {
98
+ let mut buffer = Vec :: with_capacity ( a. len ( ) * 4 ) ;
99
+ for item in a. iter ( ) {
100
+ if let Some ( item) = item {
101
+ buffer. put_i32_le ( * item) ;
102
+ } else {
103
+ return Err ( anyhow ! ( "nullable encoding not supported!" ) . into ( ) ) ;
104
+ }
105
+ }
106
+ Ok ( buffer)
107
+ }
108
+
109
+ fn decode_int32_column ( mut data : & [ u8 ] ) -> StorageResult < I32Array > {
110
+ let mut builder = I32ArrayBuilder :: with_capacity ( data. len ( ) / 4 ) ;
111
+ while data. has_remaining ( ) {
112
+ builder. push ( Some ( & data. get_i32_le ( ) ) ) ;
113
+ }
114
+ Ok ( builder. finish ( ) )
115
+ }
116
+
52
117
impl DiskTable {
118
+ fn table_path ( & self ) -> PathBuf {
119
+ self . options . base_path . join ( self . id . table_id . to_string ( ) )
120
+ }
121
+
122
+ fn column_path ( & self , column_id : usize ) -> PathBuf {
123
+ self . table_path ( ) . join ( format ! ( "{}.col" , column_id) )
124
+ }
125
+
53
126
/// Append a chunk to the table.
54
- pub async fn append ( & self , _chunk : DataChunk ) -> StorageResult < ( ) > {
55
- todo ! ( )
127
+ pub async fn append ( & self , chunk : DataChunk ) -> StorageResult < ( ) > {
128
+ for ( idx, column) in chunk. arrays ( ) . iter ( ) . enumerate ( ) {
129
+ if let ArrayImpl :: Int32 ( column) = column {
130
+ let column_path = self . column_path ( idx) ;
131
+ let data = encode_int32_column ( column) ?;
132
+ tokio:: fs:: create_dir_all ( column_path. parent ( ) . unwrap ( ) )
133
+ . await
134
+ . map_err ( err) ?;
135
+ tokio:: fs:: write ( column_path, data) . await . map_err ( err) ?;
136
+ } else {
137
+ return Err ( anyhow ! ( "unsupported column type" ) . into ( ) ) ;
138
+ }
139
+ }
140
+ Ok ( ( ) )
56
141
}
57
142
58
143
/// Get all chunks of the table.
59
144
pub async fn all_chunks ( & self ) -> StorageResult < Vec < DataChunk > > {
60
- todo ! ( )
145
+ let mut columns = vec ! [ ] ;
146
+ for ( idx, _) in self . column_descs . iter ( ) . enumerate ( ) {
147
+ let column_path = self . column_path ( idx) ;
148
+ let data = tokio:: fs:: read ( column_path) . await . map_err ( err) ?;
149
+ columns. push ( decode_int32_column ( & data) ?) ;
150
+ }
151
+ Ok ( vec ! [ columns. into_iter( ) . map( ArrayImpl :: Int32 ) . collect( ) ] )
61
152
}
62
153
}
0 commit comments