Skip to content

Commit d53d066

Browse files
feat[bench-vortex]: benchmark vortex compact format (#3886)
Signed-off-by: Joe Isaacs <[email protected]> --------- Signed-off-by: Joe Isaacs <[email protected]>
1 parent aedde43 commit d53d066

File tree

14 files changed

+233
-79
lines changed

14 files changed

+233
-79
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench-vortex/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,16 @@ tracing-subscriber = { workspace = true, features = [
7171
] }
7272
url = { workspace = true }
7373
uuid = { workspace = true, features = ["v4"] }
74-
vortex = { workspace = true, features = ["object_store", "parquet", "files"] }
74+
vortex = { workspace = true, features = [
75+
"object_store",
76+
"parquet",
77+
"files",
78+
"zstd",
79+
] }
7580
vortex-datafusion = { workspace = true }
7681
vortex-duckdb = { workspace = true }
82+
vortex-file = { workspace = true }
83+
vortex-layout = { workspace = true }
7784
xshell = { workspace = true }
7885

7986
[features]

bench-vortex/src/clickbench/clickbench_benchmark.rs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use url::Url;
99
use vortex::error::VortexExpect;
1010

1111
use crate::benchmark_trait::Benchmark;
12-
use crate::clickbench::{Flavor, clickbench_queries};
12+
use crate::clickbench::{Flavor, clickbench_queries, convert_parquet_to_vortex};
1313
use crate::engines::EngineCtx;
14-
use crate::{BenchmarkDataset, Format, IdempotentPath, Target};
14+
use crate::{BenchmarkDataset, CompactionStrategy, Format, IdempotentPath, Target};
1515

1616
/// ClickBench benchmark implementation
1717
pub struct ClickBenchBenchmark {
@@ -80,16 +80,16 @@ impl Benchmark for ClickBenchBenchmark {
8080
match self.data_url.scheme() {
8181
"file" => {
8282
let basepath = clickbench_flavor(self.flavor).to_data_path();
83+
let client = reqwest::blocking::Client::default();
8384

8485
match target.format() {
85-
Format::Parquet => {
86+
Format::Parquet | Format::OnDiskDuckDB => {
8687
// Download Parquet files (idempotent - won't re-download if already present)
87-
let client = reqwest::blocking::Client::default();
88+
// For DuckDB format, we typically start with Parquet and let DuckDB handle it
8889
self.flavor.download(&client, basepath.as_path())?;
8990
}
90-
Format::OnDiskVortex => {
91+
Format::OnDiskVortex | Format::VortexCompact => {
9192
// First ensure Parquet files exist
92-
let client = reqwest::blocking::Client::default();
9393
self.flavor.download(&client, basepath.as_path())?;
9494

9595
// Then convert to Vortex format (idempotent)
@@ -98,20 +98,29 @@ impl Benchmark for ClickBenchBenchmark {
9898
anyhow::anyhow!("invalid file URL: {}", self.data_url)
9999
})?;
100100

101-
let dataset = self.dataset();
102-
103101
// Use tokio runtime to handle async conversion
104102
let rt = tokio::runtime::Runtime::new()?;
105103
rt.block_on(async {
106-
crate::file::convert_parquet_to_vortex(&file_path, &dataset).await
107-
})?;
104+
match target.format {
105+
Format::OnDiskVortex => {
106+
convert_parquet_to_vortex(
107+
&file_path,
108+
CompactionStrategy::Default,
109+
)
110+
.await
111+
}
112+
Format::VortexCompact => {
113+
convert_parquet_to_vortex(
114+
&file_path,
115+
CompactionStrategy::Compact,
116+
)
117+
.await
118+
}
119+
_ => unreachable!(),
120+
}
121+
})?
108122
}
109123
}
110-
Format::OnDiskDuckDB => {
111-
// For DuckDB format, we typically start with Parquet and let DuckDB handle it
112-
let client = reqwest::blocking::Client::default();
113-
self.flavor.download(&client, basepath.as_path())?;
114-
}
115124
f => {
116125
todo!("format {f} unsupported in clickbench")
117126
}

bench-vortex/src/clickbench/clickbench_data.rs

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,21 @@ use datafusion::datasource::listing::{
1717
use datafusion::prelude::SessionContext;
1818
use futures::{StreamExt, TryStreamExt, stream};
1919
use glob::Pattern;
20+
use log::trace;
2021
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
2122
use reqwest::IntoUrl;
2223
use reqwest::blocking::Response;
2324
use serde::Serialize;
2425
use tokio::fs::{OpenOptions, create_dir_all};
25-
use tracing::{debug, info, warn};
26+
use tracing::{info, warn};
2627
use url::Url;
2728
use vortex::error::VortexExpect;
28-
use vortex::file::{VORTEX_FILE_EXTENSION, VortexWriteOptions};
29+
use vortex::file::VortexWriteOptions;
2930
use vortex_datafusion::VortexFormat;
3031

31-
use crate::Format;
3232
use crate::conversions::parquet_to_vortex;
3333
use crate::utils::file_utils::{idempotent, idempotent_async};
34+
use crate::{CompactionStrategy, Format};
3435

3536
pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
3637
use DataType::*;
@@ -151,14 +152,22 @@ pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
151152
])
152153
});
153154

154-
pub async fn convert_parquet_to_vortex(input_path: &Path) -> anyhow::Result<()> {
155-
let vortex_dir = input_path.join(Format::OnDiskVortex.name());
155+
pub async fn convert_parquet_to_vortex(
156+
input_path: &Path,
157+
compaction: CompactionStrategy,
158+
) -> anyhow::Result<()> {
159+
let (format, dir_name) = match compaction {
160+
CompactionStrategy::Compact => (Format::VortexCompact, Format::VortexCompact.name()),
161+
CompactionStrategy::Default => (Format::OnDiskVortex, Format::OnDiskVortex.name()),
162+
};
163+
164+
let vortex_dir = input_path.join(dir_name);
156165
let parquet_path = input_path.join(Format::Parquet.name());
157166
create_dir_all(&vortex_dir).await?;
158167

159168
let parquet_inputs = fs::read_dir(&parquet_path)?.collect::<std::io::Result<Vec<_>>>()?;
160169

161-
debug!(
170+
trace!(
162171
"Found {} parquet files in {}",
163172
parquet_inputs.len(),
164173
parquet_path.to_str().unwrap()
@@ -176,11 +185,14 @@ pub async fn convert_parquet_to_vortex(input_path: &Path) -> anyhow::Result<()>
176185
temp.file_name().unwrap().to_str().unwrap().to_string()
177186
};
178187
let parquet_file_path = parquet_path.join(format!("{filename}.parquet"));
179-
let output_path = vortex_dir.join(format!("{filename}.{VORTEX_FILE_EXTENSION}"));
188+
let output_path = vortex_dir.join(format!("{filename}.{}", format.ext()));
180189

181190
tokio::spawn(async move {
182191
idempotent_async(&output_path, move |vtx_file| async move {
183-
info!("Processing file '{filename}'");
192+
info!(
193+
"Processing file '{filename}' with {:?} strategy",
194+
compaction
195+
);
184196
let array_stream = parquet_to_vortex(parquet_file_path)?;
185197
let f = OpenOptions::new()
186198
.write(true)
@@ -189,7 +201,9 @@ pub async fn convert_parquet_to_vortex(input_path: &Path) -> anyhow::Result<()>
189201
.open(&vtx_file)
190202
.await?;
191203

192-
VortexWriteOptions::default().write(f, array_stream).await?;
204+
let write_options = compaction.apply_options(VortexWriteOptions::default());
205+
206+
write_options.write(f, array_stream).await?;
193207

194208
anyhow::Ok(())
195209
})
@@ -236,6 +250,39 @@ pub async fn register_vortex_files(
236250
Ok(())
237251
}
238252

253+
pub async fn register_vortex_compact_files(
254+
session: SessionContext,
255+
table_name: &str,
256+
input_path: &Url,
257+
schema: Option<Schema>,
258+
glob_pattern: Option<Pattern>,
259+
) -> anyhow::Result<()> {
260+
let vortex_compact_path = input_path.join(&format!("{}/", Format::VortexCompact.name()))?;
261+
let format = Arc::new(VortexFormat::default());
262+
263+
info!(
264+
"Registering vortex-compact table from {vortex_compact_path} with glob {:?}",
265+
glob_pattern.as_ref().map(|p| p.as_str()).unwrap_or("")
266+
);
267+
268+
let table_url = ListingTableUrl::try_new(vortex_compact_path, glob_pattern)?;
269+
270+
let config = ListingTableConfig::new(table_url).with_listing_options(
271+
ListingOptions::new(format).with_session_config_options(session.state().config()),
272+
);
273+
274+
let config = if let Some(schema) = schema {
275+
config.with_schema(schema.into())
276+
} else {
277+
config.infer_schema(&session.state()).await?
278+
};
279+
280+
let listing_table = Arc::new(ListingTable::try_new(config)?);
281+
session.register_table(table_name, listing_table)?;
282+
283+
Ok(())
284+
}
285+
239286
pub fn register_parquet_files(
240287
session: &SessionContext,
241288
table_name: &str,

bench-vortex/src/datasets/file.rs

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use std::path::{Path, PathBuf};
54
use std::sync::Arc;
65

76
use anyhow::Result;
@@ -12,31 +11,11 @@ use datafusion::datasource::listing::{
1211
};
1312
use datafusion::prelude::SessionContext;
1413
use glob::Pattern;
15-
use tokio::fs::OpenOptions;
1614
use tracing::info;
1715
use url::Url;
18-
use vortex::file::VortexWriteOptions;
1916
use vortex_datafusion::VortexFormat;
2017

21-
use crate::conversions::parquet_to_vortex;
2218
use crate::datasets::BenchmarkDataset;
23-
use crate::idempotent_async;
24-
25-
pub async fn convert_parquet_to_vortex(
26-
input_path: &Path,
27-
dataset: &BenchmarkDataset,
28-
) -> Result<()> {
29-
match dataset {
30-
BenchmarkDataset::TpcH { .. } => {
31-
// This is done on-demand by the register_vortex_file function
32-
Ok(())
33-
}
34-
BenchmarkDataset::ClickBench { .. } => {
35-
crate::clickbench::convert_parquet_to_vortex(input_path).await
36-
}
37-
_ => todo!(),
38-
}
39-
}
4019

4120
pub async fn register_parquet_files(
4221
session: &SessionContext,
@@ -133,24 +112,49 @@ pub async fn register_vortex_files(
133112
Ok(())
134113
}
135114

136-
pub async fn parquet_file_to_vortex(parquet_path: &Path, vortex_path: &PathBuf) -> Result<()> {
137-
idempotent_async(vortex_path, async |vtx_file| {
138-
info!("Converting {:?} to Vortex format", parquet_path);
139-
140-
let array_stream = parquet_to_vortex(parquet_path.to_path_buf())?;
115+
pub async fn register_vortex_compact_files(
116+
session: &SessionContext,
117+
table_name: &str,
118+
file_url: &Url,
119+
glob: Option<Pattern>,
120+
schema: Option<Schema>,
121+
dataset: &BenchmarkDataset,
122+
) -> Result<()> {
123+
match dataset {
124+
BenchmarkDataset::TpcH { .. } | BenchmarkDataset::TpcDS { .. } => {
125+
info!(
126+
"Registering vortex-compact table from {}, with glob {:?}",
127+
&file_url,
128+
glob.as_ref().map(|g| g.as_str()).unwrap_or("")
129+
);
130+
let format = Arc::new(VortexFormat::default());
131+
let table_url = ListingTableUrl::try_new(file_url.clone(), glob)?;
132+
let config = ListingTableConfig::new(table_url).with_listing_options(
133+
ListingOptions::new(format).with_session_config_options(session.state().config()),
134+
);
141135

142-
let f = OpenOptions::new()
143-
.write(true)
144-
.truncate(true)
145-
.create(true)
146-
.open(&vtx_file)
147-
.await?;
136+
let config = if let Some(schema) = schema {
137+
config.with_schema(schema.into())
138+
} else {
139+
config.infer_schema(&session.state()).await?
140+
};
148141

149-
VortexWriteOptions::default().write(f, array_stream).await?;
142+
let listing_table = Arc::new(ListingTable::try_new(config)?);
150143

151-
anyhow::Ok(())
152-
})
153-
.await?;
144+
session.register_table(table_name, listing_table)?;
145+
}
146+
BenchmarkDataset::ClickBench { .. } => {
147+
crate::clickbench::register_vortex_compact_files(
148+
session.clone(),
149+
table_name,
150+
file_url,
151+
schema,
152+
glob,
153+
)
154+
.await?;
155+
}
156+
BenchmarkDataset::PublicBi { .. } => todo!(),
157+
}
154158

155159
Ok(())
156160
}

bench-vortex/src/datasets/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,10 @@ impl BenchmarkDataset {
134134
Some(glob),
135135
)?;
136136
}
137-
(BenchmarkDataset::ClickBench { single_file, .. }, Format::OnDiskVortex) => {
137+
(
138+
BenchmarkDataset::ClickBench { single_file, .. },
139+
Format::OnDiskVortex | Format::VortexCompact,
140+
) => {
138141
// Use glob pattern for partitioned files, specific file pattern for single file
139142
let glob = if *single_file {
140143
Some(glob::Pattern::new("hits_0.vortex")?)

bench-vortex/src/engines/ddb/mod.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl DuckDBCtx {
8383
dataset: &BenchmarkDataset,
8484
) -> Result<()> {
8585
let object = match file_format {
86-
Format::Parquet | Format::OnDiskVortex => DuckDBObject::View,
86+
Format::Parquet | Format::OnDiskVortex | Format::VortexCompact => DuckDBObject::View,
8787
Format::OnDiskDuckDB => DuckDBObject::Table,
8888
format => anyhow::bail!("Format {format} isn't supported for DuckDB"),
8989
};
@@ -96,15 +96,14 @@ impl DuckDBCtx {
9696

9797
let effective_url = self.resolve_storage_url(base_url, load_format, dataset)?;
9898
let extension = match load_format {
99-
Format::Parquet => "parquet",
100-
Format::OnDiskVortex => "vortex",
99+
Format::Parquet | Format::OnDiskVortex | Format::VortexCompact => load_format.ext(),
101100
other => anyhow::bail!("Format {other} isn't supported for DuckDB"),
102101
};
103102

104103
// Generate and execute table registration commands
105104
let commands = self.generate_table_commands(&effective_url, extension, dataset, object);
106-
self.execute_query(&commands)?;
107105
trace!("Executing table registration commands: {commands}");
106+
self.execute_query(&commands)?;
108107

109108
Ok(())
110109
}
@@ -116,7 +115,10 @@ impl DuckDBCtx {
116115
file_format: Format,
117116
dataset: &BenchmarkDataset,
118117
) -> Result<Url> {
119-
if file_format == Format::OnDiskVortex || file_format == Format::Parquet {
118+
if file_format == Format::OnDiskVortex
119+
|| file_format == Format::Parquet
120+
|| file_format == Format::VortexCompact
121+
{
120122
match dataset.format_path(file_format, base_url) {
121123
Ok(vortex_url) => Ok(vortex_url),
122124
Err(_) => Ok(base_url.clone()),

0 commit comments

Comments
 (0)