@@ -138,8 +138,35 @@ fn update_sort_ctx_children_data(
138138/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data
139139/// attribute stores whether the plan is a `CoalescePartitionsExec` or is
140140/// connected to a `CoalescePartitionsExec` via its children.
141+ ///
142+ /// The tracker halts at each [`SortExec`] (where the `SortPreservingMergeExec` (SPM) will act to replace the coalesce).
143+ ///
144+ /// This requires that a bottom-up traversal has already been performed, so
145+ /// that the children have already been updated.
141146pub type PlanWithCorrespondingCoalescePartitions = PlanContext < bool > ;
142147
148+ /// Discovers the linked Coalesce->Sort cascades.
149+ ///
150+ /// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively
151+ /// remove the linked coalesces in the subplan. Afterwards, an SPM is added
152+ /// at the root of the subplan (just after the sort) in order to parallelize sorts.
153+ /// Refer to [`parallelize_sorts`] for more details on sort parallelization.
154+ ///
155+ /// Example of linked Coalesce->Sort:
156+ /// ```text
157+ /// SortExec        ctx.data=false (to halt remove_bottleneck_in_subplan)
158+ ///   ...nodes...   ctx.data=true (i.e. are linked in cascade)
159+ ///     Coalesce    ctx.data=true (i.e. is a coalesce)
160+ /// ```
161+ ///
162+ /// The link should not be continued (and the coalesce not removed) if the distribution
163+ /// changes within the Coalesce->Sort cascade. Example:
164+ /// ```text
165+ /// SortExec           ctx.data=false (to halt remove_bottleneck_in_subplan)
166+ ///   AggregateExec    ctx.data=false (to stop the link)
167+ ///     ...nodes...    ctx.data=true (i.e. are linked in cascade)
168+ ///       Coalesce     ctx.data=true (i.e. is a coalesce)
169+ /// ```
143170fn update_coalesce_ctx_children (
144171 coalesce_context : & mut PlanWithCorrespondingCoalescePartitions ,
145172) {
@@ -316,8 +343,19 @@ fn replace_with_partial_sort(
316343/// are transformed into
317344/// ```text
318345/// "SortPreservingMergeExec: \[a@0 ASC\]",
319- /// " ...nodes..."
320- /// " SortExec: expr=\[a@0 ASC\]",
346+ /// " SortExec: expr=\[a@0 ASC\]",
347+ /// " ...nodes..."
348+ /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
349+ /// ```
350+ /// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
351+ /// By performing sorting in parallel, we can increase performance in some scenarios.
352+ ///
353+ /// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
354+ /// which require single partitioning. Do not parallelize when the following scenario occurs:
355+ /// ```text
356+ /// "SortExec: expr=\[a@0 ASC\]",
357+ /// " ...nodes requiring single partitioning..."
358+ /// " CoalescePartitionsExec",
321359/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
322360/// ```
323361pub fn parallelize_sorts (
0 commit comments