Skip to content

Commit 3615760

Browse files
authored
Chore: add todo and clean a bit in pipeline step (#5334)
Signed-off-by: Connor Tsui <[email protected]>
1 parent 31b5671 commit 3615760

File tree

2 files changed

+30
-24
lines changed

2 files changed

+30
-24
lines changed

encodings/fastlanes/src/bitpacking/array/bitpack_pipeline.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ mod tests {
217217
}
218218
}
219219

220+
#[ignore = "TODO(connor): need to filter in pipeline driver step"]
220221
#[test]
221222
fn test_bitpack_pipeline_dense_75_percent() {
222223
// Create exactly 1024 elements (0 to 1023).

vortex-array/src/pipeline/driver/mod.rs

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::hash::{BuildHasher, Hash, Hasher};
1010

1111
use itertools::Itertools;
1212
use vortex_dtype::DType;
13-
use vortex_error::{VortexResult, vortex_bail};
13+
use vortex_error::{VortexResult, vortex_ensure};
1414
use vortex_mask::Mask;
1515
use vortex_utils::aliases::hash_map::{HashMap, RandomState};
1616
use vortex_vector::{Vector, VectorMut, VectorMutOps};
@@ -270,7 +270,7 @@ impl Pipeline {
270270

271271
/// Perform a single step of the pipeline.
272272
fn step(&mut self, selection: &BitView, output: &mut VectorMut) -> VortexResult<()> {
273-
// Loop over the kernels in toposorted execution order
273+
// Loop over the kernels in toposorted execution order.
274274
for &node_idx in self.exec_order.iter() {
275275
let kernel = &mut self.kernels[node_idx];
276276

@@ -283,38 +283,43 @@ impl Pipeline {
283283
assert!(tail.is_empty());
284284

285285
kernel.step(&self.ctx, selection, &mut tail)?;
286-
if tail.len() != N && tail.len() != selection.true_count() {
287-
vortex_bail!(
288-
"Kernel produced incorrect number of output elements, expected either {} or {}, got {}",
289-
N,
290-
selection.true_count(),
291-
tail.len()
292-
);
286+
287+
let len = tail.len();
288+
vortex_ensure!(
289+
len == N || len == selection.true_count(),
290+
"Kernel produced incorrect number of output elements, \
291+
expected either {N} or {}, got {len}",
292+
selection.true_count(),
293+
);
294+
295+
// Since we are writing to the final vector, there are no other kernels who we
296+
// can delegate filtering the selection mask out to, so check if we need to do
297+
// a final filter before we return.
298+
if selection.true_count() < N && len == N {
299+
// tail.filter(selection_mask)
300+
todo!("Filter via a bit mask")
293301
}
294302

295-
// Now we append the produced output back to the main output vector.
303+
// Now we join the produced output back to the main output vector.
296304
output.unsplit(tail);
297305
}
298306
OutputTarget::IntermediateVector(vector_id) => {
299307
let mut out_vector = self.ctx.take_output(vector_id);
300308
out_vector.clear();
309+
debug_assert!(out_vector.is_empty());
301310

302-
assert!(out_vector.is_empty());
303311
kernel.step(&self.ctx, selection, &mut out_vector)?;
304312

305-
match out_vector.len() {
306-
// Valid cases are all N elements, or only the selected elements.
307-
n if n == N || n == selection.true_count() => {
308-
// If the kernel added N elements, the output is in-place.
309-
self.ctx.replace_output(vector_id, out_vector);
310-
}
311-
_ => vortex_bail!(
312-
"Kernel produced incorrect number of output elements, expected either {} or {}, got {}",
313-
N,
314-
selection.true_count(),
315-
out_vector.len()
316-
),
317-
}
313+
let len = out_vector.len();
314+
vortex_ensure!(
315+
len == N || len == selection.true_count(),
316+
"Kernel produced incorrect number of output elements, \
317+
expected either {N} or {}, got {len}",
318+
selection.true_count(),
319+
);
320+
321+
// If the kernel added N elements, the output is in-place.
322+
self.ctx.replace_output(vector_id, out_vector);
318323
}
319324
};
320325
}

0 commit comments

Comments
 (0)