
Commit bbe2da7

andygrove and claude committed
test: add benchmark for struct column processing
Add a Criterion benchmark to measure the performance of struct column processing in native shuffle. Tests various struct sizes (5, 10, 20 fields) and row counts (1K, 10K rows).

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 471fb2a commit bbe2da7

File tree

native/core/Cargo.toml
native/core/benches/struct_conversion.rs

2 files changed: +178 -0 lines changed

native/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -131,3 +131,7 @@ harness = false
 [[bench]]
 name = "parquet_decode"
 harness = false
+
+[[bench]]
+name = "struct_conversion"
+harness = false
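
With this target registered, the benchmark below can be run on its own with cargo bench --bench struct_conversion from the native/core crate (standard Cargo bench-target behavior, not anything specific to this commit).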
native/core/benches/struct_conversion.rs

Lines changed: 174 additions & 0 deletions
@@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmark for struct column processing in native shuffle.
//!
//! This benchmark measures the performance of converting Spark UnsafeRows
//! with struct columns to Arrow arrays.

use arrow::datatypes::{DataType, Field, Fields};
use comet::execution::shuffle::row::{
    process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
};
use comet::execution::shuffle::CompressionCodec;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use tempfile::Builder;

const BATCH_SIZE: usize = 5000;

/// Create a struct schema with the given number of int64 fields.
fn make_struct_schema(num_fields: usize) -> DataType {
    let fields: Vec<Field> = (0..num_fields)
        .map(|i| Field::new(format!("f{}", i), DataType::Int64, true))
        .collect();
    DataType::Struct(Fields::from(fields))
}
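
// A sketch of what this produces, for illustration (Debug output abbreviated):
// make_struct_schema(2) == Struct([
//     Field { name: "f0", data_type: Int64, nullable: true, .. },
//     Field { name: "f1", data_type: Int64, nullable: true, .. },
// ])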

/// Calculate the row size for a struct with the given number of fields.
///
/// UnsafeRow layout: [null bits] [fixed-length values]. For a struct column,
/// the fixed-length slot holds an offset-and-size word (8 bytes) that points
/// to the nested row.
fn get_row_size(num_struct_fields: usize) -> usize {
    // Top-level row has 1 column (the struct)
    let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);
    // Struct pointer (offset + size) is 8 bytes
    let struct_pointer_size = 8;
    // Nested struct row
    let nested_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_struct_fields);
    let nested_data_size = num_struct_fields * 8; // int64 values

    top_level_bitset_width + struct_pointer_size + nested_bitset_width + nested_data_size
}
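
// Worked example of the sizing above (assuming get_row_bitset_width rounds up
// to 8-byte words, as Spark's UnsafeRow null bitset does for up to 64 fields):
// with 5 nested Int64 fields, the row occupies
// 8 (top-level bitset) + 8 (struct pointer) + 8 (nested bitset) + 5 * 8 (values)
// = 64 bytes.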

/// A single serialized row: a top-level UnsafeRow whose single struct column
/// points to a nested row stored immediately after it in the same buffer.
struct RowData {
    data: Vec<u8>,
}

impl RowData {
    fn new(num_struct_fields: usize) -> Self {
        let row_size = get_row_size(num_struct_fields);
        let mut data = vec![0u8; row_size];

        // Top-level row layout:
        // [null bits for 1 field] [struct pointer (offset, size)]
        let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1);

        // Nested struct starts after the top-level row header + pointer
        let nested_offset = top_level_bitset_width + 8;
        let nested_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_struct_fields);
        let nested_size = nested_bitset_width + num_struct_fields * 8;

        // Write the struct pointer (offset in upper 32 bits, size in lower 32 bits)
        let offset_and_size = ((nested_offset as i64) << 32) | (nested_size as i64);
        data[top_level_bitset_width..top_level_bitset_width + 8]
            .copy_from_slice(&offset_and_size.to_le_bytes());

        // Fill the nested struct with some data
        for i in 0..num_struct_fields {
            let value_offset = nested_offset + nested_bitset_width + i * 8;
            let value = (i as i64) * 100;
            data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes());
        }

        RowData { data }
    }

    fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) {
        spark_row.point_to_slice(&self.data);
    }
}
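
// Worked example of the pointer packing in RowData::new, under the same
// assumptions: for a 5-field struct, nested_offset = 8 + 8 = 16 and
// nested_size = 8 + 5 * 8 = 48, so the packed word is
// (16 << 32) | 48 = 0x0000_0010_0000_0030.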

fn benchmark_struct_conversion(c: &mut Criterion) {
    let mut group = c.benchmark_group("struct_conversion");

    // Test with different struct sizes and row counts
    for num_fields in [5, 10, 20] {
        for num_rows in [1000, 10000] {
            let schema = vec![make_struct_schema(num_fields)];

            // Create row data
            let rows: Vec<RowData> = (0..num_rows).map(|_| RowData::new(num_fields)).collect();

            let spark_rows: Vec<SparkUnsafeRow> = rows
                .iter()
                .map(|row_data| {
                    let mut spark_row = SparkUnsafeRow::new_with_num_fields(1);
                    row_data.to_spark_row(&mut spark_row);
                    // Mark the struct column as not null
                    spark_row.set_not_null_at(0);
                    spark_row
                })
                .collect();

            let mut row_addresses: Vec<i64> =
                spark_rows.iter().map(|row| row.get_row_addr()).collect();
            let mut row_sizes: Vec<i32> =
                spark_rows.iter().map(|row| row.get_row_size()).collect();

            let row_address_ptr = row_addresses.as_mut_ptr();
            let row_size_ptr = row_sizes.as_mut_ptr();

            group.bench_with_input(
                BenchmarkId::new(
                    format!("fields_{}", num_fields),
                    format!("rows_{}", num_rows),
                ),
                &(num_rows, &schema),
                |b, (num_rows, schema)| {
                    b.iter(|| {
                        let tempfile = Builder::new().tempfile().unwrap();

                        process_sorted_row_partition(
                            *num_rows,
                            BATCH_SIZE,
                            row_address_ptr,
                            row_size_ptr,
                            schema,
                            tempfile.path().to_str().unwrap().to_string(),
                            1.0,
                            false,
                            0,
                            None,
                            &CompressionCodec::Zstd(1),
                        )
                        .unwrap();
                    });
                },
            );

            // Keep spark_rows (and the row bytes they point into) alive until the
            // benchmark for this configuration has finished, then drop explicitly.
            std::mem::drop(spark_rows);
        }
    }

    group.finish();
}
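
// With the group name and BenchmarkId scheme above, Criterion reports each of
// the six configurations under IDs such as struct_conversion/fields_5/rows_1000.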

fn config() -> Criterion {
    Criterion::default()
}

criterion_group! {
    name = benches;
    config = config();
    targets = benchmark_struct_conversion
}
criterion_main!(benches);
