Skip to content

Commit 17a38a4

Browse files
committed
perf(cube): Add execution.dont_parallelize_sort_preserving_merge_exec_inputs config option
1 parent c228f82 commit 17a38a4

File tree

3 files changed

+16
-1
lines changed

3 files changed

+16
-1
lines changed

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,12 @@ config_namespace! {
387387
/// We plan to make this the default in the future.
388388
pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
389389

390+
/// Cube: If true, makes SortPreservingMergeExec not parallelize merge sort inputs, to save
391+
/// memory consumption of intermediate batches (1 in the mpsc channel buffer, and 1 in the
392+
/// subtask waiting to be pushed onto the buffer). If false, maintains upstream DF
393+
/// behavior.
394+
pub dont_parallelize_sort_preserving_merge_exec_inputs: bool, default = false
395+
390396
/// Should DataFusion enforce batch size in joins or not. By default,
391397
/// DataFusion will not enforce batch size in joins. Enforcing batch size
392398
/// in joins can reduce memory usage when joining large

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,11 +297,18 @@ impl ExecutionPlan for SortPreservingMergeExec {
297297
}
298298
},
299299
_ => {
300+
// Cube: If the `dont_parallelize_sort_preserving_merge_exec_inputs` option is true, this overrides upstream DF default behavior. The parallelized
301+
// implementation will store one RecordBatch in the mpsc channel, and one more
302+
// RecordBatch in the blocked subtask waiting to be pushed onto the channel.
303+
304+
// Cube TODO: If memory tracking can be made to account for those batches, make use of it (in general).
305+
let dont_parallelize = context.session_config().options().execution.dont_parallelize_sort_preserving_merge_exec_inputs;
306+
300307
let receivers = (0..input_partitions)
301308
.map(|partition| {
302309
let stream =
303310
self.input.execute(partition, Arc::clone(&context))?;
304-
Ok(spawn_buffered(stream, 1))
311+
Ok(if dont_parallelize { stream } else { spawn_buffered(stream, 1) })
305312
})
306313
.collect::<Result<_>>()?;
307314

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ datafusion.catalog.newlines_in_values false
184184
datafusion.execution.batch_size 8192
185185
datafusion.execution.coalesce_batches true
186186
datafusion.execution.collect_statistics false
187+
datafusion.execution.dont_parallelize_sort_preserving_merge_exec_inputs false
187188
datafusion.execution.enable_recursive_ctes true
188189
datafusion.execution.enforce_batch_size_in_joins false
189190
datafusion.execution.keep_partition_by_columns false
@@ -281,6 +282,7 @@ datafusion.catalog.newlines_in_values false Specifies whether newlines in (quote
281282
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
282283
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
283284
datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
285+
datafusion.execution.dont_parallelize_sort_preserving_merge_exec_inputs false Cube: If true, makes SortPreservingMergeExec not parallelize merge sort inputs, to save memory consumption of intermediate batches (1 in the mpsc channel buffer, and 1 in the subtask waiting to be pushed onto the buffer). If false, maintains upstream DF behavior.
284286
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
285287
datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
286288
datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches

0 commit comments

Comments
 (0)