Skip to content

Commit 217dfc2

Browse files
chore[bench]: generate partitioned tpch files (#3824)
Signed-off-by: Joe Isaacs <[email protected]>
---------
Signed-off-by: Joe Isaacs <[email protected]>
1 parent 8afe0fe commit 217dfc2

File tree

10 files changed

+217
-246
lines changed

10 files changed

+217
-246
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench-vortex/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ datafusion-physical-plan = { workspace = true }
3434
dirs = { workspace = true }
3535
erased-serde = { workspace = true }
3636
futures = { workspace = true }
37+
glob = { workspace = true }
3738
humansize = { workspace = true }
3839
indicatif = { workspace = true }
3940
itertools = { workspace = true }
@@ -44,6 +45,7 @@ opentelemetry = { workspace = true }
4445
opentelemetry-otlp = { workspace = true, features = ["trace"] }
4546
opentelemetry_sdk = { workspace = true }
4647
parquet = { workspace = true, features = ["async"] }
48+
paste = { workspace = true }
4749
rand = { workspace = true }
4850
rayon = { workspace = true }
4951
regex = { workspace = true }

bench-vortex/src/clickbench.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use datafusion::datasource::listing::{
1616
};
1717
use datafusion::prelude::SessionContext;
1818
use futures::{StreamExt, TryStreamExt, stream};
19+
use glob::Pattern;
1920
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
2021
use reqwest::IntoUrl;
2122
use reqwest::blocking::Response;
@@ -207,18 +208,17 @@ pub async fn register_vortex_files(
207208
table_name: &str,
208209
input_path: &Url,
209210
schema: Option<Schema>,
210-
single_file: bool,
211+
glob_pattern: Option<Pattern>,
211212
) -> anyhow::Result<()> {
212-
let mut vortex_path = input_path.join(&format!("{}/", Format::OnDiskVortex.name()))?;
213-
if single_file {
214-
vortex_path = vortex_path.join("hits_0.vortex")?;
215-
}
216-
213+
let vortex_path = input_path.join(&format!("{}/", Format::OnDiskVortex.name()))?;
217214
let format = Arc::new(VortexFormat::default());
218215

219-
info!("Registering table from {vortex_path}");
216+
info!(
217+
"Registering table from {vortex_path} with glob {:?}",
218+
glob_pattern.as_ref().map(|p| p.as_str()).unwrap_or("")
219+
);
220220

221-
let table_url = ListingTableUrl::parse(vortex_path)?;
221+
let table_url = ListingTableUrl::try_new(vortex_path, glob_pattern)?;
222222

223223
let config = ListingTableConfig::new(table_url).with_listing_options(
224224
ListingOptions::new(format).with_session_config_options(session.state().config()),
@@ -241,16 +241,17 @@ pub fn register_parquet_files(
241241
table_name: &str,
242242
input_path: &Url,
243243
schema: &Schema,
244-
single_file: bool,
244+
glob_pattern: Option<Pattern>,
245245
) -> anyhow::Result<()> {
246246
let format = Arc::new(ParquetFormat::new());
247-
let mut table_path = input_path.join(&format!("{}/", Format::Parquet))?;
248-
if single_file {
249-
table_path = table_path.join("hits_0.parquet")?;
250-
}
247+
let table_path = input_path.join(&format!("{}/", Format::Parquet))?;
251248

252-
info!("Registering table from {}", &table_path);
253-
let table_url = ListingTableUrl::parse(table_path)?;
249+
info!(
250+
"Registering table from {} with glob {:?}",
251+
&table_path,
252+
glob_pattern.as_ref().map(|p| p.as_str()).unwrap_or("")
253+
);
254+
let table_url = ListingTableUrl::try_new(table_path, glob_pattern)?;
254255

255256
let config = ListingTableConfig::new(table_url)
256257
.with_listing_options(

bench-vortex/src/datasets/file.rs

Lines changed: 30 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
1010
use datafusion::datasource::listing::{
1111
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
1212
};
13-
use datafusion::prelude::{ParquetReadOptions, SessionContext};
14-
use object_store::ObjectStore;
15-
use object_store::path::Path as ObjectStorePath;
13+
use datafusion::prelude::SessionContext;
14+
use glob::Pattern;
1615
use tokio::fs::OpenOptions;
1716
use tracing::info;
1817
use url::Url;
@@ -21,7 +20,7 @@ use vortex_datafusion::VortexFormat;
2120

2221
use crate::conversions::parquet_to_vortex;
2322
use crate::datasets::BenchmarkDataset;
24-
use crate::{Format, idempotent_async};
23+
use crate::idempotent_async;
2524

2625
pub async fn convert_parquet_to_vortex(
2726
input_path: &Path,
@@ -41,36 +40,22 @@ pub async fn convert_parquet_to_vortex(
4140

4241
pub async fn register_parquet_files(
4342
session: &SessionContext,
44-
object_store: Arc<dyn ObjectStore>,
4543
table_name: &str,
4644
file_url: &Url,
45+
glob: Option<Pattern>,
4746
schema: Option<Schema>,
4847
dataset: &BenchmarkDataset,
4948
) -> Result<()> {
5049
match dataset {
5150
BenchmarkDataset::TpcH { .. } => {
52-
let parquet_url = file_url.clone();
53-
ensure_parquet_file_exists(object_store.as_ref(), &parquet_url).await?;
54-
55-
session
56-
.register_parquet(
57-
table_name,
58-
parquet_url.as_str(),
59-
ParquetReadOptions::default(),
60-
)
61-
.await?;
62-
}
63-
BenchmarkDataset::ClickBench { single_file, .. } => {
64-
// For ClickBench, we use simplified pre-built Parquet registration
51+
info!(
52+
"Registering table from {}, with glob {:?}",
53+
&file_url,
54+
glob.as_ref().map(|g| g.as_str()).unwrap_or("")
55+
);
6556
let format = Arc::new(ParquetFormat::new());
66-
let mut parquet_path = dataset.format_path(Format::Parquet, file_url)?;
67-
68-
if *single_file {
69-
parquet_path = parquet_path.join("hits.parquet")?;
70-
}
7157

72-
info!("Registering table from {}", &parquet_path);
73-
let table_url = ListingTableUrl::parse(parquet_path)?;
58+
let table_url = ListingTableUrl::try_new(file_url.clone(), glob)?;
7459

7560
let config = ListingTableConfig::new(table_url).with_listing_options(
7661
ListingOptions::new(format).with_session_config_options(session.state().config()),
@@ -83,8 +68,18 @@ pub async fn register_parquet_files(
8368
};
8469

8570
let listing_table = Arc::new(ListingTable::try_new(config)?);
71+
8672
session.register_table(table_name, listing_table)?;
8773
}
74+
BenchmarkDataset::ClickBench { .. } => {
75+
crate::clickbench::register_parquet_files(
76+
session,
77+
table_name,
78+
file_url,
79+
&crate::clickbench::HITS_SCHEMA,
80+
glob,
81+
)?;
82+
}
8883
_ => todo!(),
8984
}
9085

@@ -93,17 +88,21 @@ pub async fn register_parquet_files(
9388

9489
pub async fn register_vortex_files(
9590
session: &SessionContext,
96-
_object_store: Arc<dyn ObjectStore>,
9791
table_name: &str,
9892
file_url: &Url,
93+
glob: Option<Pattern>,
9994
schema: Option<Schema>,
10095
dataset: &BenchmarkDataset,
10196
) -> Result<()> {
10297
match dataset {
10398
BenchmarkDataset::TpcH { .. } | BenchmarkDataset::TpcDS { .. } => {
104-
// Register the Vortex file
99+
info!(
100+
"Registering table from {}, with glob {:?}",
101+
&file_url,
102+
glob.as_ref().map(|g| g.as_str()).unwrap_or("")
103+
);
105104
let format = Arc::new(VortexFormat::default());
106-
let table_url = ListingTableUrl::parse(file_url.as_str())?;
105+
let table_url = ListingTableUrl::try_new(file_url.clone(), glob)?;
107106
let config = ListingTableConfig::new(table_url).with_listing_options(
108107
ListingOptions::new(format).with_session_config_options(session.state().config()),
109108
);
@@ -115,15 +114,16 @@ pub async fn register_vortex_files(
115114
};
116115

117116
let listing_table = Arc::new(ListingTable::try_new(config)?);
117+
118118
session.register_table(table_name, listing_table)?;
119119
}
120-
BenchmarkDataset::ClickBench { single_file, .. } => {
120+
BenchmarkDataset::ClickBench { .. } => {
121121
crate::clickbench::register_vortex_files(
122122
session.clone(),
123123
table_name,
124124
file_url,
125125
schema,
126-
*single_file,
126+
glob,
127127
)
128128
.await?;
129129
}
@@ -133,29 +133,6 @@ pub async fn register_vortex_files(
133133
Ok(())
134134
}
135135

136-
async fn ensure_parquet_file_exists(
137-
object_store: &dyn ObjectStore,
138-
parquet_url: &Url,
139-
) -> Result<()> {
140-
let parquet_path = parquet_url.path();
141-
142-
if let Err(e) = object_store
143-
.head(&ObjectStorePath::parse(parquet_path)?)
144-
.await
145-
{
146-
info!(
147-
"Asserting file exist: File {} doesn't exist because {e}",
148-
parquet_url.as_str()
149-
);
150-
151-
if parquet_url.scheme() != "file" {
152-
anyhow::bail!("Writing to S3 does not seem to work!");
153-
}
154-
}
155-
156-
Ok(())
157-
}
158-
159136
pub async fn parquet_file_to_vortex(parquet_path: &Path, vortex_path: &PathBuf) -> Result<()> {
160137
idempotent_async(vortex_path, async |vtx_file| {
161138
info!("Converting {:?} to Vortex format", parquet_path);

bench-vortex/src/datasets/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,21 +120,33 @@ impl BenchmarkDataset {
120120
// TPC-H tables are handled separately
121121
}
122122
(BenchmarkDataset::ClickBench { single_file, .. }, Format::Parquet) => {
123+
// Use glob pattern for partitioned files, specific file pattern for single file
124+
let glob = if *single_file {
125+
glob::Pattern::new("hits_0.parquet")?
126+
} else {
127+
glob::Pattern::new("*.parquet")?
128+
};
123129
clickbench::register_parquet_files(
124130
session,
125131
"hits",
126132
base_url,
127133
&clickbench::HITS_SCHEMA,
128-
*single_file,
134+
Some(glob),
129135
)?;
130136
}
131137
(BenchmarkDataset::ClickBench { single_file, .. }, Format::OnDiskVortex) => {
138+
// Use glob pattern for partitioned files, specific file pattern for single file
139+
let glob = if *single_file {
140+
Some(glob::Pattern::new("hits_0.vortex")?)
141+
} else {
142+
Some(glob::Pattern::new("*.vortex")?)
143+
};
132144
clickbench::register_vortex_files(
133145
session.clone(),
134146
"hits",
135147
base_url,
136148
Some(clickbench::HITS_SCHEMA.clone()),
137-
*single_file,
149+
glob,
138150
)
139151
.await?;
140152
}

bench-vortex/src/engines/ddb/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl DuckDBCtx {
146146
];
147147

148148
for table_name in &tables {
149-
let table_path = format!("{base_dir}{table_name}.{extension}");
149+
let table_path = format!("{base_dir}{table_name}_*.{extension}");
150150
commands.push_str(&format!(
151151
"CREATE {} IF NOT EXISTS {table_name} AS SELECT * FROM read_{extension}('{table_path}');\n",
152152
duckdb_object.to_str(),

bench-vortex/src/tpcds/mod.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use datafusion::prelude::SessionContext;
88
use itertools::Itertools;
99
use url::Url;
1010

11-
use crate::df::{get_session_context, make_object_store};
11+
use crate::df::get_session_context;
1212
use crate::tpch::{register_arrow, register_parquet, register_vortex_file};
1313
use crate::{BenchmarkDataset, Format};
1414

@@ -34,8 +34,6 @@ pub async fn load_datasets(
3434
) -> anyhow::Result<SessionContext> {
3535
let context = get_session_context(disable_datafusion_cache);
3636

37-
let object_store = make_object_store(&context, base_dir)?;
38-
3937
let files = match dataset {
4038
dataset @ BenchmarkDataset::TpcDS { .. } => {
4139
dataset.tables().iter().map(|f| (*f, None)).collect_vec()
@@ -57,14 +55,12 @@ pub async fn load_datasets(
5755
}) {
5856
let path = path?;
5957
match format {
60-
Format::Arrow => register_arrow(&context, name, &path).await?,
58+
Format::Arrow => register_arrow(&context, name, &path, None).await?,
6159
Format::Parquet => {
62-
register_parquet(&context, object_store.clone(), name, &path, schema, dataset)
63-
.await?
60+
register_parquet(&context, name, &path, None, schema, dataset).await?
6461
}
6562
Format::OnDiskVortex => {
66-
register_vortex_file(&context, object_store.clone(), name, &path, schema, dataset)
67-
.await?
63+
register_vortex_file(&context, name, &path, None, schema, dataset).await?
6864
}
6965
Format::OnDiskDuckDB => unreachable!("duckdb never supported with datafusion"),
7066
Format::Csv => todo!(),

0 commit comments

Comments (0)