Skip to content

Commit 2e2eecf

Browse files
authored
Change evaluations to take MaskFuture (#4482)
We want the option to run the conjunct and projection expressions in parallel, at least so that their I/O can be raced. Without a MaskFuture, we cannot begin processing the projection evaluation until the conjuncts have been downloaded and evaluated. We currently work around this with pre-fetching, but that is a little crude and we would rather be explicit. This PR does not race conjuncts; it only changes the signature.
---------
Signed-off-by: Nicholas Gates <[email protected]>
1 parent f3a94e1 commit 2e2eecf

File tree

13 files changed

+187
-118
lines changed

13 files changed

+187
-118
lines changed

vortex-layout/src/layouts/chunked/reader.rs

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use crate::layouts::chunked::ChunkedLayout;
2222
use crate::reader::LayoutReader;
2323
use crate::segments::SegmentSource;
2424
use crate::{
25-
ArrayEvaluation, LayoutReaderRef, LazyReaderChildren, MaskEvaluation, PruningEvaluation,
25+
ArrayEvaluation, LayoutReaderRef, LazyReaderChildren, MaskEvaluation, MaskFuture,
26+
PruningEvaluation,
2627
};
2728

2829
/// A [`LayoutReader`] for chunked layouts.
@@ -286,27 +287,16 @@ struct ChunkedMaskEvaluation {
286287

287288
#[async_trait]
288289
impl MaskEvaluation for ChunkedMaskEvaluation {
289-
async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
290-
log::debug!(
291-
"Chunked mask evaluation {} (mask = {})",
292-
self.name,
293-
mask.density()
294-
);
290+
async fn invoke(&self, mask: MaskFuture) -> VortexResult<Mask> {
291+
log::debug!("Chunked mask evaluation {}", self.name,);
295292

296293
// Split the mask over each chunk.
297294
let masks: Vec<_> = FuturesOrdered::from_iter(
298295
self.mask_ranges
299296
.iter()
300297
.map(|range| mask.slice(range.clone()))
301298
.zip_eq(&self.chunk_evals)
302-
.map(|(mask, chunk_eval)| {
303-
if mask.all_false() {
304-
// If the mask is all false, we can skip the evaluation.
305-
ready(Ok(mask)).boxed()
306-
} else {
307-
chunk_eval.invoke(mask).boxed()
308-
}
309-
}),
299+
.map(|(mask, chunk_eval)| chunk_eval.invoke(mask).boxed()),
310300
)
311301
.try_collect()
312302
.await?;
@@ -329,14 +319,13 @@ struct ChunkedArrayEvaluation {
329319

330320
#[async_trait]
331321
impl ArrayEvaluation for ChunkedArrayEvaluation {
332-
async fn invoke(&self, mask: Mask) -> VortexResult<ArrayRef> {
322+
async fn invoke(&self, mask: MaskFuture) -> VortexResult<ArrayRef> {
333323
// Split the mask over each chunk.
334324
let chunks: Vec<_> = FuturesOrdered::from_iter(
335325
self.mask_ranges
336326
.iter()
337327
.map(|range| mask.slice(range.clone()))
338328
.zip_eq(&self.chunk_evals)
339-
.filter(|(mask, _chunk_eval)| mask.true_count() > 0)
340329
.map(|(mask, chunk_eval)| chunk_eval.invoke(mask)),
341330
)
342331
.try_collect()
@@ -364,13 +353,14 @@ mod test {
364353
use vortex_dtype::Nullability::NonNullable;
365354
use vortex_dtype::{DType, PType};
366355
use vortex_expr::root;
367-
use vortex_mask::Mask;
368356

369357
use crate::layouts::chunked::writer::ChunkedLayoutStrategy;
370358
use crate::layouts::flat::writer::FlatLayoutStrategy;
371359
use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
372360
use crate::sequence::SequenceId;
373-
use crate::{LayoutRef, LayoutStrategy, SequentialStreamAdapter, SequentialStreamExt as _};
361+
use crate::{
362+
LayoutRef, LayoutStrategy, MaskFuture, SequentialStreamAdapter, SequentialStreamExt as _,
363+
};
374364

375365
#[fixture]
376366
/// Create a chunked layout with three chunks of primitive arrays.
@@ -410,7 +400,9 @@ mod test {
410400
.unwrap()
411401
.projection_evaluation(&(0..layout.row_count()), &root())
412402
.unwrap()
413-
.invoke(Mask::new_true(usize::try_from(layout.row_count()).unwrap()))
403+
.invoke(MaskFuture::new_true(
404+
usize::try_from(layout.row_count()).unwrap(),
405+
))
414406
.await
415407
.unwrap()
416408
.to_primitive();

vortex-layout/src/layouts/dict/reader.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use super::DictLayout;
2121
use crate::layouts::SharedArrayFuture;
2222
use crate::segments::SegmentSource;
2323
use crate::{
24-
ArrayEvaluation, LayoutReader, LayoutReaderRef, MaskEvaluation, NoOpPruningEvaluation,
25-
PruningEvaluation,
24+
ArrayEvaluation, LayoutReader, LayoutReaderRef, MaskEvaluation, MaskFuture,
25+
NoOpPruningEvaluation, PruningEvaluation,
2626
};
2727

2828
pub struct DictReader {
@@ -78,7 +78,7 @@ impl DictReader {
7878
.vortex_expect("must construct dict values array evaluation");
7979

8080
async move {
81-
eval.invoke(Mask::new_true(values_len))
81+
eval.invoke(MaskFuture::new_true(values_len))
8282
.await
8383
.map_err(Arc::new)
8484
}
@@ -175,35 +175,38 @@ struct DictMaskEvaluation {
175175

176176
#[async_trait]
177177
impl MaskEvaluation for DictMaskEvaluation {
178-
async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
179-
if mask.all_false() {
180-
return Ok(mask);
181-
}
182-
183-
let values_result = self.values_eval.clone().await?;
178+
async fn invoke(&self, mask: MaskFuture) -> VortexResult<Mask> {
179+
let values = self.values_eval.clone().await?;
180+
let mask = mask.await?;
184181

185182
// Short-circuit when the values are all true/false.
186-
if let Some(MinMaxResult { min, max }) = min_max(&values_result)? {
183+
if let Some(MinMaxResult { min, max }) = min_max(&values)? {
187184
if !max.as_bool().value().unwrap_or(true) {
188185
// All values are false
189186
return Ok(Mask::AllFalse(mask.len()));
190187
}
191188
if min.as_bool().value().unwrap_or(false) {
192189
// All values are true, but we still need to respect codes validity
193-
let codes = self.codes_eval.invoke(Mask::new_true(mask.len())).await?;
190+
let codes = self
191+
.codes_eval
192+
.invoke(MaskFuture::new_true(mask.len()))
193+
.await?;
194194
return Ok(mask.bitand(&codes.validity_mask()));
195195
}
196196
}
197197

198-
let codes = self.codes_eval.invoke(Mask::new_true(mask.len())).await?;
198+
let codes = self
199+
.codes_eval
200+
.invoke(MaskFuture::new_true(mask.len()))
201+
.await?;
199202
// Creating a mask from the dict array would canonicalize it,
200203
// it should be fine for now as long as values is already canonical,
201204
// so different row ranges do not canonicalize to the same array
202205
// multiple times.
203206
// TODO(joe): fixme casting null to false is *VERY* unsound, if the expression in the filter
204207
// can inspect nulls (e.g. `is_null`).
205208
// See `FlatEvaluation` for more details.
206-
let dict_mask = take(&values_result, &codes)?.try_to_mask_fill_null_false()?;
209+
let dict_mask = take(&values, &codes)?.try_to_mask_fill_null_false()?;
207210

208211
Ok(mask.bitand(&dict_mask))
209212
}
@@ -217,7 +220,7 @@ struct DictArrayEvaluation {
217220

218221
#[async_trait]
219222
impl ArrayEvaluation for DictArrayEvaluation {
220-
async fn invoke(&self, mask: Mask) -> VortexResult<ArrayRef> {
223+
async fn invoke(&self, mask: MaskFuture) -> VortexResult<ArrayRef> {
221224
let (values_result, codes) = join!(self.values_eval.clone(), self.codes_eval.invoke(mask));
222225
let (values_result, codes) = (values_result?, codes?);
223226

@@ -240,14 +243,13 @@ mod tests {
240243
use vortex_array::{ArrayContext, IntoArray as _};
241244
use vortex_dtype::{DType, FieldName, FieldNames, Nullability};
242245
use vortex_expr::{is_null, not, pack, root};
243-
use vortex_mask::Mask;
244246

245247
use crate::layouts::dict::writer::{DictLayoutOptions, DictStrategy};
246248
use crate::layouts::flat::writer::FlatLayoutStrategy;
247249
use crate::segments::{SequenceWriter, TestSegments};
248250
use crate::sequence::SequenceId;
249251
use crate::{
250-
LayoutId, LayoutRef, LayoutStrategy, LocalExecutor, SequentialStreamAdapter,
252+
LayoutId, LayoutRef, LayoutStrategy, LocalExecutor, MaskFuture, SequentialStreamAdapter,
251253
SequentialStreamExt,
252254
};
253255

@@ -307,7 +309,7 @@ mod tests {
307309
.unwrap()
308310
.projection_evaluation(&(0..layout.row_count()), &expression)
309311
.unwrap()
310-
.invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
312+
.invoke(MaskFuture::new_true(layout.row_count().try_into().unwrap()))
311313
.await
312314
.unwrap();
313315
let expected = StructArray::try_new(
@@ -388,7 +390,7 @@ mod tests {
388390
.unwrap()
389391
.filter_evaluation(&(0..3), &filter)
390392
.unwrap()
391-
.invoke(Mask::new_true(3))
393+
.invoke(MaskFuture::new_true(3))
392394
.await
393395
.unwrap();
394396

@@ -448,7 +450,7 @@ mod tests {
448450
.unwrap()
449451
.projection_evaluation(&(0..layout.row_count()), &expression)
450452
.unwrap()
451-
.invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
453+
.invoke(MaskFuture::new_true(layout.row_count().try_into().unwrap()))
452454
.await
453455
.unwrap();
454456
let expected = array.validity_mask().into_array();

vortex-layout/src/layouts/flat/reader.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use crate::layouts::SharedArrayFuture;
2323
use crate::layouts::flat::FlatLayout;
2424
use crate::segments::SegmentSource;
2525
use crate::{
26-
ArrayEvaluation, LayoutReader, MaskEvaluation, NoOpPruningEvaluation, PruningEvaluation,
26+
ArrayEvaluation, LayoutReader, MaskEvaluation, MaskFuture, NoOpPruningEvaluation,
27+
PruningEvaluation,
2728
};
2829

2930
/// The threshold of mask density below which we will evaluate the expression only over the
@@ -150,13 +151,12 @@ struct FlatEvaluation {
150151

151152
#[async_trait]
152153
impl MaskEvaluation for FlatEvaluation {
153-
async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
154+
async fn invoke(&self, mask: MaskFuture) -> VortexResult<Mask> {
154155
// TODO(ngates): if the mask density is low enough, or if the mask is dense within a range
155156
// (as often happens with zone map pruning), then we could slice/filter the array prior
156157
// to evaluating the expression.
157-
158-
// Now we await the array .
159158
let mut array = self.array.clone().await?;
159+
let mask = mask.await?;
160160

161161
if let Some(array) =
162162
try_evaluate_using_operator(self.row_range.clone(), &array, &self.expr, &mask)?
@@ -208,16 +208,11 @@ impl MaskEvaluation for FlatEvaluation {
208208

209209
#[async_trait]
210210
impl ArrayEvaluation for FlatEvaluation {
211-
async fn invoke(&self, mask: Mask) -> VortexResult<ArrayRef> {
212-
log::debug!(
213-
"Flat array evaluation {} - {} (mask = {})",
214-
self.name,
215-
self.expr,
216-
mask.density(),
217-
);
211+
async fn invoke(&self, mask: MaskFuture) -> VortexResult<ArrayRef> {
212+
log::debug!("Flat array evaluation {} - {}", self.name, self.expr);
218213

219-
// Now we await the array .
220214
let mut array = self.array.clone().await?;
215+
let mask = mask.await?;
221216

222217
if let Some(array) =
223218
try_evaluate_using_operator(self.row_range.clone(), &array, &self.expr, &mask)?
@@ -305,12 +300,11 @@ mod test {
305300
use vortex_array::{ArrayContext, ToCanonical};
306301
use vortex_buffer::buffer;
307302
use vortex_expr::{gt, lit, root};
308-
use vortex_mask::Mask;
309303

310304
use crate::layouts::flat::writer::FlatLayoutStrategy;
311305
use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
312306
use crate::sequence::SequenceId;
313-
use crate::{LayoutStrategy as _, SequentialStreamAdapter, SequentialStreamExt};
307+
use crate::{LayoutStrategy as _, MaskFuture, SequentialStreamAdapter, SequentialStreamExt};
314308

315309
#[test]
316310
fn flat_identity() {
@@ -339,7 +333,7 @@ mod test {
339333
.unwrap()
340334
.projection_evaluation(&(0..layout.row_count()), &root())
341335
.unwrap()
342-
.invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
336+
.invoke(MaskFuture::new_true(layout.row_count().try_into().unwrap()))
343337
.await
344338
.unwrap()
345339
.to_primitive();
@@ -379,7 +373,7 @@ mod test {
379373
.unwrap()
380374
.projection_evaluation(&(0..layout.row_count()), &expr)
381375
.unwrap()
382-
.invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
376+
.invoke(MaskFuture::new_true(layout.row_count().try_into().unwrap()))
383377
.await
384378
.unwrap()
385379
.to_bool();
@@ -418,7 +412,7 @@ mod test {
418412
.unwrap()
419413
.projection_evaluation(&(2..4), &root())
420414
.unwrap()
421-
.invoke(Mask::new_true(2))
415+
.invoke(MaskFuture::new_true(2))
422416
.await
423417
.unwrap()
424418
.to_primitive();

vortex-layout/src/layouts/flat/writer.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,14 @@ mod tests {
147147
use vortex_dtype::{DType, FieldName, FieldNames, Nullability};
148148
use vortex_error::VortexUnwrap;
149149
use vortex_expr::root;
150-
use vortex_mask::{AllOr, Mask};
150+
use vortex_mask::AllOr;
151151

152152
use crate::layouts::flat::writer::FlatLayoutStrategy;
153153
use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
154154
use crate::sequence::SequenceId;
155155
use crate::{
156-
LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter, SequentialStreamExt as _,
156+
LayoutStrategy, MaskFuture, SendableSequentialStream, SequentialStreamAdapter,
157+
SequentialStreamExt as _,
157158
};
158159

159160
fn stream_only(array: ArrayRef) -> SendableSequentialStream {
@@ -185,7 +186,7 @@ mod tests {
185186
.unwrap()
186187
.projection_evaluation(&(0..layout.row_count()), &root())
187188
.unwrap()
188-
.invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
189+
.invoke(MaskFuture::new_true(layout.row_count().try_into().unwrap()))
189190
.await
190191
.unwrap();
191192

@@ -226,7 +227,7 @@ mod tests {
226227
.unwrap()
227228
.projection_evaluation(&(0..layout.row_count()), &root())
228229
.unwrap()
229-
.invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
230+
.invoke(MaskFuture::new_true(layout.row_count().try_into().unwrap()))
230231
.await
231232
.unwrap();
232233

@@ -286,7 +287,7 @@ mod tests {
286287
.unwrap()
287288
.projection_evaluation(&(0..layout.row_count()), &root())
288289
.unwrap()
289-
.invoke(Mask::new_true(layout.row_count().try_into().unwrap()))
290+
.invoke(MaskFuture::new_true(layout.row_count().try_into().unwrap()))
290291
.await
291292
.unwrap();
292293

0 commit comments

Comments
 (0)