@@ -3,12 +3,14 @@
 //! This module defines the VexLake data schema and provides utilities for
 //! reading and writing vector data in Parquet format.
 
-use arrow::array::{ArrayRef, FixedSizeListArray, Float32Array, RecordBatch, UInt64Array, StringArray};
+use arrow::array::{
+    ArrayRef, FixedSizeListArray, Float32Array, RecordBatch, StringArray, UInt64Array,
+};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use std::sync::Arc;
 
-use crate::{Error, Result};
 use super::StorageClient;
+use crate::{Error, Result};
 
 /// Schema for VexLake vector data
 pub struct VexSchema;
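For orientation, the Arrow schema that `VexSchema` describes can be reconstructed from `create_batch` below. A minimal sketch, assuming the column names `vector` and `metadata` (only `id` and the list item name `item` are visible in this diff):

use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;

// Hypothetical sketch: id (u64), vector (FixedSizeList<Float32> of `dimension`),
// metadata (nullable JSON string), matching the arrays built in `create_batch`.
fn vex_schema(dimension: i32) -> Schema {
    Schema::new(vec![
        Field::new("id", DataType::UInt64, false),
        Field::new(
            "vector",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        ),
        Field::new("metadata", DataType::Utf8, true),
    ])
}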
@@ -52,19 +54,24 @@ impl<'a> ParquetWriter<'a> {
         metadata: &[Option<String>],
     ) -> Result<RecordBatch> {
         if ids.len() != vectors.len() || ids.len() != metadata.len() {
-            return Err(Error::InvalidConfig("Input arrays must have same length".to_string()));
+            return Err(Error::InvalidConfig(
+                "Input arrays must have same length".to_string(),
+            ));
         }
 
         let id_array = UInt64Array::from(ids.to_vec());
-
+
         let mut flattened_vectors = Vec::with_capacity(vectors.len() * self.dimension);
         for v in vectors {
             if v.len() != self.dimension {
-                return Err(Error::DimensionMismatch { expected: self.dimension, actual: v.len() });
+                return Err(Error::DimensionMismatch {
+                    expected: self.dimension,
+                    actual: v.len(),
+                });
             }
             flattened_vectors.extend_from_slice(v);
         }
-
+
         let values = Float32Array::from(flattened_vectors);
         let field = Arc::new(Field::new("item", DataType::Float32, true));
         let vector_array = FixedSizeListArray::try_new(
@@ -73,7 +80,7 @@ impl<'a> ParquetWriter<'a> {
             Arc::new(values) as ArrayRef,
             None,
         )
-        .map_err(|e| Error::Arrow(e))?;
+        .map_err(Error::Arrow)?;
 
         let metadata_array = StringArray::from(metadata.to_vec());
 
@@ -95,17 +102,23 @@ impl<'a> ParquetWriter<'a> {
         let mut buf = Vec::new();
         let mut writer = AsyncArrowWriter::try_new(&mut buf, batch.schema(), None)
             .map_err(|e| Error::Index(e.to_string()))?;
-
-        writer.write(batch).await.map_err(|e| Error::Index(e.to_string()))?;
-        writer.close().await.map_err(|e| Error::Index(e.to_string()))?;
+
+        writer
+            .write(batch)
+            .await
+            .map_err(|e| Error::Index(e.to_string()))?;
+        writer
+            .close()
+            .await
+            .map_err(|e| Error::Index(e.to_string()))?;
 
         self.client.write(path, buf).await?;
         Ok(())
     }
 }
 
-use datafusion::prelude::*;
 use datafusion::physical_plan::collect;
+use datafusion::prelude::*;
 
 /// Reader for VexLake Parquet files using DataFusion
 pub struct ParquetReader<'a> {
@@ -122,51 +135,73 @@ impl<'a> ParquetReader<'a> {
     pub async fn read_all(&self, path: &str) -> Result<Vec<RecordBatch>> {
         // DataFusion SessionContext
         let _ctx = SessionContext::new();
-
-        // Since we are using OpenDAL, for now we might need to read the whole file
+
+        // Since we are using OpenDAL, for now we might need to read the whole file
         // into memory or implement an ObjectStore for DataFusion.
         // For simplicity in this phase, we'll read the file and use ctx.read_parquet with a local path
-        // OR better, we use RecordBatchReader from the parquet crate directly for now
+        // OR better, we use RecordBatchReader from the parquet crate directly for now
         // until we have the full DataFusion ObjectStore integrated.
-
+
         let data = self.client.read(path).await?;
         let bytes = bytes::Bytes::from(data);
-
+
         use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-
+
         let builder = ParquetRecordBatchReaderBuilder::try_new(bytes)
             .map_err(|e| Error::Index(e.to_string()))?;
-
+
         let reader = builder.build().map_err(|e| Error::Index(e.to_string()))?;
-
+
         let mut batches = Vec::new();
         for batch in reader {
             batches.push(batch.map_err(Error::Arrow)?);
         }
-
+
         Ok(batches)
     }
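A minimal sketch of consuming the batches `read_all` returns, assuming the column order implied by `create_batch` (`id` first, the fixed-size vector list second); the helper name is hypothetical:

use arrow::array::{FixedSizeListArray, Float32Array, RecordBatch, UInt64Array};

// Extract the first row's id and vector from a batch produced by `read_all`.
// Assumption: column 0 is `id` (UInt64), column 1 is the FixedSizeList vector.
fn first_row(batch: &RecordBatch) -> Option<(u64, Vec<f32>)> {
    let ids = batch.column(0).as_any().downcast_ref::<UInt64Array>()?;
    let vectors = batch.column(1).as_any().downcast_ref::<FixedSizeListArray>()?;
    let row = vectors.value(0); // ArrayRef of length `dimension`
    let floats = row.as_any().downcast_ref::<Float32Array>()?;
    Some((ids.value(0), floats.values().to_vec()))
}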
 
     /// Execute a query using DataFusion
     pub async fn query(&self, path: &str, sql: &str) -> Result<Vec<RecordBatch>> {
         let ctx = SessionContext::new();
-
+
         // We'll write to a temp file to allow DataFusion to read it
         // TODO: In Phase 4, we will register an ObjectStore for direct S3 reading
         let data = self.client.read(path).await?;
-        let temp_dir = tempfile::tempdir().map_err(|e| Error::Storage(opendal::Error::new(opendal::ErrorKind::Unexpected, &e.to_string())))?;
+        let temp_dir = tempfile::tempdir().map_err(|e| {
+            Error::Storage(Box::new(opendal::Error::new(
+                opendal::ErrorKind::Unexpected,
+                e.to_string(),
+            )))
+        })?;
         let file_path = temp_dir.path().join("data.parquet");
-        std::fs::write(&file_path, data).map_err(|e| Error::Storage(opendal::Error::new(opendal::ErrorKind::Unexpected, &e.to_string())))?;
+        std::fs::write(&file_path, data).map_err(|e| {
+            Error::Storage(Box::new(opendal::Error::new(
+                opendal::ErrorKind::Unexpected,
+                e.to_string(),
+            )))
+        })?;
+
+        ctx.register_parquet(
+            "vectors",
+            file_path.to_str().unwrap(),
+            ParquetReadOptions::default(),
+        )
+        .await
+        .map_err(|e| Error::Index(e.to_string()))?;
 
-        ctx.register_parquet("vectors", file_path.to_str().unwrap(), ParquetReadOptions::default())
+        let df = ctx
+            .sql(sql)
+            .await
+            .map_err(|e| Error::Index(e.to_string()))?;
+        let plan = df
+            .create_physical_plan()
             .await
             .map_err(|e| Error::Index(e.to_string()))?;
-
-        let df = ctx.sql(sql).await.map_err(|e| Error::Index(e.to_string()))?;
-        let plan = df.create_physical_plan().await.map_err(|e| Error::Index(e.to_string()))?;
         let task_ctx = ctx.task_ctx();
-
-        let result = collect(plan, task_ctx).await.map_err(|e| Error::Index(e.to_string()))?;
+
+        let result = collect(plan, task_ctx)
+            .await
+            .map_err(|e| Error::Index(e.to_string()))?;
         Ok(result)
     }
 }
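The TODO in `query` points at the Phase 4 plan: register an ObjectStore with DataFusion and query S3 paths directly instead of copying through a temp file. A hedged sketch of that direction, assuming env-based S3 credentials and a bucket named `vexlake` (registration APIs vary across datafusion/object_store versions):

use std::sync::Arc;

use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use url::Url;

// Hypothetical Phase 4 path: DataFusion reads s3:// URLs through a registered
// store, so no temp-file round trip is needed.
async fn query_s3_direct(sql: &str) -> datafusion::error::Result<Vec<arrow::array::RecordBatch>> {
    let ctx = SessionContext::new();
    let store = AmazonS3Builder::from_env()
        .with_bucket_name("vexlake") // assumed bucket name
        .build()?;
    ctx.register_object_store(&Url::parse("s3://vexlake").unwrap(), Arc::new(store));
    ctx.register_parquet(
        "vectors",
        "s3://vexlake/data/test.parquet",
        ParquetReadOptions::default(),
    )
    .await?;
    ctx.sql(sql).await?.collect().await
}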
@@ -182,17 +217,14 @@ mod tests {
         let reader = ParquetReader::new(&client);
 
         let ids = vec![1, 2];
-        let vectors = vec![
-            vec![1.0, 2.0, 3.0],
-            vec![4.0, 5.0, 6.0],
-        ];
-        let metadata = vec![
-            Some("{\"tag\": \"a\"}".to_string()),
-            None,
-        ];
+        let vectors = vec![vec![1.0, 2.0, 3.0], vec![4.0, 5.0, 6.0]];
+        let metadata = vec![Some("{\"tag\": \"a\"}".to_string()), None];
 
         let batch = writer.create_batch(&ids, &vectors, &metadata).unwrap();
-        writer.write_batch("data/test.parquet", &batch).await.unwrap();
+        writer
+            .write_batch("data/test.parquet", &batch)
+            .await
+            .unwrap();
 
         assert!(client.exists("data/test.parquet").await.unwrap());
 
@@ -202,7 +234,10 @@ mod tests {
         assert_eq!(read_batches[0].num_rows(), 2);
 
         // Test query
-        let query_results = reader.query("data/test.parquet", "SELECT id FROM vectors WHERE id = 1").await.unwrap();
+        let query_results = reader
+            .query("data/test.parquet", "SELECT id FROM vectors WHERE id = 1")
+            .await
+            .unwrap();
         assert_eq!(query_results.len(), 1);
         assert_eq!(query_results[0].num_rows(), 1);
     }