@@ -5,13 +5,13 @@ use std::collections::BTreeSet;
55use std:: ops:: Range ;
66use std:: sync:: Arc ;
77
8- use futures:: try_join;
8+ use futures:: { FutureExt , try_join} ;
99use itertools:: Itertools ;
10- use vortex_array:: arrays:: { StructArray , StructVTable } ;
10+ use vortex_array:: arrays:: StructArray ;
1111use vortex_array:: stats:: Precision ;
1212use vortex_array:: validity:: Validity ;
13- use vortex_array:: { ArrayRef , IntoArray , MaskFuture , ToCanonical } ;
14- use vortex_dtype:: { DType , FieldDType , FieldMask , FieldName , Nullability , StructFields } ;
13+ use vortex_array:: { Array , IntoArray , MaskFuture , ToCanonical } ;
14+ use vortex_dtype:: { DType , FieldMask , FieldName , Nullability , StructFields } ;
1515use vortex_error:: { VortexExpect , VortexResult , vortex_err} ;
1616use vortex_expr:: transform:: immediate_access:: annotate_scope_access;
1717use vortex_expr:: transform:: {
@@ -274,56 +274,46 @@ impl LayoutReader for StructReader {
274274 . transpose ( ) ?;
275275
276276 // Partition the expression into expressions that can be evaluated over individual fields
277- let projection_fut = match & self . partition_expr ( expr. clone ( ) ) {
278- Partitioned :: Single ( name, partition) => self
279- . field_reader ( name) ?
280- . projection_evaluation ( row_range, partition, mask) ,
277+ let array_future = match & self . partition_expr ( expr. clone ( ) ) {
278+ Partitioned :: Single ( name, partition) => {
279+ // Wrap the projected field in a new single-field StructArray named after that field.
280+ let name = name. clone ( ) ;
281+ self . field_reader ( & name) ?
282+ . projection_evaluation ( row_range, partition, mask) ?
283+ . map ( |result| {
284+ result. and_then ( move |array| {
285+ let len = array. len ( ) ;
286+ StructArray :: try_new (
287+ vec ! [ name] . into ( ) ,
288+ vec ! [ array] ,
289+ len,
290+ Validity :: NonNullable ,
291+ )
292+ . map ( |a| a. into_array ( ) )
293+ } )
294+ } )
295+ . boxed ( )
296+ }
281297 Partitioned :: Multi ( partitioned) => {
298+ // Apply the validity to each internal field instead.
282299 partitioned
283300 . clone ( )
284301 . into_array_future ( mask, |name, expr, mask| {
285302 self . field_reader ( name) ?
286303 . projection_evaluation ( row_range, expr, mask)
287- } )
304+ } ) ?
288305 }
289- } ? ;
306+ } ;
290307
291308 Ok ( Box :: pin ( async move {
292309 if let Some ( validity_fut) = validity_fut {
293- // Apply the validity array back onto the projected rows.
294- // NOTE: this only works while VortexExpr is linear. Once an expression
295- // can return a result of different length (e.g. `UNNEST`) then this becomes
296- // more complicated.
297- let ( validity, projection) = try_join ! ( validity_fut, projection_fut) ?;
298-
299- // The expression partitioner takes care of re-packing the projection results into
300- // a new struct array.
301- // Since the pack is always present, we apply the validity back onto the children
302- // directly.
303-
304- let projection_struct = projection. as_ :: < StructVTable > ( ) . clone ( ) ;
305- let field_names = projection_struct. struct_fields ( ) . names ( ) . clone ( ) ;
306-
307- let fields: Vec < ArrayRef > = projection_struct
308- . fields ( )
309- . iter ( )
310- . map ( |field| {
311- vortex_array:: compute:: mask (
312- field. as_ref ( ) ,
313- & Mask :: from_buffer ( !validity. to_bool ( ) . boolean_buffer ( ) ) ,
314- )
315- } )
316- . try_collect ( ) ?;
317-
318- Ok ( StructArray :: try_new (
319- field_names,
320- fields,
321- projection_struct. len ( ) ,
322- Validity :: NonNullable ,
323- ) ?
324- . into_array ( ) )
310+ let ( validity, array) = try_join ! ( validity_fut, array_future) ?;
311+ vortex_array:: compute:: mask (
312+ array. as_ref ( ) ,
313+ & Mask :: from_buffer ( !validity. to_bool ( ) . boolean_buffer ( ) ) ,
314+ )
325315 } else {
326- projection_fut . await
316+ array_future . await
327317 }
328318 } ) )
329319 }
@@ -339,8 +329,7 @@ mod tests {
339329 use vortex_array:: validity:: Validity ;
340330 use vortex_array:: { Array , ArrayContext , IntoArray , MaskFuture , ToCanonical } ;
341331 use vortex_buffer:: buffer;
342- use vortex_dtype:: Nullability :: NonNullable ;
343- use vortex_dtype:: { DType , Nullability , PType } ;
332+ use vortex_dtype:: { DType , FieldDType , FieldName , Nullability , PType , StructFields } ;
344333 use vortex_expr:: { col, eq, get_item, gt, lit, or, pack, root} ;
345334 use vortex_io:: runtime:: single:: block_on;
346335 use vortex_mask:: Mask ;
@@ -489,7 +478,7 @@ mod tests {
489478 let reader = layout. new_reader ( "" . into ( ) , segments) . unwrap ( ) ;
490479 let expr = pack (
491480 [ ( "a" , get_item ( "a" , root ( ) ) ) , ( "b" , get_item ( "b" , root ( ) ) ) ] ,
492- NonNullable ,
481+ Nullability :: NonNullable ,
493482 ) ;
494483 let result = block_on ( |_h| {
495484 reader
@@ -538,21 +527,24 @@ mod tests {
538527 . unwrap ( ) ;
539528
540529 let result = block_on ( move |_| project) . unwrap ( ) ;
541- // Result should be nullable primitive array
530+ // Result should be a struct with a single field
542531 assert_eq ! (
543532 result. dtype( ) ,
544- & DType :: Primitive ( PType :: I32 , Nullability :: Nullable )
533+ & DType :: Struct (
534+ StructFields :: from_fields(
535+ vec![ FieldName :: from( "a" ) ] . into( ) ,
536+ vec![ FieldDType :: from( DType :: Primitive (
537+ PType :: I32 ,
538+ Nullability :: NonNullable ,
539+ ) ) ]
540+ ) ,
541+ Nullability :: Nullable ,
542+ )
545543 ) ;
546544
547545 assert_eq ! ( result. scalar_at( 0 ) , Scalar :: null( result. dtype( ) . clone( ) ) , ) ;
548546
549- assert_eq ! (
550- result. scalar_at( 1 ) ,
551- Scalar :: primitive( 2i32 , Nullability :: Nullable ) ,
552- ) ;
553- assert_eq ! (
554- result. scalar_at( 2 ) ,
555- Scalar :: primitive( 3i32 , Nullability :: Nullable ) ,
556- ) ;
547+ assert ! ( result. scalar_at( 1 ) . is_valid( ) ) ;
548+ assert ! ( result. scalar_at( 2 ) . is_valid( ) ) ;
557549 }
558550}
0 commit comments