Skip to content

Commit 1a2f8dc

Browse files
committed
Addressed in latest PR
1 parent 91e2904 commit 1a2f8dc

File tree

1 file changed

+4
-9
lines changed

1 file changed

+4
-9
lines changed

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -951,7 +951,7 @@ fn add_merge_on_top(
951951
let new_plan = if let Some(req) = input.plan.output_ordering() {
952952
Arc::new(
953953
SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan))
954-
.with_fetch(*fetch),
954+
.with_fetch(fetch.take()),
955955
) as _
956956
} else {
957957
// If there is no input order, we can simply coalesce partitions:
@@ -1223,7 +1223,7 @@ pub fn ensure_distribution(
12231223
children,
12241224
},
12251225
mut fetch,
1226-
_spm,
1226+
spm,
12271227
) = remove_dist_changing_operators(dist_context)?;
12281228

12291229
if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
@@ -1408,13 +1408,8 @@ pub fn ensure_distribution(
14081408
// It was removed by `remove_dist_changing_operators`
14091409
// and we need to add it back.
14101410
if fetch.is_some() {
1411-
// We can make sure that `plan` has an ordering because
1412-
// `SortPreservingMergeExec` requires ordering to be constructed.
1413-
// If there is no ordering, `SortPreservingMergeExec::new` will panic
1414-
let ordering = plan.output_ordering().cloned().unwrap();
1415-
let plan = Arc::new(
1416-
SortPreservingMergeExec::new(ordering, plan).with_fetch(fetch.take()),
1417-
);
1411+
// It's safe to unwrap because `spm` is set only if `fetch` is set.
1412+
let plan = spm.unwrap().with_fetch(fetch.take()).unwrap();
14181413
optimized_distribution_ctx =
14191414
DistributionContext::new(plan, data, vec![optimized_distribution_ctx]);
14201415
}

0 commit comments

Comments
 (0)