Skip to content

Commit 5310494

Browse files
authored
[fix](local shuffle) fix bucket shuffle + set operation + local shuffle compute wrong result (#60823)
fix bucket shuffle + set operation + local shuffle compute wrong result, because backend can not plan the correct local shuffle type, introduced by #59006. so we should disable bucket shuffle for set operation now, after support plan local shuffle in frontend, we can support this feature.
1 parent e8bc244 commit 5310494

File tree

7 files changed

+228
-211
lines changed

7 files changed

+228
-211
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2418,13 +2418,14 @@ public PlanFragment visitPhysicalSetOperation(
24182418
setOperationNode.setColocate(true);
24192419
}
24202420

2421-
for (Plan child : setOperation.children()) {
2422-
PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
2423-
if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
2424-
setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
2425-
break;
2426-
}
2427-
}
2421+
// TODO: open comment when support `enable_local_shuffle_planner`
2422+
// for (Plan child : setOperation.children()) {
2423+
// PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
2424+
// if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
2425+
// setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
2426+
// break;
2427+
// }
2428+
// }
24282429

24292430
return setOperationFragment;
24302431
}

fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java

Lines changed: 47 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.doris.nereids.trees.expressions.SlotReference;
3131
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
3232
import org.apache.doris.nereids.trees.plans.Plan;
33-
import org.apache.doris.nereids.trees.plans.algebra.Union;
3433
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
3534
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
3635
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
@@ -74,11 +73,9 @@
7473
import com.google.common.collect.Maps;
7574
import com.google.common.collect.Sets;
7675

77-
import java.util.ArrayList;
7876
import java.util.Arrays;
7977
import java.util.Collections;
8078
import java.util.HashSet;
81-
import java.util.LinkedHashMap;
8279
import java.util.List;
8380
import java.util.Map;
8481
import java.util.Objects;
@@ -459,52 +456,53 @@ public PhysicalProperties visitPhysicalSetOperation(PhysicalSetOperation setOper
459456
return PhysicalProperties.GATHER;
460457
}
461458

462-
int distributeToChildIndex
463-
= setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
464-
if (distributeToChildIndex >= 0
465-
&& childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) {
466-
DistributionSpecHash childDistribution
467-
= (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
468-
List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
469-
Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
470-
for (int j = 0; j < childToIndex.size(); j++) {
471-
idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
472-
}
473-
474-
List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns();
475-
List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
476-
for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
477-
Integer index = idToOutputIndex.get(tableDistributeColumnId);
478-
if (index == null) {
479-
break;
480-
}
481-
setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
482-
}
483-
// check whether the set operation output all distribution columns of the child
484-
if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) {
485-
boolean isUnion = setOperation instanceof Union;
486-
boolean shuffleToRight = distributeToChildIndex > 0;
487-
if (!isUnion && shuffleToRight) {
488-
return new PhysicalProperties(
489-
new DistributionSpecHash(
490-
setOperationDistributeColumnIds,
491-
ShuffleType.EXECUTION_BUCKETED
492-
)
493-
);
494-
} else {
495-
// keep the distribution as the child
496-
return new PhysicalProperties(
497-
new DistributionSpecHash(
498-
setOperationDistributeColumnIds,
499-
childDistribution.getShuffleType(),
500-
childDistribution.getTableId(),
501-
childDistribution.getSelectedIndexId(),
502-
childDistribution.getPartitionIds()
503-
)
504-
);
505-
}
506-
}
507-
}
459+
// TODO: open comment when support `enable_local_shuffle_planner`
460+
// int distributeToChildIndex
461+
// = setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
462+
// if (distributeToChildIndex >= 0
463+
// && childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) {
464+
// DistributionSpecHash childDistribution
465+
// = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
466+
// List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
467+
// Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
468+
// for (int j = 0; j < childToIndex.size(); j++) {
469+
// idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
470+
// }
471+
//
472+
// List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns();
473+
// List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
474+
// for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
475+
// Integer index = idToOutputIndex.get(tableDistributeColumnId);
476+
// if (index == null) {
477+
// break;
478+
// }
479+
// setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
480+
// }
481+
// // check whether the set operation output all distribution columns of the child
482+
// if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) {
483+
// boolean isUnion = setOperation instanceof Union;
484+
// boolean shuffleToRight = distributeToChildIndex > 0;
485+
// if (!isUnion && shuffleToRight) {
486+
// return new PhysicalProperties(
487+
// new DistributionSpecHash(
488+
// setOperationDistributeColumnIds,
489+
// ShuffleType.EXECUTION_BUCKETED
490+
// )
491+
// );
492+
// } else {
493+
// // keep the distribution as the child
494+
// return new PhysicalProperties(
495+
// new DistributionSpecHash(
496+
// setOperationDistributeColumnIds,
497+
// childDistribution.getShuffleType(),
498+
// childDistribution.getTableId(),
499+
// childDistribution.getSelectedIndexId(),
500+
// childDistribution.getPartitionIds()
501+
// )
502+
// );
503+
// }
504+
// }
505+
// }
508506

509507
for (int i = 0; i < childrenDistribution.size(); i++) {
510508
DistributionSpec childDistribution = childrenDistribution.get(i);

fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java

Lines changed: 69 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555

5656
import com.google.common.base.Preconditions;
5757
import com.google.common.collect.ImmutableList;
58-
import com.google.common.collect.ImmutableSet;
5958
import com.google.common.collect.Lists;
6059
import org.apache.logging.log4j.LogManager;
6160
import org.apache.logging.log4j.Logger;
@@ -626,80 +625,83 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
626625
} else if (requiredDistributionSpec instanceof DistributionSpecHash) {
627626
// TODO: should use the most common hash spec as basic
628627
DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
629-
int bucketShuffleBasicIndex = -1;
630-
double basicRowCount = -1;
628+
// TODO: open comment when support `enable_local_shuffle_planner`
629+
// int bucketShuffleBasicIndex = -1;
630+
// double basicRowCount = -1;
631631

632632
// find the bucket shuffle basic index
633-
try {
634-
ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
635-
ShuffleType.NATURAL,
636-
ShuffleType.STORAGE_BUCKETED
637-
);
638-
for (int i = 0; i < originChildrenProperties.size(); i++) {
639-
PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
640-
DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
641-
if (childDistribution instanceof DistributionSpecHash
642-
&& supportBucketShuffleTypes.contains(
643-
((DistributionSpecHash) childDistribution).getShuffleType())
644-
&& !(isBucketShuffleDownGrade(setOperation.child(i)))) {
645-
Statistics stats = setOperation.child(i).getStats();
646-
double rowCount = stats.getRowCount();
647-
if (rowCount > basicRowCount) {
648-
basicRowCount = rowCount;
649-
bucketShuffleBasicIndex = i;
650-
}
651-
}
652-
}
653-
} catch (Throwable t) {
654-
// catch stats exception
655-
LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
656-
bucketShuffleBasicIndex = -1;
657-
}
633+
// try {
634+
// ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
635+
// ShuffleType.NATURAL,
636+
// ShuffleType.STORAGE_BUCKETED
637+
// );
638+
// for (int i = 0; i < originChildrenProperties.size(); i++) {
639+
// PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
640+
// DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
641+
// if (childDistribution instanceof DistributionSpecHash
642+
// && supportBucketShuffleTypes.contains(
643+
// ((DistributionSpecHash) childDistribution).getShuffleType())
644+
// && !(isBucketShuffleDownGrade(setOperation.child(i)))) {
645+
// Statistics stats = setOperation.child(i).getStats();
646+
// double rowCount = stats.getRowCount();
647+
// if (rowCount > basicRowCount) {
648+
// basicRowCount = rowCount;
649+
// bucketShuffleBasicIndex = i;
650+
// }
651+
// }
652+
// }
653+
// } catch (Throwable t) {
654+
// // catch stats exception
655+
// LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
656+
// bucketShuffleBasicIndex = -1;
657+
// }
658658

659659
// use bucket shuffle
660-
if (bucketShuffleBasicIndex >= 0) {
661-
DistributionSpecHash notShuffleSideRequire
662-
= (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex).getDistributionSpec();
663-
664-
DistributionSpecHash notNeedShuffleOutput
665-
= (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex)
666-
.getDistributionSpec();
667-
668-
for (int i = 0; i < originChildrenProperties.size(); i++) {
669-
DistributionSpecHash current
670-
= (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
671-
if (i == bucketShuffleBasicIndex) {
672-
continue;
673-
}
674-
675-
DistributionSpecHash currentRequire
676-
= (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();
677-
678-
PhysicalProperties target = calAnotherSideRequired(
679-
ShuffleType.STORAGE_BUCKETED,
680-
notNeedShuffleOutput, current,
681-
notShuffleSideRequire,
682-
currentRequire);
683-
updateChildEnforceAndCost(i, target);
684-
}
685-
setOperation.setMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex);
660+
// if (bucketShuffleBasicIndex >= 0) {
661+
// DistributionSpecHash notShuffleSideRequire
662+
// = (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex)
663+
// .getDistributionSpec();
664+
//
665+
// DistributionSpecHash notNeedShuffleOutput
666+
// = (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex)
667+
// .getDistributionSpec();
668+
//
669+
// for (int i = 0; i < originChildrenProperties.size(); i++) {
670+
// DistributionSpecHash current
671+
// = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
672+
// if (i == bucketShuffleBasicIndex) {
673+
// continue;
674+
// }
675+
//
676+
// DistributionSpecHash currentRequire
677+
// = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();
678+
//
679+
// PhysicalProperties target = calAnotherSideRequired(
680+
// ShuffleType.STORAGE_BUCKETED,
681+
// notNeedShuffleOutput, current,
682+
// notShuffleSideRequire,
683+
// currentRequire);
684+
// updateChildEnforceAndCost(i, target);
685+
// }
686+
// setOperation.setMutableState(
687+
// PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex);
686688
// use partitioned shuffle
687-
} else {
688-
for (int i = 0; i < originChildrenProperties.size(); i++) {
689-
DistributionSpecHash current
690-
= (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
691-
if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
692-
|| !bothSideShuffleKeysAreSameOrder(basic, current,
689+
// } else {
690+
for (int i = 0; i < originChildrenProperties.size(); i++) {
691+
DistributionSpecHash current
692+
= (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
693+
if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
694+
|| !bothSideShuffleKeysAreSameOrder(basic, current,
695+
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
696+
(DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
697+
PhysicalProperties target = calAnotherSideRequired(
698+
ShuffleType.EXECUTION_BUCKETED, basic, current,
693699
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
694-
(DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
695-
PhysicalProperties target = calAnotherSideRequired(
696-
ShuffleType.EXECUTION_BUCKETED, basic, current,
697-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
698-
(DistributionSpecHash) requiredProperties.get(i).getDistributionSpec());
699-
updateChildEnforceAndCost(i, target);
700-
}
700+
(DistributionSpecHash) requiredProperties.get(i).getDistributionSpec());
701+
updateChildEnforceAndCost(i, target);
701702
}
702703
}
704+
// }
703705
}
704706
return ImmutableList.of(originChildrenProperties);
705707
}

fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,13 +307,14 @@ public Void visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanCon
307307
// shuffle all column
308308
// TODO: for wide table, may be we should add a upper limit of shuffle columns
309309

310+
// TODO: open comment when support `enable_local_shuffle_planner` and change to REQUIRE
310311
// intersect/except always need hash distribution, we use REQUIRE to auto select
311312
// bucket shuffle or execution shuffle
312313
addRequestPropertyToChildren(setOperation.getRegularChildrenOutputs().stream()
313314
.map(childOutputs -> childOutputs.stream()
314315
.map(SlotReference::getExprId)
315316
.collect(ImmutableList.toImmutableList()))
316-
.map(l -> PhysicalProperties.createHash(l, ShuffleType.REQUIRE))
317+
.map(l -> PhysicalProperties.createHash(l, ShuffleType.EXECUTION_BUCKETED))
317318
.collect(Collectors.toList()));
318319
}
319320
return null;

0 commit comments

Comments
 (0)