Skip to content

Commit 8a5dd07

Browse files
committed
Profile AggregateExec
1 parent 5b7db3b commit 8a5dd07

File tree

3 files changed

+298
-0
lines changed

3 files changed

+298
-0
lines changed

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,10 @@ name = "distinct_query_sql"
206206
harness = false
207207
name = "push_down_filter"
208208

209+
[[bench]]
210+
harness = false
211+
name = "aggregate_dict"
212+
209213
[[bench]]
210214
harness = false
211215
name = "repartition_dict"
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for AggregateExec with dictionary-encoded group keys.
19+
//!
20+
//! Isolates the RowConverter overhead when grouping on a
21+
//! Dictionary(Int32, Utf8) column vs plain Utf8.
22+
//!
23+
//! Builds physical plans by hand — no SQL, no optimizer:
24+
//!
25+
//! AggregateExec (Single, GROUP BY pod, COUNT(*))
26+
//! DataSourceExec (ParquetSource, streams from disk)
27+
//!
28+
//! Reuses the parquet files written by repartition_dict.rs:
29+
//! benches/data/access_log_dict.parquet — Dictionary(Int32, Utf8) columns
30+
//! benches/data/access_log_plain.parquet — same data, dict columns cast to Utf8
31+
//!
32+
//! Run:
33+
//! cargo bench -p datafusion --bench aggregate_dict
34+
//!
35+
//! Profile with samply:
36+
//! Step 1 — build (once):
37+
//! RUSTFLAGS="-C force-frame-pointers=yes" \
38+
//! cargo bench -p datafusion --bench aggregate_dict \
39+
//! --profile profiling --no-run
40+
//!
41+
//! Step 2 — record (run directly against the binary):
42+
//! samply record ./target/profiling/deps/aggregate_dict-<HASH> \
43+
//! --bench "dict_pod_key" --profile-time 30
44+
45+
use std::fs::File;
46+
use std::path::PathBuf;
47+
use std::sync::Arc;
48+
use std::time::Instant;
49+
50+
use arrow::array::ArrayRef;
51+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
52+
use arrow::record_batch::RecordBatch;
53+
use criterion::{criterion_group, criterion_main, Criterion};
54+
use datafusion::execution::context::SessionContext;
55+
use datafusion::physical_plan::ExecutionPlan;
56+
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
57+
use datafusion_datasource::source::DataSourceExec;
58+
use datafusion_datasource::PartitionedFile;
59+
use datafusion_datasource_parquet::source::ParquetSource;
60+
use datafusion_execution::object_store::ObjectStoreUrl;
61+
use datafusion_functions_aggregate::count::count_udaf;
62+
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
63+
use datafusion_physical_expr::expressions::{col, lit};
64+
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
65+
use futures::StreamExt;
66+
use parquet::arrow::ArrowWriter;
67+
use parquet::file::properties::WriterProperties;
68+
use test_utils::AccessLogGenerator;
69+
use tokio::runtime::Runtime;
70+
71+
/// Total rows written to each parquet file.
72+
/// NOTE: kept below 150M to avoid RowConverter u32 offset overflow with
73+
/// high-cardinality string group keys (~13 bytes/row * 150M > u32::MAX).
74+
const NUM_ROWS: usize = 10_000_000;
75+
76+
/// Rows per batch during generation.
77+
const BATCH_SIZE: usize = 8_192;
78+
79+
/// Write an iterator of RecordBatches to a parquet file.
80+
/// Skips writing if the file already exists.
81+
fn ensure_parquet(
82+
path: &PathBuf,
83+
label: &str,
84+
batches: impl Iterator<Item = RecordBatch>,
85+
) -> SchemaRef {
86+
if path.exists() {
87+
println!("Reusing {label} parquet: {}", path.display());
88+
let file = File::open(path).unwrap();
89+
let reader =
90+
parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(file)
91+
.unwrap();
92+
return reader.schema().clone();
93+
}
94+
95+
println!("Writing {label} parquet ({NUM_ROWS} rows) → {} …", path.display());
96+
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
97+
98+
let file = File::create(path).unwrap();
99+
let props = WriterProperties::builder().build();
100+
let mut writer: Option<ArrowWriter<File>> = None;
101+
let mut schema: Option<SchemaRef> = None;
102+
103+
for batch in batches {
104+
if writer.is_none() {
105+
schema = Some(batch.schema());
106+
writer = Some(
107+
ArrowWriter::try_new(
108+
file.try_clone().unwrap(),
109+
batch.schema(),
110+
Some(props.clone()),
111+
)
112+
.unwrap(),
113+
);
114+
}
115+
writer.as_mut().unwrap().write(&batch).unwrap();
116+
}
117+
writer.unwrap().close().unwrap();
118+
println!("Done.");
119+
schema.unwrap()
120+
}
121+
122+
/// Cast all Dictionary(Int32, Utf8) columns in a batch to plain Utf8.
123+
fn batch_dict_to_plain(batch: &RecordBatch, plain_schema: &SchemaRef) -> RecordBatch {
124+
let new_cols: Vec<ArrayRef> = batch
125+
.columns()
126+
.iter()
127+
.zip(batch.schema().fields().iter())
128+
.map(|(arr, field)| match field.data_type() {
129+
DataType::Dictionary(_, _) => Arc::new(
130+
arrow::compute::cast(arr, &DataType::Utf8).unwrap(),
131+
) as ArrayRef,
132+
_ => Arc::clone(arr),
133+
})
134+
.collect();
135+
RecordBatch::try_new(plain_schema.clone(), new_cols).unwrap()
136+
}
137+
138+
/// Derive a plain-Utf8 schema from a dict schema.
139+
fn plain_schema(dict_schema: &SchemaRef) -> SchemaRef {
140+
let fields: Vec<Field> = dict_schema
141+
.fields()
142+
.iter()
143+
.map(|f| match f.data_type() {
144+
DataType::Dictionary(_, _) => {
145+
Field::new(f.name(), DataType::Utf8, f.is_nullable())
146+
}
147+
_ => f.as_ref().clone(),
148+
})
149+
.collect();
150+
Arc::new(Schema::new(fields))
151+
}
152+
153+
/// Build and execute:
154+
/// AggregateExec (Single, GROUP BY group_col, COUNT(*))
155+
/// DataSourceExec (ParquetSource)
156+
fn run_aggregate(
157+
rt: &Runtime,
158+
task_ctx: Arc<datafusion::execution::TaskContext>,
159+
schema: SchemaRef,
160+
parquet_path: &PathBuf,
161+
group_col: &str,
162+
) {
163+
let file_size = std::fs::metadata(parquet_path).unwrap().len();
164+
let pfile = PartitionedFile::new(parquet_path.to_str().unwrap().to_owned(), file_size);
165+
166+
let source = Arc::new(ParquetSource::new(schema.clone()));
167+
let scan_config = FileScanConfigBuilder::new(
168+
ObjectStoreUrl::local_filesystem(),
169+
source,
170+
)
171+
.with_file(pfile)
172+
.build();
173+
174+
let scan = DataSourceExec::from_data_source(scan_config);
175+
176+
let group_by = PhysicalGroupBy::new_single(vec![(
177+
col(group_col, &schema).unwrap(),
178+
group_col.to_string(),
179+
)]);
180+
181+
let count_expr = Arc::new(
182+
AggregateExprBuilder::new(count_udaf(), vec![lit(1i64)])
183+
.schema(schema.clone())
184+
.alias("COUNT(*)")
185+
.build()
186+
.unwrap(),
187+
);
188+
189+
let agg = Arc::new(
190+
AggregateExec::try_new(
191+
AggregateMode::Single,
192+
group_by,
193+
vec![count_expr],
194+
vec![None],
195+
scan,
196+
schema,
197+
)
198+
.unwrap(),
199+
);
200+
201+
rt.block_on(async {
202+
let mut stream = agg.execute(0, Arc::clone(&task_ctx)).unwrap();
203+
while let Some(batch) = stream.next().await {
204+
std::hint::black_box(batch.unwrap());
205+
}
206+
});
207+
}
208+
209+
fn bench_aggregate(c: &mut Criterion) {
210+
let rt = Runtime::new().unwrap();
211+
let session_ctx = SessionContext::new();
212+
let task_ctx = session_ctx.task_ctx();
213+
214+
let data_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
215+
.join("benches")
216+
.join("data");
217+
218+
let dict_path = data_dir.join("access_log_dict_10m.parquet");
219+
let plain_path = data_dir.join("access_log_plain_10m.parquet");
220+
221+
// High-cardinality generator: ~90% distinct pod values per batch.
222+
// pods_per_host=500..600 gives hundreds of distinct pods per host.
223+
// entries_per_container=1..2 means each pod/container combo contributes
224+
// only 1 row, so a batch of 8192 rows sees ~8000+ distinct pod values.
225+
let high_card_gen = || {
226+
AccessLogGenerator::new()
227+
.with_row_limit(NUM_ROWS)
228+
.with_max_batch_size(BATCH_SIZE)
229+
.with_pods_per_host(500..600)
230+
.with_entries_per_container(1..2)
231+
};
232+
233+
// Dict file — native output of AccessLogGenerator (reused from repartition_dict if exists)
234+
let dict_schema = ensure_parquet(&dict_path, "dict", high_card_gen());
235+
236+
// Plain file — same data with dict columns cast to Utf8
237+
let p_schema = plain_schema(&dict_schema);
238+
let p_schema_clone = p_schema.clone();
239+
ensure_parquet(
240+
&plain_path,
241+
"plain",
242+
high_card_gen().map(move |b| batch_dict_to_plain(&b, &p_schema_clone)),
243+
);
244+
245+
let mut group = c.benchmark_group("aggregate");
246+
group.sample_size(10);
247+
248+
// -----------------------------------------------------------------------
249+
// Dictionary(Int32, Utf8) group key — RowConverter must resolve dict lookups
250+
// -----------------------------------------------------------------------
251+
group.bench_function("dict_pod_key", |b| {
252+
b.iter_custom(|iters| {
253+
let mut total = std::time::Duration::ZERO;
254+
for _ in 0..iters {
255+
let start = Instant::now();
256+
run_aggregate(
257+
&rt,
258+
Arc::clone(&task_ctx),
259+
dict_schema.clone(),
260+
&dict_path,
261+
"pod",
262+
);
263+
total += start.elapsed();
264+
}
265+
total
266+
})
267+
});
268+
269+
// -----------------------------------------------------------------------
270+
// Plain Utf8 group key — RowConverter encodes string bytes directly
271+
// -----------------------------------------------------------------------
272+
group.bench_function("plain_pod_key", |b| {
273+
b.iter_custom(|iters| {
274+
let mut total = std::time::Duration::ZERO;
275+
for _ in 0..iters {
276+
let start = Instant::now();
277+
run_aggregate(
278+
&rt,
279+
Arc::clone(&task_ctx),
280+
p_schema.clone(),
281+
&plain_path,
282+
"pod",
283+
);
284+
total += start.elapsed();
285+
}
286+
total
287+
})
288+
});
289+
290+
group.finish();
291+
}
292+
293+
// Register the benchmark group and generate the criterion `main` entry point.
criterion_group!(benches, bench_aggregate);
criterion_main!(benches);

profile.json.gz

-935 KB
Binary file not shown.

0 commit comments

Comments
 (0)