Skip to content

Commit 63dd4e2

Browse files
authored
Automatically split large single RecordBatches in MemorySource into smaller batches (#16734)
- **Config Options** - Added `batch_split` execution option (enabled by default). - Made batch size configurable with default value `8192`. - **Execution Plan Enhancements** - Introduced `BatchSplitStream` to split large record batches before passing them downstream. - Integrated `BatchSplitStream` within `DataSourceExec`. - **Metrics** - Added `SplitMetrics` with `batches_splitted` counter. - Exposed split metrics via the execution plan's metrics system. - **Testing** - Added new integration tests for batch splitting scenarios in `datasource_split.rs`: - Large batches split into multiple parts. - Exact batch size yields no split. - Small batches pass through unchanged. - Empty batches handled correctly. - Multiple empty batches processed safely. - Added unit tests for `BatchSplitStream` behavior. - Updated SQL logic tests (`.slt` files) to reflect changed execution plans and batch sizes. - **SQL Logic Test Adjustments** - Updated expected physical plans and outputs across various `.slt` files. - Changed default execution batch size in tests for consistency.
1 parent 6abc162 commit 63dd4e2

File tree

12 files changed

+468
-133
lines changed

12 files changed

+468
-133
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2026,7 +2026,6 @@ mod tests {
20262026
#[tokio::test]
20272027
async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
20282028
let mut config_map: HashMap<String, String> = HashMap::new();
2029-
config_map.insert("datafusion.execution.batch_size".into(), "10".into());
20302029
config_map.insert(
20312030
"datafusion.execution.soft_max_rows_per_output_file".into(),
20322031
"10".into(),
@@ -2091,7 +2090,7 @@ mod tests {
20912090
"datafusion.execution.parquet.write_batch_size".into(),
20922091
"5".into(),
20932092
);
2094-
config_map.insert("datafusion.execution.batch_size".into(), "1".into());
2093+
config_map.insert("datafusion.execution.batch_size".into(), "10".into());
20952094
helper_test_append_new_files_to_table(
20962095
ParquetFormat::default().get_ext(),
20972096
FileCompressionType::UNCOMPRESSED,
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::{
19+
array::{ArrayRef, Int32Array},
20+
datatypes::{DataType, Field, Schema},
21+
record_batch::RecordBatch,
22+
};
23+
use datafusion_datasource::memory::MemorySourceConfig;
24+
use datafusion_execution::TaskContext;
25+
use datafusion_physical_plan::{common::collect, ExecutionPlan};
26+
use std::sync::Arc;
27+
28+
/// Helper function to create a memory source with the given batch size and collect all batches
29+
async fn create_and_collect_batches(
30+
batch_size: usize,
31+
) -> datafusion_common::Result<Vec<RecordBatch>> {
32+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
33+
let array = Int32Array::from_iter_values(0..batch_size as i32);
34+
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array) as ArrayRef])?;
35+
let exec = MemorySourceConfig::try_new_exec(&[vec![batch]], schema, None)?;
36+
let ctx = Arc::new(TaskContext::default());
37+
let stream = exec.execute(0, ctx)?;
38+
collect(stream).await
39+
}
40+
41+
/// Helper function to create a memory source with multiple batches and collect all results
42+
async fn create_and_collect_multiple_batches(
43+
input_batches: Vec<RecordBatch>,
44+
) -> datafusion_common::Result<Vec<RecordBatch>> {
45+
let schema = input_batches[0].schema();
46+
let exec = MemorySourceConfig::try_new_exec(&[input_batches], schema, None)?;
47+
let ctx = Arc::new(TaskContext::default());
48+
let stream = exec.execute(0, ctx)?;
49+
collect(stream).await
50+
}
51+
52+
#[tokio::test]
53+
async fn datasource_splits_large_batches() -> datafusion_common::Result<()> {
54+
let batch_size = 20000;
55+
let batches = create_and_collect_batches(batch_size).await?;
56+
57+
assert!(batches.len() > 1);
58+
let max = batches.iter().map(|b| b.num_rows()).max().unwrap();
59+
assert!(
60+
max <= datafusion_execution::config::SessionConfig::new()
61+
.options()
62+
.execution
63+
.batch_size
64+
);
65+
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
66+
assert_eq!(total, batch_size);
67+
Ok(())
68+
}
69+
70+
#[tokio::test]
71+
async fn datasource_exact_batch_size_no_split() -> datafusion_common::Result<()> {
72+
let session_config = datafusion_execution::config::SessionConfig::new();
73+
let configured_batch_size = session_config.options().execution.batch_size;
74+
75+
let batches = create_and_collect_batches(configured_batch_size).await?;
76+
77+
// Should not split when exactly equal to batch_size
78+
assert_eq!(batches.len(), 1);
79+
assert_eq!(batches[0].num_rows(), configured_batch_size);
80+
Ok(())
81+
}
82+
83+
#[tokio::test]
84+
async fn datasource_small_batch_no_split() -> datafusion_common::Result<()> {
85+
// Test with batch smaller than the batch size (8192)
86+
let small_batch_size = 512; // Less than 8192
87+
88+
let batches = create_and_collect_batches(small_batch_size).await?;
89+
90+
// Should not split small batches below the batch size
91+
assert_eq!(batches.len(), 1);
92+
assert_eq!(batches[0].num_rows(), small_batch_size);
93+
Ok(())
94+
}
95+
96+
#[tokio::test]
97+
async fn datasource_empty_batch_clean_termination() -> datafusion_common::Result<()> {
98+
let batches = create_and_collect_batches(0).await?;
99+
100+
// Empty batch should result in one empty batch
101+
assert_eq!(batches.len(), 1);
102+
assert_eq!(batches[0].num_rows(), 0);
103+
Ok(())
104+
}
105+
106+
#[tokio::test]
107+
async fn datasource_multiple_empty_batches() -> datafusion_common::Result<()> {
108+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
109+
let empty_array = Int32Array::from_iter_values(std::iter::empty::<i32>());
110+
let empty_batch =
111+
RecordBatch::try_new(schema.clone(), vec![Arc::new(empty_array) as ArrayRef])?;
112+
113+
// Create multiple empty batches
114+
let input_batches = vec![empty_batch.clone(), empty_batch.clone(), empty_batch];
115+
let batches = create_and_collect_multiple_batches(input_batches).await?;
116+
117+
// Should preserve empty batches without issues
118+
assert_eq!(batches.len(), 3);
119+
for batch in &batches {
120+
assert_eq!(batch.num_rows(), 0);
121+
}
122+
Ok(())
123+
}

datafusion/core/tests/execution/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@
1616
// under the License.
1717

1818
mod coop;
19+
mod datasource_split;
1920
mod logical_plan;

datafusion/datasource/src/source.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ use std::sync::Arc;
2525
use datafusion_physical_plan::execution_plan::{
2626
Boundedness, EmissionType, SchedulingType,
2727
};
28+
use datafusion_physical_plan::metrics::SplitMetrics;
2829
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
2930
use datafusion_physical_plan::projection::ProjectionExec;
31+
use datafusion_physical_plan::stream::BatchSplitStream;
3032
use datafusion_physical_plan::{
3133
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
3234
};
@@ -267,7 +269,18 @@ impl ExecutionPlan for DataSourceExec {
267269
partition: usize,
268270
context: Arc<TaskContext>,
269271
) -> Result<SendableRecordBatchStream> {
270-
self.data_source.open(partition, Arc::clone(&context))
272+
let stream = self.data_source.open(partition, Arc::clone(&context))?;
273+
let batch_size = context.session_config().batch_size();
274+
log::debug!(
275+
"Batch splitting enabled for partition {partition}: batch_size={batch_size}"
276+
);
277+
let metrics = self.data_source.metrics();
278+
let split_metrics = SplitMetrics::new(&metrics, partition);
279+
Ok(Box::pin(BatchSplitStream::new(
280+
stream,
281+
batch_size,
282+
split_metrics,
283+
)))
271284
}
272285

273286
fn metrics(&self) -> Option<MetricsSet> {

datafusion/physical-plan/src/metrics/baseline.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,23 @@ impl SpillMetrics {
169169
}
170170
}
171171

172+
/// Metrics for tracking [`crate::stream::BatchSplitStream`] activity
173+
#[derive(Debug, Clone)]
174+
pub struct SplitMetrics {
175+
/// Number of times an input [`RecordBatch`] was split
176+
pub batches_splitted: Count,
177+
}
178+
179+
impl SplitMetrics {
180+
/// Create a new [`SplitMetrics`]
181+
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
182+
Self {
183+
batches_splitted: MetricBuilder::new(metrics)
184+
.counter("batches_splitted", partition),
185+
}
186+
}
187+
}
188+
172189
/// Trait for things that produce output rows as a result of execution.
173190
pub trait RecordOutput {
174191
/// Record that some number of output rows have been produced

datafusion/physical-plan/src/metrics/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use std::{
3232
use datafusion_common::HashMap;
3333

3434
// public exports
35-
pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics};
35+
pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
3636
pub use builder::MetricBuilder;
3737
pub use custom::CustomMetricValue;
3838
pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};

0 commit comments

Comments
 (0)