33
44use std:: sync:: Arc ;
55
6+ use vortex_compute:: filter:: Filter ;
67use vortex_error:: { VortexResult , vortex_panic} ;
78use vortex_mask:: Mask ;
8- use vortex_vector:: { Vector , VectorOps , vector_matches_dtype} ;
9+ use vortex_vector:: { Vector , vector_matches_dtype} ;
910
1011use crate :: execution:: { BatchKernelRef , BindCtx , DummyExecutionCtx , ExecutionCtx } ;
1112use crate :: pipeline:: PipelinedNode ;
@@ -24,7 +25,7 @@ pub trait ArrayOperator: 'static + Send + Sync {
2425 ///
2526 /// If the mask length does not match the array length.
2627 /// If the array's implementation returns an invalid vector (wrong length, wrong type, etc.).
27- fn execute_batch ( & self , selection : & Mask , ctx : & mut dyn ExecutionCtx ) -> VortexResult < Vector > ;
28+ fn execute_batch ( & self , ctx : & mut dyn ExecutionCtx ) -> VortexResult < Vector > ;
2829
2930 /// Optimize the array by running the optimization rules.
3031 fn reduce ( & self ) -> VortexResult < Option < ArrayRef > > ;
@@ -44,8 +45,8 @@ pub trait ArrayOperator: 'static + Send + Sync {
4445}
4546
4647impl ArrayOperator for Arc < dyn Array > {
47- fn execute_batch ( & self , selection : & Mask , ctx : & mut dyn ExecutionCtx ) -> VortexResult < Vector > {
48- self . as_ref ( ) . execute_batch ( selection , ctx)
48+ fn execute_batch ( & self , ctx : & mut dyn ExecutionCtx ) -> VortexResult < Vector > {
49+ self . as_ref ( ) . execute_batch ( ctx)
4950 }
5051
5152 fn reduce ( & self ) -> VortexResult < Option < ArrayRef > > {
@@ -70,17 +71,8 @@ impl ArrayOperator for Arc<dyn Array> {
7071}
7172
7273impl < V : VTable > ArrayOperator for ArrayAdapter < V > {
73- fn execute_batch ( & self , selection : & Mask , ctx : & mut dyn ExecutionCtx ) -> VortexResult < Vector > {
74- let vector =
75- <V :: OperatorVTable as OperatorVTable < V > >:: execute_batch ( & self . 0 , selection, ctx) ?;
76-
77- // Such a cheap check that we run it always. More expensive DType checks live in
78- // debug_assertions.
79- assert_eq ! (
80- vector. len( ) ,
81- selection. true_count( ) ,
82- "Batch execution returned vector of incorrect length"
83- ) ;
74+ fn execute_batch ( & self , ctx : & mut dyn ExecutionCtx ) -> VortexResult < Vector > {
75+ let vector = V :: execute ( & self . 0 , ctx) ?;
8476
8577 if cfg ! ( debug_assertions) {
8678 // Checks for correct type and nullability.
@@ -130,17 +122,20 @@ impl BindCtx for () {
130122
131123impl dyn Array + ' _ {
132124 pub fn execute ( & self ) -> VortexResult < Vector > {
133- self . execute_with_selection ( & Mask :: new_true ( self . len ( ) ) )
125+ // Check if the array is a pipeline node
126+ if self . as_pipelined ( ) . is_some ( ) {
127+ return PipelineDriver :: new ( self . to_array ( ) ) . execute ( & Mask :: new_true ( self . len ( ) ) ) ;
128+ }
129+ self . execute_batch ( & mut DummyExecutionCtx )
134130 }
135131
136132 pub fn execute_with_selection ( & self , selection : & Mask ) -> VortexResult < Vector > {
137- assert_eq ! ( self . len( ) , selection. len( ) ) ;
138-
139133 // Check if the array is a pipeline node
140134 if self . as_pipelined ( ) . is_some ( ) {
141135 return PipelineDriver :: new ( self . to_array ( ) ) . execute ( selection) ;
142136 }
143-
144- self . execute_batch ( selection, & mut DummyExecutionCtx )
137+ Ok ( self
138+ . execute_batch ( & mut DummyExecutionCtx ) ?
139+ . filter ( selection) )
145140 }
146141}
0 commit comments