Skip to content

Commit 997a96d

Browse files
committed
feat: pipeline-pcodec
Signed-off-by: Alexander Droste <[email protected]>
1 parent 1d5d34a commit 997a96d

File tree

7 files changed

+501
-5
lines changed

7 files changed

+501
-5
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/pco/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,20 @@ pco = { workspace = true }
2222
prost = { workspace = true }
2323
vortex-array = { workspace = true }
2424
vortex-buffer = { workspace = true }
25+
vortex-compute = { workspace = true }
2526
vortex-dtype = { workspace = true }
2627
vortex-error = { workspace = true }
2728
vortex-mask = { workspace = true }
2829
vortex-scalar = { workspace = true }
30+
vortex-vector = { workspace = true }
2931

3032
[dev-dependencies]
33+
divan = { workspace = true }
34+
mimalloc = { workspace = true }
35+
rand = { workspace = true }
3136
rstest = { workspace = true }
3237
vortex-array = { workspace = true, features = ["test-harness"] }
38+
39+
[[bench]]
40+
name = "pco"
41+
harness = false

encodings/pco/benches/pco.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
#![allow(clippy::unwrap_used)]
5+
6+
use divan::Bencher;
7+
use mimalloc::MiMalloc;
8+
use rand::prelude::StdRng;
9+
use rand::{Rng, SeedableRng};
10+
use vortex_array::compute::{filter, warm_up_vtables};
11+
use vortex_array::{IntoArray, ToCanonical};
12+
use vortex_buffer::{BitBuffer, BufferMut};
13+
use vortex_mask::Mask;
14+
use vortex_pco::PcoArray;
15+
16+
#[global_allocator]
17+
static GLOBAL: MiMalloc = MiMalloc;
18+
19+
pub fn main() {
20+
warm_up_vtables();
21+
divan::main();
22+
}
23+
24+
#[divan::bench(args = [
25+
(10_000, 0.1),
26+
(10_000, 0.5),
27+
(10_000, 0.9),
28+
(10_000, 1.0),
29+
(50_000, 0.1),
30+
(50_000, 0.5),
31+
(50_000, 0.9),
32+
(50_000, 1.0),
33+
(100_000, 0.1),
34+
(100_000, 0.5),
35+
(100_000, 0.9),
36+
(100_000, 1.0)]
37+
)]
38+
pub fn pco_pipeline(bencher: Bencher, (size, selectivity): (usize, f64)) {
39+
let mut rng = StdRng::seed_from_u64(42);
40+
let values = (0..size)
41+
.map(|i| (i % 10000) as i32)
42+
.collect::<BufferMut<i32>>()
43+
.into_array()
44+
.to_primitive();
45+
46+
let pco_array = PcoArray::from_primitive(&values, 3, 0).unwrap();
47+
let mask = (0..size)
48+
.map(|_| rng.random_bool(selectivity))
49+
.collect::<BitBuffer>();
50+
51+
bencher
52+
.with_inputs(|| Mask::from_buffer(mask.clone()))
53+
.bench_local_values(|mask| pco_array.execute_with_selection(&mask).unwrap());
54+
}
55+
56+
#[divan::bench(args = [
57+
(10_000, 0.1),
58+
(10_000, 0.5),
59+
(10_000, 0.9),
60+
(10_000, 1.0),
61+
(50_000, 0.1),
62+
(50_000, 0.5),
63+
(50_000, 0.9),
64+
(50_000, 1.0),
65+
(100_000, 0.1),
66+
(100_000, 0.5),
67+
(100_000, 0.9),
68+
(100_000, 1.0)]
69+
)]
70+
pub fn pco_canonical(bencher: Bencher, (size, selectivity): (usize, f64)) {
71+
let mut rng = StdRng::seed_from_u64(42);
72+
let values = (0..size)
73+
.map(|i| (i % 10000) as i32)
74+
.collect::<BufferMut<i32>>()
75+
.into_array()
76+
.to_primitive();
77+
78+
let pco_array = PcoArray::from_primitive(&values, 3, 0).unwrap();
79+
let mask = (0..size)
80+
.map(|_| rng.random_bool(selectivity))
81+
.collect::<BitBuffer>();
82+
83+
bencher
84+
.with_inputs(|| Mask::from_buffer(mask.clone()))
85+
.bench_values(|mask| filter(pco_array.to_canonical().as_ref(), &mask).unwrap());
86+
}

encodings/pco/src/array.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ use pco::{ChunkConfig, PagingSpec, match_number_enum};
1313
use prost::Message;
1414
use vortex_array::arrays::{PrimitiveArray, PrimitiveVTable};
1515
use vortex_array::compute::filter;
16+
use vortex_array::pipeline::PipelinedNode;
1617
use vortex_array::serde::ArrayChildren;
1718
use vortex_array::stats::{ArrayStats, StatsSetRef};
1819
use vortex_array::validity::Validity;
1920
use vortex_array::vtable::{
20-
ArrayVTable, CanonicalVTable, EncodeVTable, NotSupported, OperationsVTable, VTable,
21-
ValidityHelper, ValiditySliceHelper, ValidityVTableFromValiditySliceHelper, VisitorVTable,
21+
ArrayVTable, CanonicalVTable, EncodeVTable, NotSupported, OperationsVTable, OperatorVTable,
22+
VTable, ValidityHelper, ValiditySliceHelper, ValidityVTableFromValiditySliceHelper,
23+
VisitorVTable,
2224
};
2325
use vortex_array::{
2426
ArrayBufferVisitor, ArrayChildVisitor, ArrayEq, ArrayHash, ArrayRef, Canonical, EncodingId,
@@ -67,7 +69,7 @@ impl VTable for PcoVTable {
6769
type VisitorVTable = Self;
6870
type ComputeVTable = NotSupported;
6971
type EncodeVTable = Self;
70-
type OperatorVTable = NotSupported;
72+
type OperatorVTable = Self;
7173

7274
fn id(_encoding: &Self::Encoding) -> EncodingId {
7375
EncodingId::new_ref("vortex.pco")
@@ -129,7 +131,7 @@ impl VTable for PcoVTable {
129131
}
130132
}
131133

132-
fn number_type_from_dtype(dtype: &DType) -> NumberType {
134+
pub(crate) fn number_type_from_dtype(dtype: &DType) -> NumberType {
133135
let ptype = dtype.as_ptype();
134136
match ptype {
135137
PType::F16 => NumberType::F16,
@@ -150,7 +152,7 @@ fn collect_valid(parray: &PrimitiveArray) -> VortexResult<PrimitiveArray> {
150152
Ok(filter(&parray.to_array(), &mask)?.to_primitive())
151153
}
152154

153-
fn vortex_err_from_pco(err: PcoError) -> VortexError {
155+
pub(crate) fn vortex_err_from_pco(err: PcoError) -> VortexError {
154156
use pco::errors::ErrorKind::*;
155157
match err.kind {
156158
Io(io_kind) => VortexError::from(std::io::Error::new(io_kind, err.message)),
@@ -509,6 +511,12 @@ impl VisitorVTable<PcoVTable> for PcoVTable {
509511
}
510512
}
511513

514+
impl OperatorVTable<PcoVTable> for PcoVTable {
515+
fn pipeline_node(array: &PcoArray) -> Option<&dyn PipelinedNode> {
516+
Some(array)
517+
}
518+
}
519+
512520
#[cfg(test)]
513521
mod tests {
514522
use vortex_array::arrays::PrimitiveArray;

encodings/pco/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
mod array;
55
mod compute;
6+
mod pipeline;
67
#[cfg(test)]
78
mod test;
89

0 commit comments

Comments
 (0)