Skip to content

Commit ca17b75

Browse files
committed
wip
1 parent bc30a80 commit ca17b75

File tree

4 files changed

+78
-77
lines changed

4 files changed

+78
-77
lines changed

bench-vortex/benches/clickbench.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#![feature(exit_status_error)]
2+
3+
use std::path::PathBuf;
4+
use std::process::Command;
5+
6+
use bench_vortex::clickbench::{clickbench_queries, HITS_SCHEMA};
7+
use bench_vortex::{clickbench, execute_query, get_session_with_cache, idempotent, IdempotentPath};
8+
use criterion::{criterion_group, criterion_main, Criterion};
9+
use tokio::runtime::Builder;
10+
11+
fn benchmark(c: &mut Criterion) {
12+
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
13+
let basepath = "clickbench".to_data_path();
14+
15+
// The clickbench-provided file is missing some higher-level type info, so we reprocess it
16+
// to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
17+
for idx in 0..100 {
18+
let output_path = basepath.join(format!("hits_{idx}.parquet"));
19+
idempotent(&output_path, |output_path| {
20+
eprintln!("Fixing parquet file {idx}");
21+
let home = std::env::var("HOME").unwrap_or_else(|_| "/home/ci-runner".to_string());
22+
let command = format!(
23+
"
24+
SET home_directory='{home}';
25+
INSTALL HTTPFS;
26+
COPY (SELECT * REPLACE
27+
(epoch_ms(EventTime * 1000) AS EventTime, \
28+
epoch_ms(ClientEventTime * 1000) AS ClientEventTime, \
29+
epoch_ms(LocalEventTime * 1000) AS LocalEventTime, \
30+
DATE '1970-01-01' + INTERVAL (EventDate) DAYS AS EventDate) \
31+
FROM read_parquet('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{idx}.parquet', binary_as_string=True)) TO '{}' (FORMAT 'parquet');",
32+
output_path.to_str().unwrap()
33+
);
34+
Command::new("duckdb")
35+
.arg("-c")
36+
.arg(command)
37+
.status()?
38+
.exit_ok()?;
39+
40+
anyhow::Ok(PathBuf::from(output_path))
41+
})
42+
.unwrap();
43+
}
44+
45+
let session_context = get_session_with_cache();
46+
let context = session_context.clone();
47+
48+
runtime.block_on(async move {
49+
clickbench::register_vortex_files(context, "hits", basepath.as_path(), &HITS_SCHEMA)
50+
.await
51+
.unwrap();
52+
});
53+
54+
let mut group = c.benchmark_group("clickbench");
55+
56+
for (idx, query) in clickbench_queries().into_iter() {
57+
let context = session_context.clone();
58+
group.bench_function(format!("q-{:02}", idx), |b| {
59+
b.to_async(&runtime)
60+
.iter(|| async { execute_query(&context, &query).await.unwrap() });
61+
});
62+
}
63+
}
64+
65+
criterion_group!(
66+
name = benches;
67+
config = Criterion::default().sample_size(10);
68+
targets = benchmark
69+
);
70+
criterion_main!(benches);

encodings/alp/src/alp/compress.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ where
5353
let values_slice = values.as_slice::<T>();
5454

5555
let exponents = T::find_best_exponents(values_slice);
56-
let (encoded, exceptional_positions) = T::chunked_encode(values.as_slice::<T>(), exponents);
56+
let (encoded, exceptional_positions) = T::encode_chunkwise(values.as_slice::<T>(), exponents);
5757

5858
let encoded_array = PrimitiveArray::new(encoded, values.validity()).into_array();
59+
5960
let validity = values.logical_validity()?;
6061
let n_valid = validity.true_count();
6162
let exceptional_positions = if n_valid == 0 {

encodings/alp/src/alp/mod.rs

Lines changed: 1 addition & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static {
119119
/// Unlike [Self::encode], this operation processes no more than [Self::ENCODE_CHUNK_SIZE]
120120
/// elements at once which can make better use of the L1 cache because [Self::encode] makes two
121121
/// passes over `values`: first to encode and second to extract the exceptional values.
122-
fn chunked_encode(
122+
fn encode_chunkwise(
123123
values: &[Self],
124124
exponents: Exponents,
125125
) -> (Buffer<Self::ALPInt>, Buffer<u64>) {
@@ -209,79 +209,6 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static {
209209
}
210210
}
211211

212-
/// Encodes `chunk` with the exponents `exp`, appending one encoded value per input element
/// to `encoded_output`.
///
/// An element whose encoded form does not decode back to the original value is recorded as
/// a patch: its position in `encoded_output` is appended to `patch_indices` and the original
/// value is appended to `patch_values`.
///
/// `fill_value` is set to the first encoded value that is not a patch, once one exists.
/// Patched slots in `encoded_output` are then overwritten with that fill value.
///
/// # Panics
///
/// Panics if `patch_indices` and `patch_values` have different lengths on entry, or if one
/// of the internal consistency assertions fails.
#[allow(clippy::cast_possible_truncation)]
213-
fn _encode_chunk_unchecked<T: ALPFloat>(
214-
chunk: &[T],
215-
exp: Exponents,
216-
encoded_output: &mut BufferMut<T::ALPInt>,
217-
patch_indices: &mut BufferMut<u64>,
218-
patch_values: &mut BufferMut<T>,
219-
fill_value: &mut Option<T::ALPInt>,
220-
) {
221-
let num_prev_encoded = encoded_output.len();
222-
let num_prev_patches = patch_indices.len();
223-
assert_eq!(patch_indices.len(), patch_values.len());
224-
// remember whether a fill value already existed before this chunk was processed
let has_filled = fill_value.is_some();
225-
226-
// encode the chunk, counting the number of patches
227-
let mut chunk_patch_count = 0;
228-
encoded_output.extend(chunk.iter().map(|v| {
229-
// NOTE(review): the preconditions of `encode_single_unchecked` are not visible in this
// block; confirm them before relying on this call.
let encoded = unsafe { T::encode_single_unchecked(*v, exp) };
230-
let decoded = T::decode_single(encoded, exp);
231-
// a value that does not round-trip counts as one patch
let neq = (decoded != *v) as usize;
232-
chunk_patch_count += neq;
233-
encoded
234-
}));
235-
let chunk_patch_count = chunk_patch_count; // immutable hereafter
236-
assert_eq!(encoded_output.len(), num_prev_encoded + chunk.len());
237-
238-
if chunk_patch_count > 0 {
239-
// we need to gather the patches for this chunk
240-
// preallocate space for the patches (plus one because our loop may attempt to write one past the end)
241-
patch_indices.reserve(chunk_patch_count + 1);
242-
patch_values.reserve(chunk_patch_count + 1);
243-
244-
// record the patches in this chunk
245-
let patch_indices_mut = patch_indices.spare_capacity_mut();
246-
let patch_values_mut = patch_values.spare_capacity_mut();
247-
let mut chunk_patch_index = 0;
248-
for i in num_prev_encoded..encoded_output.len() {
249-
let decoded = T::decode_single(encoded_output[i], exp);
250-
// write() is only safe to call more than once because the values are primitive (i.e., Drop is a no-op)
251-
patch_indices_mut[chunk_patch_index].write(i as u64);
252-
patch_values_mut[chunk_patch_index].write(chunk[i - num_prev_encoded]);
253-
// the slot written above is kept only when the value is a patch; otherwise the index does
// not advance and the next iteration overwrites the same slot
chunk_patch_index += (decoded != chunk[i - num_prev_encoded]) as usize;
254-
}
255-
assert_eq!(chunk_patch_index, chunk_patch_count);
256-
// the loop above wrote the first `chunk_patch_count` spare slots of both buffers (checked by
// the assertion), so the lengths are extended to cover exactly those slots
unsafe {
257-
patch_indices.set_len(num_prev_patches + chunk_patch_count);
258-
patch_values.set_len(num_prev_patches + chunk_patch_count);
259-
}
260-
}
261-
262-
// find the first successfully encoded value (i.e., not patched)
263-
// this is our fill value for missing values
264-
if fill_value.is_none() && (num_prev_encoded + chunk_patch_count < encoded_output.len()) {
265-
assert_eq!(num_prev_encoded, num_prev_patches);
266-
for i in num_prev_encoded..encoded_output.len() {
267-
if i >= patch_indices.len() || patch_indices[i] != i as u64 {
268-
*fill_value = Some(encoded_output[i]);
269-
break;
270-
}
271-
}
272-
}
273-
274-
// replace the patched values in the encoded array with the fill value
275-
// for better downstream compression
276-
if let Some(fill_value) = fill_value {
277-
// handle the edge case where the first N >= 1 chunks are all patches
278-
let start_patch = if !has_filled { 0 } else { num_prev_patches };
279-
for patch_idx in &patch_indices[start_patch..] {
280-
encoded_output[*patch_idx as usize] = *fill_value;
281-
}
282-
}
283-
}
284-
285212
impl ALPFloat for f32 {
286213
type ALPInt = i32;
287214
const FRACTIONAL_BITS: u8 = 23;

vortex-sampling-compressor/src/compressors/alp.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use vortex_alp::{alp_encode_components, ALPArray, ALPEncoding, ALPRDEncoding};
22
use vortex_array::aliases::hash_set::HashSet;
33
use vortex_array::array::PrimitiveArray;
4+
use vortex_array::compute::fill_null;
45
use vortex_array::variants::PrimitiveArrayTrait;
56
use vortex_array::{Array, Encoding, EncodingId, IntoArray, IntoArrayVariant};
67
use vortex_dtype::PType;
78
use vortex_error::VortexResult;
89
use vortex_fastlanes::BitPackedEncoding;
10+
use vortex_scalar::Scalar;
911

1012
use super::alp_rd::ALPRDCompressor;
1113
use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
@@ -41,8 +43,9 @@ impl EncodingCompressor for ALPCompressor {
4143
like: Option<CompressionTree<'a>>,
4244
ctx: SamplingCompressor<'a>,
4345
) -> VortexResult<CompressedArray<'a>> {
44-
let (exponents, encoded, patches) =
45-
alp_encode_components(&array.clone().into_primitive()?)?;
46+
let nulls_zeroed =
47+
fill_null(array, Scalar::from(0.0).cast(array.dtype())?)?.into_primitive()?;
48+
let (exponents, encoded, patches) = alp_encode_components(&nulls_zeroed)?;
4649

4750
let compressed_encoded = ctx
4851
.named("packed")

0 commit comments

Comments
 (0)