Skip to content

Commit c8f9e07

Browse files
committed
Restore lance compress benchmarks
Signed-off-by: Adam Gutglick <[email protected]>
1 parent 6f360de commit c8f9e07

File tree

18 files changed

+306
-135
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -530,7 +530,7 @@ jobs:
530530
- name: Rust Tests (Windows)
531531
if: matrix.os == 'windows-x64'
532532
run: |
533-
cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude vortex-python --exclude vortex-duckdb --exclude vortex-fuzz --exclude duckdb-bench --exclude lance-bench --exclude datafusion-bench
533+
cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude vortex-python --exclude vortex-duckdb --exclude vortex-fuzz --exclude duckdb-bench --exclude lance-bench --exclude datafusion-bench --exclude random-access-bench --exclude compress-bench
534534
- name: Rust Tests (Other)
535535
if: matrix.os != 'windows-x64'
536536
run: cargo nextest run --locked --workspace --all-features --no-fail-fast --exclude vortex-bench --exclude vortex-duckdb

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/compress-bench/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ clap = { workspace = true, features = ["derive"] }
2424
futures = { workspace = true }
2525
indicatif = { workspace = true }
2626
itertools = { workspace = true }
27+
lance-bench = { path = "../lance-bench", optional = true }
2728
parquet = { workspace = true }
2829
regex = { workspace = true }
2930
serde = { workspace = true }
@@ -32,5 +33,8 @@ tracing = { workspace = true }
3233
vortex = { workspace = true }
3334
vortex-bench = { workspace = true }
3435

36+
[features]
37+
lance = ["dep:lance-bench"]
38+
3539
[lints]
3640
workspace = true

benchmarks/compress-bench/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use ::vortex::array::arrays::recursive_list_from_list_view;
88
use arrow_array::RecordBatch;
99
use arrow_schema::Schema;
1010

11-
pub mod bench;
1211
pub mod parquet;
1312
pub mod vortex;
1413

benchmarks/compress-bench/src/main.rs

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,22 @@ use std::path::PathBuf;
55
use std::time::Duration;
66

77
use clap::Parser;
8-
use compress_bench::bench::CompressMeasurements;
9-
use compress_bench::bench::CompressOp;
10-
use compress_bench::bench::Compressor;
11-
use compress_bench::bench::benchmark_compress;
12-
use compress_bench::bench::benchmark_decompress;
13-
use compress_bench::bench::calculate_ratios;
148
use compress_bench::parquet::ParquetCompressor;
159
use compress_bench::vortex::VortexCompressor;
1610
use indicatif::ProgressBar;
1711
use itertools::Itertools;
1812
use regex::Regex;
19-
use vortex::array::IntoArray;
20-
use vortex::array::arrays::ChunkedArray;
21-
use vortex::array::arrays::ChunkedVTable;
22-
use vortex::array::builders::builder_with_capacity;
2313
use vortex::utils::aliases::hash_map::HashMap;
2414
use vortex_bench::BenchmarkOutput;
2515
use vortex_bench::Engine;
2616
use vortex_bench::Format;
2717
use vortex_bench::Target;
18+
use vortex_bench::compress::CompressMeasurements;
19+
use vortex_bench::compress::CompressOp;
20+
use vortex_bench::compress::Compressor;
21+
use vortex_bench::compress::benchmark_compress;
22+
use vortex_bench::compress::benchmark_decompress;
23+
use vortex_bench::compress::calculate_ratios;
2824
use vortex_bench::datasets::Dataset;
2925
use vortex_bench::datasets::struct_list_of_ints::StructListOfInts;
3026
use vortex_bench::datasets::taxi_data::TaxiData;
@@ -95,6 +91,8 @@ fn get_compressor(format: Format) -> Box<dyn Compressor> {
9591
match format {
9692
Format::OnDiskVortex => Box::new(VortexCompressor),
9793
Format::Parquet => Box::new(ParquetCompressor::new()),
94+
#[cfg(feature = "lance")]
95+
Format::Lance => todo!(),
9896
_ => unimplemented!("Compress bench not implemented for {format}"),
9997
}
10098
}
@@ -202,19 +200,8 @@ async fn run_benchmark_for_dataset(
202200
let bench_name = dataset_handle.name();
203201
tracing::info!("Running {bench_name} benchmark");
204202

205-
let vx_array = dataset_handle.to_vortex_array().await?;
206-
let uncompressed = ChunkedArray::from_iter(
207-
vx_array
208-
.as_::<ChunkedVTable>()
209-
.chunks()
210-
.iter()
211-
.map(|chunk| {
212-
let mut builder = builder_with_capacity(chunk.dtype(), chunk.len());
213-
chunk.append_to_builder(builder.as_mut());
214-
builder.finish()
215-
}),
216-
)
217-
.into_array();
203+
// Get the parquet file path for this dataset
204+
let parquet_path = dataset_handle.to_parquet_path().await?;
218205

219206
let mut ratios = Vec::new();
220207
let mut timings = Vec::new();
@@ -229,7 +216,7 @@ async fn run_benchmark_for_dataset(
229216
CompressOp::Compress => {
230217
let result = benchmark_compress(
231218
compressor.as_ref(),
232-
&uncompressed,
219+
&parquet_path,
233220
iterations,
234221
bench_name,
235222
)
@@ -242,7 +229,7 @@ async fn run_benchmark_for_dataset(
242229
CompressOp::Decompress => {
243230
let result = benchmark_decompress(
244231
compressor.as_ref(),
245-
&uncompressed,
232+
&parquet_path,
246233
iterations,
247234
bench_name,
248235
)

benchmarks/compress-bench/src/parquet.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use std::fs::File;
45
use std::io::Cursor;
6+
use std::path::Path;
57
use std::sync::Arc;
68
use std::time::Duration;
79
use std::time::Instant;
@@ -15,12 +17,8 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1517
use parquet::basic::Compression;
1618
use parquet::basic::ZstdLevel;
1719
use parquet::file::properties::WriterProperties;
18-
use vortex::array::Array;
19-
use vortex::array::arrays::ChunkedVTable;
2020
use vortex_bench::Format;
21-
22-
use crate::bench::Compressor;
23-
use crate::chunked_to_vec_record_batch;
21+
use vortex_bench::compress::Compressor;
2422

2523
/// Compressor implementation for Parquet format with ZSTD compression.
2624
pub struct ParquetCompressor {
@@ -51,19 +49,35 @@ impl Compressor for ParquetCompressor {
5149
Format::Parquet
5250
}
5351

54-
async fn compress(&self, array: &dyn Array) -> anyhow::Result<(Bytes, Duration)> {
55-
let chunked = array.as_::<ChunkedVTable>().clone();
56-
let (batches, schema) = chunked_to_vec_record_batch(chunked)?;
52+
async fn compress(&self, parquet_path: &Path) -> anyhow::Result<(u64, Duration)> {
53+
// Read the input parquet file
54+
let file = File::open(parquet_path)?;
55+
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
56+
let schema = builder.schema().clone();
57+
let reader = builder.build()?;
58+
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
5759

60+
// Compress with our compression settings
5861
let mut buf = Vec::new();
5962
let start = Instant::now();
60-
parquet_compress_write(batches, schema, self.compression, &mut buf)?;
63+
let size = parquet_compress_write(batches, schema, self.compression, &mut buf)?;
6164
let elapsed = start.elapsed();
62-
Ok((Bytes::from(buf), elapsed))
65+
Ok((size as u64, elapsed))
6366
}
6467

65-
async fn decompress(&self, data: Bytes) -> anyhow::Result<usize> {
66-
parquet_decompress_read(data)
68+
async fn decompress(&self, parquet_path: &Path) -> anyhow::Result<usize> {
69+
// First compress to get the bytes we'll decompress
70+
let file = File::open(parquet_path)?;
71+
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
72+
let schema = builder.schema().clone();
73+
let reader = builder.build()?;
74+
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
75+
76+
let mut buf = Vec::new();
77+
parquet_compress_write(batches, schema, self.compression, &mut buf)?;
78+
79+
// Now decompress
80+
parquet_decompress_read(Bytes::from(buf))
6781
}
6882
}
6983

benchmarks/compress-bench/src/vortex.rs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::io::Cursor;
5+
use std::path::Path;
56
use std::sync::Arc;
67
use std::time::Duration;
78
use std::time::Instant;
@@ -11,13 +12,12 @@ use async_trait::async_trait;
1112
use bytes::Bytes;
1213
use futures::StreamExt;
1314
use futures::pin_mut;
14-
use vortex::array::Array;
1515
use vortex::file::OpenOptionsSessionExt;
1616
use vortex::file::WriteOptionsSessionExt;
1717
use vortex_bench::Format;
1818
use vortex_bench::SESSION;
19-
20-
use crate::bench::Compressor;
19+
use vortex_bench::compress::Compressor;
20+
use vortex_bench::conversions::parquet_to_vortex;
2121

2222
/// Compressor implementation for Vortex format.
2323
pub struct VortexCompressor;
@@ -28,15 +28,34 @@ impl Compressor for VortexCompressor {
2828
Format::OnDiskVortex
2929
}
3030

31-
async fn compress(&self, array: &dyn Array) -> Result<(Bytes, Duration)> {
31+
async fn compress(&self, parquet_path: &Path) -> Result<(u64, Duration)> {
32+
// Read the parquet file as an array stream
33+
let array_stream = parquet_to_vortex(parquet_path.to_path_buf())?;
34+
3235
let mut buf = Vec::new();
3336
let start = Instant::now();
34-
vortex_compress_write(array, &mut buf).await?;
37+
let mut cursor = Cursor::new(&mut buf);
38+
SESSION
39+
.write_options()
40+
.write(&mut cursor, array_stream)
41+
.await?;
3542
let elapsed = start.elapsed();
36-
Ok((Bytes::from(buf), elapsed))
43+
44+
Ok((buf.len() as u64, elapsed))
3745
}
3846

39-
async fn decompress(&self, data: Bytes) -> Result<usize> {
47+
async fn decompress(&self, parquet_path: &Path) -> Result<usize> {
48+
// First compress to get the bytes we'll decompress
49+
let array_stream = parquet_to_vortex(parquet_path.to_path_buf())?;
50+
let mut buf = Vec::new();
51+
let mut cursor = Cursor::new(&mut buf);
52+
SESSION
53+
.write_options()
54+
.write(&mut cursor, array_stream)
55+
.await?;
56+
57+
// Now decompress
58+
let data = Bytes::from(buf);
4059
let scan = SESSION.open_options().open_buffer(data)?.scan()?;
4160
let schema = Arc::new(scan.dtype()?.to_arrow_schema()?);
4261

@@ -50,13 +69,3 @@ impl Compressor for VortexCompressor {
5069
Ok(nbytes)
5170
}
5271
}
53-
54-
#[inline(never)]
55-
pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec<u8>) -> Result<u64> {
56-
let mut cursor = Cursor::new(buf);
57-
SESSION
58-
.write_options()
59-
.write(&mut cursor, array.to_array_stream())
60-
.await?;
61-
Ok(cursor.position())
62-
}

benchmarks/lance-bench/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ lance-encoding = { version = "0.39.0" }
2020

2121
anyhow = { workspace = true }
2222
arrow-cast = { workspace = true }
23-
arrow-schema = { workspace = true }
2423
async-trait = { workspace = true }
2524
clap = { workspace = true, features = ["derive"] }
2625
futures = { workspace = true }

0 commit comments

Comments
 (0)