Skip to content

Commit 5b7db3b

Browse files
committed
benchmark + generate data
1 parent 769f367 commit 5b7db3b

File tree

3 files changed

+283
-0
lines changed

3 files changed

+283
-0
lines changed

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,10 @@ name = "distinct_query_sql"
206206
harness = false
207207
name = "push_down_filter"
208208

209+
[[bench]]
210+
harness = false
211+
name = "repartition_dict"
212+
209213
[[bench]]
210214
harness = false
211215
name = "sort_limit_query_sql"
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for RepartitionExec with dictionary-encoded columns.
19+
//!
20+
//! Builds physical plans by hand — no SQL, no optimizer:
21+
//!
22+
//! CoalescePartitionsExec
23+
//! RepartitionExec: Hash([pod], 8)
24+
//! DataSourceExec (ParquetSource, streams from disk)
25+
//!
26+
//! Data lives on disk so memory usage is bounded to a few in-flight batches,
27+
//! avoiding the OOM that a hash-join self-join would cause at 150M rows.
28+
//!
29+
//! Two parquet files are written during setup:
30+
//! access_log_dict.parquet — Dictionary(Int32, Utf8) columns (native)
31+
//! access_log_plain.parquet — same data, dict columns cast to Utf8
32+
//!
33+
//! Run:
34+
//! cargo bench -p datafusion --bench repartition_dict
35+
//!
36+
//! Profile with samply:
37+
//! RUSTFLAGS="-C force-frame-pointers=yes" cargo bench \
38+
//! -p datafusion --bench repartition_dict --no-run
39+
//! samply record cargo bench \
40+
//! -p datafusion --bench repartition_dict -- dict --profile-time 10
41+
42+
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;

use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::{
    coalesce_partitions::CoalescePartitionsExec, ExecutionPlan,
};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::Partitioning;
use futures::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use test_utils::AccessLogGenerator;
use tokio::runtime::Runtime;
68+
69+
/// Total rows written to each parquet file.
70+
const NUM_ROWS: usize = 150_000_000;
71+
72+
/// Rows per batch during generation and per parquet row-group.
73+
const BATCH_SIZE: usize = 8_192;
74+
75+
/// Number of output partitions for RepartitionExec.
76+
const NUM_PARTITIONS: usize = 12;
77+
78+
/// Write an iterator of RecordBatches to a parquet file.
79+
/// Skips writing if the file already exists.
80+
fn ensure_parquet(
81+
path: &PathBuf,
82+
label: &str,
83+
batches: impl Iterator<Item = RecordBatch>,
84+
) -> SchemaRef {
85+
// If the file exists already, just read its schema and return.
86+
if path.exists() {
87+
println!("Reusing {label} parquet: {}", path.display());
88+
let file = File::open(path).unwrap();
89+
let reader =
90+
parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)
91+
.unwrap();
92+
return reader.schema().clone();
93+
}
94+
95+
println!("Writing {label} parquet ({NUM_ROWS} rows) → {} …", path.display());
96+
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
97+
98+
let file = File::create(path).unwrap();
99+
let props = WriterProperties::builder().build();
100+
let mut writer: Option<ArrowWriter<File>> = None;
101+
let mut schema: Option<SchemaRef> = None;
102+
103+
for batch in batches {
104+
if writer.is_none() {
105+
schema = Some(batch.schema());
106+
writer = Some(
107+
ArrowWriter::try_new(
108+
file.try_clone().unwrap(),
109+
batch.schema(),
110+
Some(props.clone()),
111+
)
112+
.unwrap(),
113+
);
114+
}
115+
writer.as_mut().unwrap().write(&batch).unwrap();
116+
}
117+
writer.unwrap().close().unwrap();
118+
println!("Done.");
119+
schema.unwrap()
120+
}
121+
122+
/// Cast all Dictionary(Int32, Utf8) columns in a batch to plain Utf8.
123+
fn batch_dict_to_plain(batch: &RecordBatch, plain_schema: &SchemaRef) -> RecordBatch {
124+
let new_cols: Vec<ArrayRef> = batch
125+
.columns()
126+
.iter()
127+
.zip(batch.schema().fields().iter())
128+
.map(|(arr, field)| match field.data_type() {
129+
DataType::Dictionary(_, _) => Arc::new(
130+
arrow::compute::cast(arr, &DataType::Utf8).unwrap(),
131+
) as ArrayRef,
132+
_ => Arc::clone(arr),
133+
})
134+
.collect();
135+
RecordBatch::try_new(plain_schema.clone(), new_cols).unwrap()
136+
}
137+
138+
/// Derive a plain-Utf8 schema from a dict schema.
139+
fn plain_schema(dict_schema: &SchemaRef) -> SchemaRef {
140+
let fields: Vec<Field> = dict_schema
141+
.fields()
142+
.iter()
143+
.map(|f| match f.data_type() {
144+
DataType::Dictionary(_, _) => {
145+
Field::new(f.name(), DataType::Utf8, f.is_nullable())
146+
}
147+
_ => f.as_ref().clone(),
148+
})
149+
.collect();
150+
Arc::new(Schema::new(fields))
151+
}
152+
153+
/// Build and execute:
154+
/// CoalescePartitionsExec
155+
/// RepartitionExec(Hash([partition_col], NUM_PARTITIONS))
156+
/// DataSourceExec (ParquetSource)
157+
fn run_repartition(
158+
rt: &Runtime,
159+
task_ctx: Arc<datafusion::execution::TaskContext>,
160+
schema: SchemaRef,
161+
parquet_path: &PathBuf,
162+
partition_col: &str,
163+
) {
164+
let file_size = std::fs::metadata(parquet_path).unwrap().len();
165+
let pfile = PartitionedFile::new(parquet_path.to_str().unwrap().to_owned(), file_size);
166+
167+
let source = Arc::new(ParquetSource::new(schema.clone()));
168+
let scan_config = FileScanConfigBuilder::new(
169+
ObjectStoreUrl::local_filesystem(),
170+
source,
171+
)
172+
.with_file(pfile)
173+
.build();
174+
175+
let scan = DataSourceExec::from_data_source(scan_config);
176+
177+
let hash_expr = vec![col(partition_col, &schema).unwrap()];
178+
let repartition = Arc::new(
179+
RepartitionExec::try_new(scan, Partitioning::Hash(hash_expr, NUM_PARTITIONS))
180+
.unwrap(),
181+
);
182+
let coalesce = Arc::new(CoalescePartitionsExec::new(repartition));
183+
184+
rt.block_on(async {
185+
let mut stream = coalesce.execute(0, Arc::clone(&task_ctx)).unwrap();
186+
while let Some(batch) = stream.next().await {
187+
std::hint::black_box(batch.unwrap());
188+
}
189+
});
190+
}
191+
192+
fn bench_repartition(c: &mut Criterion) {
193+
let rt = Runtime::new().unwrap();
194+
let session_ctx = SessionContext::new();
195+
let task_ctx = session_ctx.task_ctx();
196+
197+
let data_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
198+
.join("benches")
199+
.join("data");
200+
201+
let dict_path = data_dir.join("access_log_dict.parquet");
202+
let plain_path = data_dir.join("access_log_plain.parquet");
203+
204+
// --- one-time setup: write both parquet files ---
205+
206+
// High-cardinality generator: ~90% distinct pod values per batch.
207+
// pods_per_host=500..600 gives hundreds of distinct pods per host.
208+
// entries_per_container=1..2 means each pod/container combo contributes
209+
// only 1 row, so a batch of 8192 rows sees ~8000+ distinct pod values.
210+
let high_card_gen = || {
211+
AccessLogGenerator::new()
212+
.with_row_limit(NUM_ROWS)
213+
.with_max_batch_size(BATCH_SIZE)
214+
.with_pods_per_host(500..600)
215+
.with_entries_per_container(1..2)
216+
};
217+
218+
// Dict file — native output of AccessLogGenerator
219+
let dict_schema = ensure_parquet(&dict_path, "dict", high_card_gen());
220+
221+
// Plain file — same data with dict columns cast to Utf8
222+
let p_schema = plain_schema(&dict_schema);
223+
let p_schema_clone = p_schema.clone();
224+
ensure_parquet(
225+
&plain_path,
226+
"plain",
227+
high_card_gen().map(move |b| batch_dict_to_plain(&b, &p_schema_clone)),
228+
);
229+
230+
let mut group = c.benchmark_group("repartition");
231+
group.sample_size(10);
232+
233+
// -----------------------------------------------------------------------
234+
// Dictionary(Int32, Utf8) partition key
235+
// -----------------------------------------------------------------------
236+
group.bench_function("dict_pod_key", |b| {
237+
b.iter_custom(|iters| {
238+
let mut total = std::time::Duration::ZERO;
239+
for _ in 0..iters {
240+
let start = Instant::now();
241+
run_repartition(
242+
&rt,
243+
Arc::clone(&task_ctx),
244+
dict_schema.clone(),
245+
&dict_path,
246+
"pod",
247+
);
248+
total += start.elapsed();
249+
}
250+
total
251+
})
252+
});
253+
254+
// -----------------------------------------------------------------------
255+
// Plain Utf8 partition key
256+
// -----------------------------------------------------------------------
257+
group.bench_function("plain_pod_key", |b| {
258+
b.iter_custom(|iters| {
259+
let mut total = std::time::Duration::ZERO;
260+
for _ in 0..iters {
261+
let start = Instant::now();
262+
run_repartition(
263+
&rt,
264+
Arc::clone(&task_ctx),
265+
p_schema.clone(),
266+
&plain_path,
267+
"pod",
268+
);
269+
total += start.elapsed();
270+
}
271+
total
272+
})
273+
});
274+
275+
group.finish();
276+
}
277+
278+
criterion_group!(benches, bench_repartition);
279+
criterion_main!(benches);

profile.json.gz

1.04 MB
Binary file not shown.

0 commit comments

Comments
 (0)