Skip to content

Commit b9aad17

Browse files
authored
Run clickbench file download on a local rayon pool (#3661)
Signed-off-by: Robert Kruszewski <[email protected]>
1 parent c57c90e commit b9aad17

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

bench-vortex/src/clickbench.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use datafusion::datasource::listing::{
1313
};
1414
use datafusion::prelude::SessionContext;
1515
use futures::{StreamExt, TryStreamExt, stream};
16-
use rayon::iter::{IntoParallelIterator, ParallelIterator};
16+
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
1717
use reqwest::IntoUrl;
1818
use reqwest::blocking::Response;
1919
use tokio::fs::{OpenOptions, create_dir_all};
@@ -314,7 +314,10 @@ impl Flavor {
314314
Flavor::Partitioned => {
315315
// The clickbench-provided file is missing some higher-level type info, so we reprocess it
316316
// to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
317-
let _ = (0_u32..100).into_par_iter().map(|idx| {
317+
let pool = rayon::ThreadPoolBuilder::new()
318+
.thread_name(|i| format!("clickbench download {i}"))
319+
.build()?;
320+
let _ = pool.install(|| (0_u32..100).into_par_iter().map(|idx| {
318321
let output_path = basepath.join(Format::Parquet.name()).join(format!("hits_{idx}.parquet"));
319322
idempotent(&output_path, |output_path| {
320323
info!("Downloading file {idx}");
@@ -325,7 +328,7 @@ impl Flavor {
325328

326329
anyhow::Ok(())
327330
})
328-
}).collect::<anyhow::Result<Vec<_>>>()?;
331+
}).collect::<anyhow::Result<Vec<_>>>())?;
329332
}
330333
}
331334
Ok(())

0 commit comments

Comments
 (0)