@@ -5,17 +5,21 @@ use std::sync::Arc;
55
66use itertools:: Itertools ;
77use vortex_buffer:: BufferHandle ;
8+ use vortex_compute:: filter:: Filter ;
89use vortex_dtype:: DType ;
910use vortex_error:: VortexExpect ;
1011use vortex_error:: VortexResult ;
1112use vortex_error:: vortex_bail;
13+ use vortex_mask:: Mask ;
14+ use vortex_vector:: Vector ;
1215use vortex_vector:: struct_:: StructVector ;
1316
1417use crate :: EmptyMetadata ;
1518use crate :: arrays:: struct_:: StructArray ;
1619use crate :: kernel:: BindCtx ;
20+ use crate :: kernel:: Kernel ;
1721use crate :: kernel:: KernelRef ;
18- use crate :: kernel:: kernel ;
22+ use crate :: kernel:: PushDownResult ;
1923use crate :: serde:: ArrayChildren ;
2024use crate :: validity:: Validity ;
2125use crate :: vtable;
@@ -115,13 +119,42 @@ impl VTable for StructVTable {
115119 . try_collect ( ) ?;
116120 let validity_mask = array. validity_mask ( ) ;
117121
118- Ok ( kernel ( move || {
119- // SAFETY: we know that all field lengths match the struct array length, and the validity
120- let fields = fields. into_iter ( ) . map ( |k| k. execute ( ) ) . try_collect ( ) ?;
121- Ok ( unsafe { StructVector :: new_unchecked ( Arc :: new ( fields) , validity_mask) } . into ( ) )
122+ Ok ( Box :: new ( StructKernel {
123+ fields,
124+ validity_mask,
122125 } ) )
123126 }
124127}
125128
129+ #[ derive( Debug ) ]
130+ struct StructKernel {
131+ fields : Box < [ KernelRef ] > ,
132+ // TODO(ngates): hold a kernel that computes the mask.
133+ validity_mask : Mask ,
134+ }
135+
136+ impl Kernel for StructKernel {
137+ fn execute ( self : Box < Self > ) -> VortexResult < Vector > {
138+ // SAFETY: we know that all field lengths match the struct array length, and the validity
139+ let fields = self . fields . into_iter ( ) . map ( |k| k. execute ( ) ) . try_collect ( ) ?;
140+ Ok ( unsafe { StructVector :: new_unchecked ( Arc :: new ( fields) , self . validity_mask ) } . into ( ) )
141+ }
142+
143+ fn push_down_filter ( self : Box < Self > , selection : & Mask ) -> VortexResult < PushDownResult > {
144+ let fields = self
145+ . fields
146+ . into_iter ( )
147+ . map ( |k| k. force_push_down_filter ( selection) )
148+ . try_collect ( ) ?;
149+
150+ let validity_mask = self . validity_mask . filter ( selection) ;
151+
152+ Ok ( PushDownResult :: Pushed ( Box :: new ( StructKernel {
153+ fields,
154+ validity_mask,
155+ } ) ) )
156+ }
157+ }
158+
126159#[ derive( Debug ) ]
127160pub struct StructVTable ;
0 commit comments