@@ -5,13 +5,16 @@ use std::collections::BTreeSet;
55use std:: ops:: BitAnd ;
66use std:: ops:: Range ;
77use std:: sync:: Arc ;
8+ use std:: sync:: LazyLock ;
89
910use futures:: FutureExt ;
1011use futures:: future:: BoxFuture ;
1112use vortex_array:: Array ;
1213use vortex_array:: ArrayRef ;
1314use vortex_array:: MaskFuture ;
15+ use vortex_array:: compute:: filter;
1416use vortex_array:: expr:: Expression ;
17+ use vortex_array:: expr:: Root ;
1518use vortex_array:: serde:: ArrayParts ;
1619use vortex_array:: session:: ArraySessionExt ;
1720use vortex_dtype:: DType ;
@@ -27,6 +30,19 @@ use crate::layouts::SharedArrayFuture;
2730use crate :: layouts:: flat:: FlatLayout ;
2831use crate :: segments:: SegmentSource ;
2932
33+ static USE_VORTEX_OPERATORS : LazyLock < bool > = LazyLock :: new ( || {
34+ std:: env:: var ( "VORTEX_OPERATORS" )
35+ . map ( |v| v == "1" || v. to_lowercase ( ) == "true" )
36+ . unwrap_or ( false )
37+ } ) ;
38+
39+ /// The threshold of mask density below which we will evaluate the expression only over the
40+ /// selected rows, and above which we evaluate the expression over all rows and then select
41+ /// after.
42+ // TODO(ngates): more experimentation is needed, and this should probably be dynamic based on the
43+ // actual expression? Perhaps all expressions are given a selection mask to decide for themselves?
44+ const EXPR_EVAL_THRESHOLD : f64 = 0.2 ;
45+
3046pub struct FlatReader {
3147 layout : FlatLayout ,
3248 name : Arc < str > ,
@@ -137,18 +153,47 @@ impl LayoutReader for FlatReader {
137153 array = array. slice ( row_range. clone ( ) ) ;
138154 }
139155
140- // Apply the expression to the array.
141- let array = array. apply ( & expr) ?;
156+ let array_mask = if * USE_VORTEX_OPERATORS {
157+ // Apply the expression to the array.
158+ let array = array. apply ( & expr) ?;
142159
143- log:: info!( "Filter Array:\n {}" , array. display_tree( ) ) ;
144- let array = optimizer. optimize_array ( array) ?;
145- log:: warn!( "Optimized Filter Array:\n {}" , array. display_tree( ) ) ;
160+ log:: info!( "Filter Array:\n {}" , array. display_tree( ) ) ;
161+ let array = optimizer. optimize_array ( array) ?;
162+ log:: warn!( "Optimized Filter Array:\n {}" , array. display_tree( ) ) ;
146163
147- // Evaluate the array into a mask.
148- let array_mask = array. execute_mask ( & session) ?;
149- let array_mask = mask. bitand ( & array_mask) ;
164+ // Evaluate the array into a mask.
165+ let array_mask = array. execute_mask ( & session) ?;
166+ mask. bitand ( & array_mask)
167+ } else {
168+ // TODO(ngates): the mask may actually be dense within a range, as is often the case when
169+ // we have approximate mask results from a zone map. In which case we could look at
170+ // the true_count between the mask's first and last true positions.
171+ // TODO(ngates): we could also track runtime statistics about whether it's worth selecting
172+ // or not.
173+ if mask. density ( ) < EXPR_EVAL_THRESHOLD {
174+ // Evaluate only the selected rows of the mask.
175+ array = filter ( & array, & mask) ?;
176+ // TODO(joe): fixme casting null to false is *VERY* unsound, if the expression in the filter
177+ // can inspect nulls (e.g. `is_null`).
178+ // you will need to call the array evaluation instead of the mask evaluation.
179+ let array_mask = expr
180+ . evaluate ( & array)
181+ . map_err ( |err| {
182+ err. with_context ( format ! ( "While evaluating filter {}" , expr) )
183+ } ) ?
184+ . try_to_mask_fill_null_false ( ) ?;
185+ mask. intersect_by_rank ( & array_mask)
186+ } else {
187+ // Evaluate all rows, avoiding the more expensive rank intersection.
188+ array = expr. evaluate ( & array) . map_err ( |err| {
189+ err. with_context ( format ! ( "While evaluating filter {}" , expr) )
190+ } ) ?;
191+ let array_mask = array. try_to_mask_fill_null_false ( ) ?;
192+ mask. bitand ( & array_mask)
193+ }
194+ } ;
150195
151- log:: warn !(
196+ log:: debug !(
152197 "Flat mask evaluation {} - {} (mask = {}) => {}" ,
153198 name,
154199 expr,
@@ -186,17 +231,34 @@ impl LayoutReader for FlatReader {
186231 array = array. slice ( row_range. clone ( ) ) ;
187232 }
188233
189- // Evaluate the projection expression.
190- array = array. apply ( & expr) ?;
234+ if * USE_VORTEX_OPERATORS {
235+ // Evaluate the projection expression.
236+ array = array. apply ( & expr) ?;
191237
192- // Filter the array based on the row mask.
193- if !mask. all_true ( ) {
194- array = array. filter ( & mask) ?;
195- }
238+ // Filter the array based on the row mask.
239+ if !mask. all_true ( ) {
240+ array = array. filter ( & mask) ?;
241+ }
196242
197- log:: info!( "Project Array:\n {}" , array. display_tree( ) ) ;
198- let array = optimizer. optimize_array ( array) ?;
199- log:: info!( "Optimized Project Array:\n {}" , array. display_tree( ) ) ;
243+ log:: debug!( "Project Array:\n {}" , array. display_tree( ) ) ;
244+ let array = optimizer. optimize_array ( array) ?;
245+ log:: info!( "Optimized Project Array:\n {}" , array. display_tree( ) ) ;
246+ array
247+ } else {
248+ // Filter the array based on the row mask.
249+ if !mask. all_true ( ) {
250+ array = filter ( & array, & mask) ?;
251+ array = array. filter ( & mask) ?;
252+ }
253+
254+ // Evaluate the projection expression.
255+ if !expr. is :: < Root > ( ) {
256+ array = expr. evaluate ( & array) . map_err ( |err| {
257+ err. with_context ( format ! ( "While evaluating projection {}" , expr) )
258+ } ) ?;
259+ }
260+ array
261+ }
200262
201263 Ok ( array)
202264 }
0 commit comments