File tree Expand file tree Collapse file tree 1 file changed +12
-5
lines changed
bench-vortex/src/random_access Expand file tree Collapse file tree 1 file changed +12
-5
lines changed Original file line number Diff line number Diff line change @@ -25,9 +25,12 @@ use stream::StreamExt;
2525use vortex:: array:: Array ;
2626use vortex:: array:: ArrayRef ;
2727use vortex:: array:: IntoArray ;
28+ use vortex:: array:: VectorExecutor ;
2829use vortex:: array:: stream:: ArrayStreamExt ;
30+ use vortex:: array:: vectors:: VectorIntoArray ;
2931use vortex:: buffer:: Buffer ;
3032use vortex:: file:: OpenOptionsSessionExt ;
33+ use vortex:: layout:: layouts:: USE_VORTEX_OPERATORS ;
3134use vortex:: utils:: aliases:: hash_map:: HashMap ;
3235
3336use crate :: SESSION ;
@@ -43,18 +46,22 @@ pub async fn take_vortex_tokio(
4346}
4447
4548async fn take_vortex ( reader : impl AsRef < Path > , indices : Buffer < u64 > ) -> anyhow:: Result < ArrayRef > {
46- Ok ( SESSION
49+ let array = SESSION
4750 . open_options ( )
4851 . open ( reader. as_ref ( ) )
4952 . await ?
5053 . scan ( ) ?
5154 . with_row_indices ( indices)
5255 . into_array_stream ( ) ?
5356 . read_all ( )
54- . await ?
55- // We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es.
56- . to_canonical ( )
57- . into_array ( ) )
57+ . await ?;
58+
59+ // We canonicalize / decompress for equivalence to Arrow's `RecordBatch`es.
60+ Ok ( if * USE_VORTEX_OPERATORS {
61+ array. execute_vector ( & SESSION ) ?. into_array ( array. dtype ( ) )
62+ } else {
63+ array. to_canonical ( ) . into_array ( )
64+ } )
5865}
5966
6067pub async fn take_parquet ( path : & Path , indices : Buffer < u64 > ) -> anyhow:: Result < RecordBatch > {
You can’t perform that action at this time.
0 commit comments