
Commit b6edd04

Move code around for compression

Signed-off-by: Adam Gutglick <[email protected]>
1 parent f6e346a

10 files changed: +74 additions, −159 deletions

.github/workflows/sql-benchmarks.yml

Lines changed: 4 additions & 4 deletions

@@ -146,7 +146,7 @@ jobs:
           # Build options string if scale_factor is set
           opts=""
           if [ -n "${{ matrix.scale_factor }}" ]; then
-            opts="--opt scale_factor=${{ matrix.scale_factor }}"
+            opts="--opt scale-factor=${{ matrix.scale_factor }}"
           fi

           # Generate all data formats with a single command
@@ -198,7 +198,7 @@ jobs:
           # Build options string if scale_factor is set
           opts=""
           if [ -n "${{ matrix.scale_factor }}" ]; then
-            opts="--opt scale_factor=${{ matrix.scale_factor }}"
+            opts="--opt scale-factor=${{ matrix.scale_factor }}"
           fi

           touch results.json
@@ -252,9 +252,9 @@ jobs:
           df_formats=$(echo "${{ matrix.targets }}" | tr ',' '\n' | grep '^datafusion:' | sed 's/datafusion://' | tr '\n' ',' | sed 's/,$//')

           # Build options string if scale_factor is set
-          opts="--opt remote_data_dir=${{ matrix.remote_storage }}"
+          opts="--opt remote-data-dir=${{ matrix.remote_storage }}"
           if [ -n "${{ matrix.scale_factor }}" ]; then
-            opts="--opt scale_factor=${{ matrix.scale_factor }} ${opts}"
+            opts="--opt scale-factor=${{ matrix.scale_factor }} ${opts}"
           fi

           touch results.json
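
The only functional change here is the spelling of the option keys passed via --opt, from snake_case (scale_factor, remote_data_dir) to kebab-case (scale-factor, remote-data-dir), which must match whatever the benchmark binary's option parser expects. Purely as an illustration, not the repository's actual parser, a splitter for repeated key=value pairs that accepts the new kebab-case keys might look like this:

use std::collections::HashMap;

// Hypothetical sketch: split `key=value` option strings such as the ones
// the workflow builds into an `opts` variable above.
fn parse_opts<'a>(
    pairs: impl IntoIterator<Item = &'a str>,
) -> anyhow::Result<HashMap<String, String>> {
    pairs
        .into_iter()
        .map(|pair| -> anyhow::Result<(String, String)> {
            // Each pair must be `key=value`; keys use kebab-case.
            let (key, value) = pair
                .split_once('=')
                .ok_or_else(|| anyhow::anyhow!("expected key=value, got {pair:?}"))?;
            Ok((key.to_string(), value.to_string()))
        })
        .collect()
}

fn main() -> anyhow::Result<()> {
    let opts = parse_opts(["scale-factor=10", "remote-data-dir=s3://bucket/prefix"])?;
    assert_eq!(opts["scale-factor"], "10");
    Ok(())
}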

Cargo.lock

Lines changed: 7 additions & 0 deletions (generated file; diff not rendered)

benchmarks/compress-bench/Cargo.toml

Lines changed: 7 additions & 0 deletions

@@ -16,10 +16,17 @@ publish = false

 [dependencies]
 anyhow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+async-trait = { workspace = true }
+bytes = { workspace = true }
 clap = { workspace = true, features = ["derive"] }
+futures = { workspace = true }
 indicatif = { workspace = true }
 itertools = { workspace = true }
+parquet = { workspace = true }
 regex = { workspace = true }
+serde = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 tracing = { workspace = true }
 vortex = { workspace = true }

benchmarks/compress-bench/src/bench.rs

Lines changed: 6 additions & 21 deletions

@@ -13,14 +13,9 @@ use clap::ValueEnum;
 use serde::Serialize;
 use vortex::array::Array;
 use vortex::utils::aliases::hash_map::HashMap;
-
-use crate::Format;
-use crate::measurements::CompressionTimingMeasurement;
-use crate::measurements::CustomUnitMeasurement;
-
-// ============================================================================
-// Measurement types
-// ============================================================================
+use vortex_bench::Format;
+use vortex_bench::measurements::CompressionTimingMeasurement;
+use vortex_bench::measurements::CustomUnitMeasurement;

 #[derive(Default)]
 pub struct CompressMeasurements {
@@ -79,10 +74,6 @@ pub struct DecompressResult {
     pub timing: CompressionTimingMeasurement,
 }

-// ============================================================================
-// Compressor trait
-// ============================================================================
-
 /// Trait for format-specific compression/decompression operations.
 ///
 /// Implementations handle the actual compression logic for a specific format
@@ -94,16 +85,12 @@ pub trait Compressor: Send + Sync {
     fn format(&self) -> Format;

     /// Compress the array data, returning the compressed bytes.
-    async fn compress(&self, array: &dyn Array) -> Result<Bytes>;
+    async fn compress(&self, array: &dyn Array) -> Result<(Bytes, Duration)>;

     /// Decompress the data, returning the decompressed size in bytes.
     async fn decompress(&self, data: Bytes) -> Result<usize>;
 }

-// ============================================================================
-// Benchmark functions
-// ============================================================================
-
 /// Run a compression benchmark for the given compressor.
 ///
 /// Executes compression `iterations` times and returns timing statistics.
@@ -118,9 +105,7 @@ pub async fn benchmark_compress(
     let mut compressed_size = 0u64;

     for _ in 0..iterations {
-        let start = Instant::now();
-        let compressed = compressor.compress(uncompressed).await?;
-        let elapsed = start.elapsed();
+        let (compressed, elapsed) = compressor.compress(uncompressed).await?;

         compressed_size = compressed.len() as u64;
         fastest = fastest.min(elapsed);
@@ -160,7 +145,7 @@ pub async fn benchmark_decompress(
     let format = compressor.format();

     // First compress to get the bytes we'll decompress
-    let compressed = compressor.compress(uncompressed).await?;
+    let (compressed, _) = compressor.compress(uncompressed).await?;

     let mut fastest = Duration::MAX;
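
The substantive change: Compressor::compress now returns the measured Duration alongside the compressed bytes, so each implementation times only its own compression work and the benchmark loop no longer wraps the whole call in Instant::now(). A minimal self-contained sketch of that shape, with the &dyn Array input simplified to &[u8] and the other names taken from the diff:

use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;

#[async_trait]
pub trait Compressor: Send + Sync {
    /// Compress the input, returning the bytes plus the time spent
    /// strictly on compression (setup and conversion excluded).
    async fn compress(&self, input: &[u8]) -> Result<(Bytes, Duration)>;
}

/// Mirror of the benchmark loop above: keep the fastest of `iterations` runs.
pub async fn fastest_compress(
    compressor: &dyn Compressor,
    input: &[u8],
    iterations: u32,
) -> Result<Duration> {
    let mut fastest = Duration::MAX;
    for _ in 0..iterations {
        let (_compressed, elapsed) = compressor.compress(input).await?;
        fastest = fastest.min(elapsed);
    }
    Ok(fastest)
}

The payoff of moving the clock inside the trait is that per-format overhead, such as the Arrow conversion the Parquet path performs, can be kept out of the timed window.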

benchmarks/compress-bench/src/lib.rs

Lines changed: 6 additions & 16 deletions

@@ -12,19 +12,9 @@ pub mod bench;
 pub mod parquet;
 pub mod vortex;

-// Re-export the trait and compressor implementations for convenience
-pub use bench::CompressMeasurements;
-pub use bench::CompressOp;
-pub use bench::CompressResult;
-pub use bench::Compressor;
-pub use bench::DecompressResult;
-pub use bench::benchmark_compress;
-pub use bench::benchmark_decompress;
-pub use bench::calculate_ratios;
-pub use parquet::ParquetCompressor;
-pub use vortex::VortexCompressor;
-
-pub fn chunked_to_vec_record_batch(chunked: ChunkedArray) -> (Vec<RecordBatch>, Arc<Schema>) {
+pub fn chunked_to_vec_record_batch(
+    chunked: ChunkedArray,
+) -> anyhow::Result<(Vec<RecordBatch>, Arc<Schema>)> {
     let chunks_vec = chunked.chunks();
     assert!(!chunks_vec.is_empty(), "empty chunks");

@@ -34,10 +24,10 @@ pub fn chunked_to_vec_record_batch(chunked: ChunkedArray) -> (Vec<RecordBatch>,
         // TODO(connor)[ListView]: The rust Parquet implementation does not support writing
         // `ListView` to Parquet files yet.
         let converted_array = recursive_list_from_list_view(array.clone());
-        RecordBatch::try_from(converted_array.as_ref()).unwrap()
+        Ok(RecordBatch::try_from(converted_array.as_ref())?)
     })
-    .collect::<Vec<_>>();
+    .collect::<anyhow::Result<Vec<_>>>()?;

     let schema = batches[0].schema();
-    (batches, schema)
+    Ok((batches, schema))
 }
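
chunked_to_vec_record_batch now returns anyhow::Result, with the closure yielding a Result and the iterator collecting into anyhow::Result<Vec<_>>, so the first failed conversion propagates instead of panicking through unwrap(). A standalone sketch of the same collect pattern (parse_all is a hypothetical helper, not from the commit):

fn parse_all(inputs: &[&str]) -> anyhow::Result<Vec<i64>> {
    inputs
        .iter()
        // Each conversion yields a Result; `?` wraps the parse error.
        .map(|s| -> anyhow::Result<i64> { Ok(s.parse()?) })
        // Collecting into Result<Vec<_>> short-circuits on the first Err.
        .collect()
}

fn main() -> anyhow::Result<()> {
    assert_eq!(parse_all(&["1", "2", "3"])?, vec![1, 2, 3]);
    assert!(parse_all(&["1", "x"]).is_err());
    Ok(())
}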

benchmarks/compress-bench/src/main.rs

Lines changed: 8 additions & 8 deletions

@@ -5,6 +5,14 @@ use std::path::PathBuf;
 use std::time::Duration;

 use clap::Parser;
+use compress_bench::bench::CompressMeasurements;
+use compress_bench::bench::CompressOp;
+use compress_bench::bench::Compressor;
+use compress_bench::bench::benchmark_compress;
+use compress_bench::bench::benchmark_decompress;
+use compress_bench::bench::calculate_ratios;
+use compress_bench::parquet::ParquetCompressor;
+use compress_bench::vortex::VortexCompressor;
 use indicatif::ProgressBar;
 use itertools::Itertools;
 use regex::Regex;
@@ -17,14 +25,6 @@ use vortex_bench::BenchmarkOutput;
 use vortex_bench::Engine;
 use vortex_bench::Format;
 use vortex_bench::Target;
-use vortex_bench::compress::CompressMeasurements;
-use vortex_bench::compress::CompressOp;
-use vortex_bench::compress::Compressor;
-use vortex_bench::compress::ParquetCompressor;
-use vortex_bench::compress::VortexCompressor;
-use vortex_bench::compress::benchmark_compress;
-use vortex_bench::compress::benchmark_decompress;
-use vortex_bench::compress::calculate_ratios;
 use vortex_bench::datasets::Dataset;
 use vortex_bench::datasets::struct_list_of_ints::StructListOfInts;
 use vortex_bench::datasets::taxi_data::TaxiData;

benchmarks/compress-bench/src/parquet.rs

Lines changed: 25 additions & 21 deletions

@@ -3,8 +3,9 @@

 use std::io::Cursor;
 use std::sync::Arc;
+use std::time::Duration;
+use std::time::Instant;

-use anyhow::Result;
 use arrow_array::RecordBatch;
 use arrow_schema::Schema;
 use async_trait::async_trait;
@@ -16,10 +17,10 @@ use parquet::basic::ZstdLevel;
 use parquet::file::properties::WriterProperties;
 use vortex::array::Array;
 use vortex::array::arrays::ChunkedVTable;
+use vortex_bench::Format;

-use crate::Format;
-use crate::compress::bench::Compressor;
-use crate::compress::chunked_to_vec_record_batch;
+use crate::bench::Compressor;
+use crate::chunked_to_vec_record_batch;

 /// Compressor implementation for Parquet format with ZSTD compression.
 pub struct ParquetCompressor {
@@ -50,17 +51,19 @@ impl Compressor for ParquetCompressor {
         Format::Parquet
     }

-    async fn compress(&self, array: &dyn Array) -> Result<Bytes> {
+    async fn compress(&self, array: &dyn Array) -> anyhow::Result<(Bytes, Duration)> {
         let chunked = array.as_::<ChunkedVTable>().clone();
-        let (batches, schema) = chunked_to_vec_record_batch(chunked);
+        let (batches, schema) = chunked_to_vec_record_batch(chunked)?;

         let mut buf = Vec::new();
-        parquet_compress_write(batches, schema, self.compression, &mut buf);
-        Ok(Bytes::from(buf))
+        let start = Instant::now();
+        parquet_compress_write(batches, schema, self.compression, &mut buf)?;
+        let elapsed = start.elapsed();
+        Ok((Bytes::from(buf), elapsed))
     }

-    async fn decompress(&self, data: Bytes) -> Result<usize> {
-        Ok(parquet_decompress_read(data))
+    async fn decompress(&self, data: Bytes) -> anyhow::Result<usize> {
+        parquet_decompress_read(data)
     }
 }

@@ -70,28 +73,29 @@ pub fn parquet_compress_write(
     schema: Arc<Schema>,
     compression: Compression,
     buf: &mut Vec<u8>,
-) -> usize {
+) -> anyhow::Result<usize> {
     let mut buf = Cursor::new(buf);
     let writer_properties = WriterProperties::builder()
         .set_compression(compression)
         .build();
-    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_properties)).unwrap();
+    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_properties))?;
     for batch in batches {
-        writer.write(&batch).unwrap();
+        writer.write(&batch)?;
     }
-    writer.flush().unwrap();
+    writer.flush()?;
     let n_bytes = writer.bytes_written();
-    writer.close().unwrap();
-    n_bytes
+    writer.close()?;
+    Ok(n_bytes)
 }

 #[inline(never)]
-pub fn parquet_decompress_read(buf: Bytes) -> usize {
-    let builder = ParquetRecordBatchReaderBuilder::try_new(buf).unwrap();
-    let reader = builder.build().unwrap();
+pub fn parquet_decompress_read(buf: Bytes) -> anyhow::Result<usize> {
+    let builder = ParquetRecordBatchReaderBuilder::try_new(buf)?;
+    let reader = builder.build()?;
     let mut nbytes = 0;
     for batch in reader {
-        nbytes += batch.unwrap().get_array_memory_size()
+        nbytes += batch?.get_array_memory_size()
     }
-    nbytes
+
+    Ok(nbytes)
 }
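
The helpers now propagate parquet and arrow errors with ? instead of unwrap(), and compress times only parquet_compress_write, leaving the Arrow conversion done by chunked_to_vec_record_batch outside the measured window. A hedged round-trip sketch against the same parquet/arrow APIs (the batch construction is illustrative, not taken from the commit):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;

fn main() -> anyhow::Result<()> {
    // A single small batch stands in for the converted chunks.
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // Write with ZSTD, as the ParquetCompressor's WriterProperties do.
    let props = WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(3)?))
        .build();
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;

    // Read back, mirroring parquet_decompress_read's `?`-propagating loop.
    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))?.build()?;
    let mut rows = 0;
    for batch in reader {
        rows += batch?.num_rows();
    }
    assert_eq!(rows, 3);
    Ok(())
}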

benchmarks/compress-bench/src/vortex.rs

Lines changed: 9 additions & 5 deletions

@@ -3,6 +3,8 @@

 use std::io::Cursor;
 use std::sync::Arc;
+use std::time::Duration;
+use std::time::Instant;

 use anyhow::Result;
 use async_trait::async_trait;
@@ -12,10 +14,10 @@ use futures::pin_mut;
 use vortex::array::Array;
 use vortex::file::OpenOptionsSessionExt;
 use vortex::file::WriteOptionsSessionExt;
+use vortex_bench::Format;
+use vortex_bench::SESSION;

-use crate::Format;
-use crate::SESSION;
-use crate::compress::bench::Compressor;
+use crate::bench::Compressor;

 /// Compressor implementation for Vortex format.
 pub struct VortexCompressor;
@@ -26,10 +28,12 @@ impl Compressor for VortexCompressor {
         Format::OnDiskVortex
     }

-    async fn compress(&self, array: &dyn Array) -> Result<Bytes> {
+    async fn compress(&self, array: &dyn Array) -> Result<(Bytes, Duration)> {
         let mut buf = Vec::new();
+        let start = Instant::now();
         vortex_compress_write(array, &mut buf).await?;
-        Ok(Bytes::from(buf))
+        let elapsed = start.elapsed();
+        Ok((Bytes::from(buf), elapsed))
     }

     async fn decompress(&self, data: Bytes) -> Result<usize> {
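
The Vortex compressor follows the same timing discipline: the output buffer is allocated before Instant::now(), so only the write itself lands in the measured window. A generic sketch of that pattern (the timed helper is hypothetical, not part of the codebase):

use std::time::{Duration, Instant};

/// Run `f`, returning its output plus the wall-clock time it took.
fn timed<T>(f: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let out = f();
    (out, start.elapsed())
}

fn main() {
    // Allocation stays outside the measured window, as in compress().
    let mut buf = Vec::with_capacity(1024);
    let ((), elapsed) = timed(|| buf.extend_from_slice(&[0u8; 512]));
    println!("wrote {} bytes in {elapsed:?}", buf.len());
}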
