Skip to content

Commit 8d02407

Browse files
authored
perf: use filter instead of take in Patches::filter (#2093)
Replace usage of `take` with `filter` in `Patches::filter`. Take performs a per-element binary search over `patch_indices`, which is unnecessary can can just be performed using a filter after building the sorted filter mask. Benchmark results before/after change (with `-C target-cpu=native` applied): **BEFORE** ``` filter_then_canonical/0.001 time: [18.591 µs 18.646 µs 18.703 µs] change: [-2.2719% -1.8183% -1.3886%] (p = 0.00 < 0.05) Performance has improved. Found 7 outliers among 100 measurements (7.00%) 3 (3.00%) high mild 4 (4.00%) high severe filter_then_canonical/0.01 time: [45.116 µs 45.287 µs 45.469 µs] change: [+0.9772% +1.3910% +1.8349%] (p = 0.00 < 0.05) Change within noise threshold. Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) high mild filter_then_canonical/0.1 time: [55.525 µs 55.895 µs 56.334 µs] change: [-9.1744% -8.2913% -7.4359%] (p = 0.00 < 0.05) Performance has improved. Found 4 outliers among 100 measurements (4.00%) 4 (4.00%) high mild filter_then_canonical/0.5 time: [167.36 µs 168.97 µs 170.64 µs] change: [-6.8090% -5.6899% -4.6273%] (p = 0.00 < 0.05) Performance has improved. filter_then_canonical/0.9 time: [267.43 µs 274.14 µs 280.44 µs] change: [-5.1936% -0.6966% +3.0989%] (p = 0.77 > 0.05) No change in performance detected. Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) high mild filter_then_canonical/0.99 time: [218.80 µs 222.99 µs 227.39 µs] change: [-12.117% -9.8998% -7.5168%] (p = 0.00 < 0.05) Performance has improved. Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) high mild filter_then_canonical/0.999 time: [235.28 µs 242.05 µs 249.06 µs] change: [+7.4971% +11.002% +14.803%] (p = 0.00 < 0.05) Performance has regressed. filter_then_canonical/1 time: [44.103 µs 44.969 µs 45.811 µs] change: [+1.1924% +2.5506% +4.0096%] (p = 0.00 < 0.05) Performance has regressed. Found 16 outliers among 100 measurements (16.00%) 6 (6.00%) high mild 10 (10.00%) high severe canonical_then_filter/0.001 time: [42.622 µs 42.801 µs 43.047 µs] change: [+2.8863% +3.8982% +5.2951%] (p = 0.00 < 0.05) Performance has regressed. Found 12 outliers among 100 measurements (12.00%) 12 (12.00%) high mild canonical_then_filter/0.01 time: [45.406 µs 46.116 µs 46.893 µs] change: [+7.4139% +8.7784% +10.114%] (p = 0.00 < 0.05) Performance has regressed. canonical_then_filter/0.1 time: [55.605 µs 57.323 µs 59.165 µs] change: [-3.8630% -1.9663% -0.0013%] (p = 0.06 > 0.05) No change in performance detected. Found 13 outliers among 100 measurements (13.00%) 12 (12.00%) low mild 1 (1.00%) high mild canonical_then_filter/0.5 time: [58.457 µs 59.673 µs 61.200 µs] change: [-3.9607% -1.5104% +0.6748%] (p = 0.23 > 0.05) No change in performance detected. canonical_then_filter/0.9 time: [101.31 µs 102.97 µs 104.51 µs] change: [-5.6743% -4.0261% -2.4754%] (p = 0.00 < 0.05) Performance has improved. canonical_then_filter/0.99 time: [71.770 µs 72.499 µs 73.223 µs] change: [+12.322% +14.444% +16.977%] (p = 0.00 < 0.05) Performance has regressed. Found 4 outliers among 100 measurements (4.00%) 1 (1.00%) low mild 1 (1.00%) high mild 2 (2.00%) high severe canonical_then_filter/0.999 time: [70.196 µs 71.040 µs 71.869 µs] change: [+0.7287% +1.7791% +2.9515%] (p = 0.00 < 0.05) Change within noise threshold. Found 7 outliers among 100 measurements (7.00%) 2 (2.00%) low mild 4 (4.00%) high mild 1 (1.00%) high severe canonical_then_filter/1 time: [46.668 µs 47.762 µs 48.889 µs] change: [+0.7750% +3.1566% +5.6672%] (p = 0.01 < 0.05) Change within noise threshold. ``` **AFTER** ``` filter_then_canonical/0.001 time: [14.971 µs 15.118 µs 15.310 µs] change: [-20.044% -19.194% -18.225%] (p = 0.00 < 0.05) Performance has improved. Found 13 outliers among 100 measurements (13.00%) 8 (8.00%) high mild 5 (5.00%) high severe filter_then_canonical/0.01 time: [25.974 µs 26.012 µs 26.053 µs] change: [-42.546% -42.237% -41.891%] (p = 0.00 < 0.05) Performance has improved. Found 12 outliers among 100 measurements (12.00%) 8 (8.00%) high mild 4 (4.00%) high severe filter_then_canonical/0.1 time: [59.520 µs 59.888 µs 60.260 µs] change: [+6.8618% +7.8890% +8.9357%] (p = 0.00 < 0.05) Performance has regressed. Found 5 outliers among 100 measurements (5.00%) 5 (5.00%) high mild filter_then_canonical/0.5 time: [167.38 µs 169.59 µs 171.66 µs] change: [-2.4051% -1.4161% -0.3761%] (p = 0.01 < 0.05) Change within noise threshold. Found 29 outliers among 100 measurements (29.00%) 8 (8.00%) low severe 2 (2.00%) low mild 3 (3.00%) high mild 16 (16.00%) high severe filter_then_canonical/0.9 time: [258.88 µs 259.30 µs 259.76 µs] change: [-5.1144% -3.5762% -2.0444%] (p = 0.00 < 0.05) Performance has improved. Found 5 outliers among 100 measurements (5.00%) 2 (2.00%) high mild 3 (3.00%) high severe filter_then_canonical/0.99 time: [187.55 µs 189.52 µs 191.52 µs] change: [-13.451% -11.806% -10.251%] (p = 0.00 < 0.05) Performance has improved. filter_then_canonical/0.999 time: [179.95 µs 181.57 µs 182.99 µs] change: [-32.654% -30.839% -28.982%] (p = 0.00 < 0.05) Performance has improved. filter_then_canonical/1 time: [42.540 µs 42.615 µs 42.681 µs] change: [-3.6925% -2.5866% -1.5384%] (p = 0.00 < 0.05) Performance has improved. canonical_then_filter/0.001 time: [42.678 µs 42.759 µs 42.828 µs] change: [-4.6690% -3.5527% -2.5030%] (p = 0.00 < 0.05) Performance has improved. canonical_then_filter/0.01 time: [47.107 µs 47.695 µs 48.234 µs] change: [+2.0595% +3.5847% +5.1306%] (p = 0.00 < 0.05) Performance has regressed. Found 26 outliers among 100 measurements (26.00%) 13 (13.00%) low severe 6 (6.00%) high mild 7 (7.00%) high severe canonical_then_filter/0.1 time: [61.105 µs 61.594 µs 62.052 µs] change: [-1.7992% +0.2192% +2.3952%] (p = 0.84 > 0.05) No change in performance detected. canonical_then_filter/0.5 time: [69.184 µs 69.549 µs 69.928 µs] change: [+2.6673% +5.1603% +7.8241%] (p = 0.00 < 0.05) Performance has regressed. Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) low mild canonical_then_filter/0.9 time: [103.06 µs 103.93 µs 104.90 µs] change: [+4.4514% +6.2341% +7.9956%] (p = 0.00 < 0.05) Performance has regressed. canonical_then_filter/0.99 time: [68.937 µs 69.321 µs 69.680 µs] change: [-8.9882% -7.3397% -5.9331%] (p = 0.00 < 0.05) Performance has improved. Found 11 outliers among 100 measurements (11.00%) 9 (9.00%) low mild 2 (2.00%) high mild canonical_then_filter/0.999 time: [67.411 µs 67.657 µs 67.917 µs] change: [-5.0588% -4.0257% -2.9739%] (p = 0.00 < 0.05) Performance has improved. Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) high mild canonical_then_filter/1 time: [44.883 µs 45.997 µs 46.996 µs] change: [-4.4256% -2.0499% +0.2734%] (p = 0.09 > 0.05) No change in performance detected. ```
1 parent f17f32e commit 8d02407

File tree

5 files changed

+222
-28
lines changed

5 files changed

+222
-28
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,3 +229,7 @@ debug = true
229229
[profile.benchtest]
230230
inherits = "bench"
231231
debug-assertions = true
232+
233+
[profile.samply]
234+
inherits = "release"
235+
debug = true

bench-vortex/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ tokio = { workspace = true, features = ["full"] }
6161
uuid = { workspace = true, features = ["v4"] }
6262
vortex = { workspace = true, features = ["object_store", "parquet"] }
6363
vortex-datafusion = { workspace = true }
64+
vortex-mask = { workspace = true }
6465
xshell = { workspace = true }
6566

6667
[dev-dependencies]
@@ -100,3 +101,8 @@ harness = false
100101
name = "clickbench"
101102
test = false
102103
harness = false
104+
105+
[[bench]]
106+
name = "sel_vec"
107+
test = false
108+
harness = false

bench-vortex/benches/sel_vec.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#![allow(unused_imports, unused, dead_code)]
2+
//! Various tests for the selection vector being present.
3+
4+
use criterion::{BenchmarkId, Criterion};
5+
use rand::Rng;
6+
use vortex::array::PrimitiveArray;
7+
use vortex::compute::filter;
8+
use vortex::dtype::{DType, Nullability, PType};
9+
use vortex::encoding::{ArrayEncodingRef, Encoding};
10+
use vortex::encodings::alp::{ALPArray, ALPEncoding};
11+
use vortex::sampling_compressor::compressors::alp::ALPCompressor;
12+
use vortex::sampling_compressor::compressors::bitpacked::{
13+
BitPackedCompressor, BITPACK_NO_PATCHES, BITPACK_WITH_PATCHES,
14+
};
15+
use vortex::sampling_compressor::compressors::r#for::FoRCompressor;
16+
use vortex::sampling_compressor::compressors::EncodingCompressor;
17+
use vortex::sampling_compressor::SamplingCompressor;
18+
use vortex::variants::PrimitiveArrayTrait;
19+
use vortex::{ArrayData, IntoArrayData, IntoCanonical};
20+
use vortex_mask::Mask;
21+
22+
// criterion benchmark setup:
23+
fn bench_sel_vec(c: &mut Criterion) {
24+
let mut group = c.benchmark_group("filter_then_canonical");
25+
26+
// Run ALP + BitPacking.
27+
let compressor = SamplingCompressor::default().including_only(&[
28+
&ALPCompressor as &dyn EncodingCompressor,
29+
&BITPACK_NO_PATCHES,
30+
// &FoRCompressor,
31+
]);
32+
33+
// Create a low-precision primitive array of f64
34+
let arr = PrimitiveArray::from_iter((0..=65535).map(|x| (x as f64) * 0.2f64));
35+
assert_eq!(arr.ptype(), PType::F64);
36+
37+
let arr = compressor
38+
.compress(&arr.into_array(), None)
39+
.unwrap()
40+
.into_array();
41+
assert_eq!(arr.encoding().id(), ALPEncoding::ID);
42+
43+
println!("tree: {}", arr.tree_display());
44+
45+
// Try for various mask
46+
let max = 65536;
47+
for selectivity in [0.001, 0.01, 0.1, 0.5, 0.9, 0.99, 0.999, 1.0] {
48+
// Create a random mask of the given size
49+
let true_count = (selectivity * max as f64) as usize;
50+
// Create a randomized mask with the correct length and true_count.
51+
let mask = create_mask(max, true_count);
52+
assert_eq!(mask.len(), max);
53+
assert_eq!(mask.true_count(), true_count);
54+
group.bench_with_input(
55+
BenchmarkId::from_parameter(selectivity),
56+
&mask,
57+
|b, mask| {
58+
// Filter then into_canonical
59+
b.iter(|| filter_then_canonical(&arr, mask))
60+
},
61+
);
62+
}
63+
group.finish();
64+
65+
let mut group = c.benchmark_group("canonical_then_filter");
66+
for selectivity in [0.001, 0.01, 0.1, 0.5, 0.9, 0.99, 0.999, 1.0] {
67+
// Create a random mask of the given size
68+
let true_count = (selectivity * max as f64) as usize;
69+
// Create a randomized mask with the correct length and true_count.
70+
let mask = create_mask(max, true_count);
71+
group.bench_with_input(
72+
BenchmarkId::from_parameter(selectivity),
73+
&mask,
74+
|b, mask| {
75+
// Filter then into_canonical
76+
b.iter(|| canonical_then_filter(&arr, mask))
77+
},
78+
);
79+
}
80+
group.finish();
81+
}
82+
83+
fn filter_then_canonical(array: &ArrayData, mask: &Mask) -> ArrayData {
84+
let filtered = filter(array, mask).unwrap();
85+
filtered.into_canonical().unwrap().into_array()
86+
}
87+
88+
fn canonical_then_filter(array: &ArrayData, mask: &Mask) -> ArrayData {
89+
let canonical = array.clone().into_canonical().unwrap().into_array();
90+
filter(&canonical, mask).unwrap()
91+
}
92+
93+
fn create_mask(len: usize, true_count: usize) -> Mask {
94+
let mut mask = vec![false; len];
95+
// randomly distribute true_count true values
96+
let mut rng = rand::thread_rng();
97+
let mut set = 0;
98+
while set < true_count {
99+
let index = rng.gen_range(0..len);
100+
if !mask[index] {
101+
mask[index] = true;
102+
set += 1;
103+
}
104+
}
105+
Mask::from_iter(mask)
106+
}
107+
108+
criterion::criterion_group!(sel_vec, bench_sel_vec);
109+
criterion::criterion_main!(sel_vec);

vortex-array/src/patches.rs

Lines changed: 102 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,21 @@
1+
use std::cmp::Ordering;
12
use std::fmt::Debug;
23

34
use itertools::Itertools as _;
5+
use num_traits::{AsPrimitive, NumCast, ToPrimitive};
46
use serde::{Deserialize, Serialize};
57
use vortex_buffer::BufferMut;
68
use vortex_dtype::Nullability::NonNullable;
7-
use vortex_dtype::{match_each_integer_ptype, DType, PType};
9+
use vortex_dtype::{match_each_integer_ptype, DType, NativePType, PType};
810
use vortex_error::{vortex_bail, VortexExpect, VortexResult};
911
use vortex_mask::Mask;
1012
use vortex_scalar::Scalar;
1113

1214
use crate::aliases::hash_map::HashMap;
1315
use crate::array::PrimitiveArray;
1416
use crate::compute::{
15-
scalar_at, search_sorted, search_sorted_usize, search_sorted_usize_many, slice, sub_scalar,
16-
take, SearchResult, SearchSortedSide,
17+
filter, scalar_at, search_sorted, search_sorted_usize, search_sorted_usize_many, slice,
18+
sub_scalar, take, SearchResult, SearchSortedSide,
1719
};
1820
use crate::stats::{ArrayStatistics, Stat};
1921
use crate::variants::PrimitiveArrayTrait;
@@ -212,33 +214,14 @@ impl Patches {
212214
return Ok(None);
213215
}
214216

215-
// TODO(ngates): add functions to operate with Mask directly
216-
let buffer = mask.boolean_buffer();
217-
let mut coordinate_indices = BufferMut::<u64>::empty();
218-
let mut value_indices = BufferMut::<u64>::empty();
219-
let mut last_inserted_index: usize = 0;
220-
221217
let flat_indices = self.indices().clone().into_primitive()?;
222218
match_each_integer_ptype!(flat_indices.ptype(), |$I| {
223-
for (value_idx, coordinate) in flat_indices.as_slice::<$I>().iter().enumerate() {
224-
if buffer.value(*coordinate as usize) {
225-
// We count the number of truthy values between this coordinate and the previous truthy one
226-
let adjusted_coordinate = buffer.slice(last_inserted_index, (*coordinate as usize) - last_inserted_index).count_set_bits() as u64;
227-
coordinate_indices.push(adjusted_coordinate + coordinate_indices.last().copied().unwrap_or_default());
228-
last_inserted_index = *coordinate as usize;
229-
value_indices.push(value_idx as u64);
230-
}
231-
}
232-
});
233-
234-
if coordinate_indices.is_empty() {
235-
return Ok(None);
236-
}
237-
238-
let indices = coordinate_indices.into_array();
239-
let values = take(self.values(), value_indices.into_array())?;
240-
241-
Ok(Some(Self::new(mask.len(), indices, values)))
219+
filter_patches_with_mask(
220+
flat_indices.as_slice::<$I>(),
221+
self.values(),
222+
mask
223+
)
224+
})
242225
}
243226

244227
/// Slice the patches by a range of the patched array.
@@ -396,6 +379,97 @@ impl Patches {
396379
}
397380
}
398381

382+
/// Filter patches with the provided mask (in flattened space).
383+
///
384+
/// The filter mask may contain indices that are non-patched. The return value of this function
385+
/// is a new set of `Patches` with the indices relative to the provided `mask` rank, and the
386+
/// patch values.
387+
fn filter_patches_with_mask<T: ToPrimitive + Copy + Ord>(
388+
patch_indices: &[T],
389+
patch_values: &ArrayData,
390+
mask: &Mask,
391+
) -> VortexResult<Option<Patches>> {
392+
let mut new_patch_indices = BufferMut::<u64>::with_capacity(mask.true_count());
393+
let mut new_mask_indices = Vec::with_capacity(mask.true_count());
394+
395+
// Attempt to move the window by `STRIDE` elements on each iteration. This assumes that
396+
// the patches are relatively sparse compared to the overall mask, and so many indices in the
397+
// mask will end up being skipped.
398+
const STRIDE: usize = 4;
399+
400+
let mut mask_idx = 0usize;
401+
let mut true_idx = 0usize;
402+
403+
let mask_indices = mask.indices();
404+
405+
while mask_idx < patch_indices.len() && true_idx < mask.true_count() {
406+
// NOTE: we are searching for overlaps between sorted, unaligned indices in `patch_indices`
407+
// and `mask_indices`. We assume that Patches are sparse relative to the global space of
408+
// the mask (which covers both patch and non-patch values of the parent array), and so to
409+
// quickly jump through regions with no overlap, we attempt to move our pointers by STRIDE
410+
// elements on each iteration. If we cannot rule out overlap due to min/max values, we
411+
// fallback to performing a two-way iterator merge.
412+
if (mask_idx + STRIDE) < patch_indices.len() && (true_idx + STRIDE) < mask_indices.len() {
413+
// Load a vector of each into our registers.
414+
let left_min = patch_indices[mask_idx].to_usize().vortex_expect("left_min");
415+
let left_max = patch_indices[mask_idx + STRIDE]
416+
.to_usize()
417+
.vortex_expect("left_max");
418+
let right_min = mask_indices[true_idx];
419+
let right_max = mask_indices[true_idx + STRIDE];
420+
421+
if left_min > right_max {
422+
// Advance right side
423+
true_idx += STRIDE;
424+
continue;
425+
} else if right_min > left_max {
426+
mask_idx += STRIDE;
427+
continue;
428+
} else {
429+
// Fallthrough to direct comparison path.
430+
}
431+
}
432+
433+
// Two-way sorted iterator merge:
434+
435+
let left = patch_indices[mask_idx].to_usize().vortex_expect("left");
436+
let right = mask_indices[true_idx];
437+
438+
match left.cmp(&right) {
439+
Ordering::Less => {
440+
mask_idx += 1;
441+
}
442+
Ordering::Greater => {
443+
true_idx += 1;
444+
}
445+
Ordering::Equal => {
446+
// Save the mask index as well as the positional index.
447+
new_mask_indices.push(mask_idx);
448+
new_patch_indices.push(true_idx as u64);
449+
450+
mask_idx += 1;
451+
true_idx += 1;
452+
}
453+
}
454+
}
455+
456+
if new_mask_indices.is_empty() {
457+
return Ok(None);
458+
}
459+
460+
let new_patch_indices = new_patch_indices.into_array();
461+
let new_patch_values = filter(
462+
patch_values,
463+
&Mask::from_indices(patch_values.len(), new_mask_indices),
464+
)?;
465+
466+
Ok(Some(Patches::new(
467+
mask.true_count(),
468+
new_patch_indices,
469+
new_patch_values,
470+
)))
471+
}
472+
399473
#[cfg(test)]
400474
mod test {
401475
use rstest::{fixture, rstest};

0 commit comments

Comments
 (0)