Skip to content

Commit c0b1702

Browse files
committed
opt bucket shuffle down grade
1 parent 3a50f4f commit c0b1702

File tree

1 file changed

+62
-94
lines changed

1 file changed

+62
-94
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java

Lines changed: 62 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,9 @@ public List<List<PhysicalProperties>> visitPhysicalHashJoin(
370370
Optional<PhysicalProperties> updatedForLeft = Optional.empty();
371371
Optional<PhysicalProperties> updatedForRight = Optional.empty();
372372

373+
boolean shouldCheckLeftBucketDownGrade = false;
374+
boolean shouldCheckrightBucketDownGrade = false;
375+
373376
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
374377
// check colocate join with scan
375378
return ImmutableList.of(originChildrenProperties);
@@ -384,142 +387,85 @@ public List<List<PhysicalProperties>> visitPhysicalHashJoin(
384387
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
385388
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
386389
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
387-
} else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
388-
updatedForLeft = Optional.of(calAnotherSideRequired(
389-
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
390-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
391-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
392-
updatedForRight = Optional.of(calAnotherSideRequired(
393-
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
394-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
395-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
396-
} else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
397-
updatedForLeft = Optional.of(calAnotherSideRequired(
398-
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
399-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
400-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
401-
updatedForRight = Optional.of(calAnotherSideRequired(
402-
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, rightHashSpec,
403-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
404-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
405390
} else if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL
406391
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
392+
shouldCheckLeftBucketDownGrade = true;
407393
updatedForRight = Optional.of(calAnotherSideRequired(
408394
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
409395
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
410396
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
411397
} else if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
412398
&& rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) {
413-
if (SessionVariable.canUseNereidsDistributePlanner()) {
414-
List<PhysicalProperties> shuffleToLeft = Lists.newArrayList(originChildrenProperties);
415-
PhysicalProperties enforceShuffleRight = calAnotherSideRequired(
416-
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
417-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
418-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec());
419-
updateChildEnforceAndCost(1, enforceShuffleRight, shuffleToLeft);
420-
421-
List<PhysicalProperties> shuffleToRight = Lists.newArrayList(originChildrenProperties);
422-
PhysicalProperties enforceShuffleLeft = calAnotherSideRequired(
423-
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
424-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
425-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()
426-
);
427-
updateChildEnforceAndCost(0, enforceShuffleLeft, shuffleToRight);
428-
return ImmutableList.of(shuffleToLeft, shuffleToRight);
429-
}
430-
399+
shouldCheckLeftBucketDownGrade = true;
431400
// must add enforce because shuffle algorithm is not same between NATURAL and BUCKETED
432401
updatedForRight = Optional.of(calAnotherSideRequired(
433402
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
434403
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
435404
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
436405
} else if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
437406
&& rightHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED) {
438-
if (bothSideShuffleKeysAreSameOrder(leftHashSpec, rightHashSpec,
407+
shouldCheckLeftBucketDownGrade = true;
408+
if (!bothSideShuffleKeysAreSameOrder(leftHashSpec, rightHashSpec,
439409
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
440410
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())) {
441-
return ImmutableList.of(originChildrenProperties);
411+
updatedForRight = Optional.of(calAnotherSideRequired(
412+
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
413+
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
414+
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
442415
}
416+
} else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
417+
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
418+
// legacy coordinator could not do right be selection in this case,
419+
// since it always to check the left most node whether olap scan node.
420+
// so we can only shuffle right to left side to do normal shuffle join
443421
updatedForRight = Optional.of(calAnotherSideRequired(
444-
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
422+
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
445423
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
446424
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
447425
} else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
448-
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
449-
if (SessionVariable.canUseNereidsDistributePlanner()) {
450-
// nereids coordinator can exchange left side to right side to do bucket shuffle join
451-
// TODO: maybe we should check if left child is PhysicalDistribute.
452-
// If so add storage bucketed shuffle on left side. Other wise,
453-
// add execution bucketed shuffle on right side.
454-
// updatedForLeft = Optional.of(calAnotherSideRequired(
455-
// ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
456-
// (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
457-
// (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
458-
List<PhysicalProperties> shuffleToLeft = Lists.newArrayList(originChildrenProperties);
459-
PhysicalProperties enforceShuffleRight = calAnotherSideRequired(
460-
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
461-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
462-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec());
463-
updateChildEnforceAndCost(1, enforceShuffleRight, shuffleToLeft);
426+
&& rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) {
464427

465-
List<PhysicalProperties> shuffleToRight = Lists.newArrayList(originChildrenProperties);
466-
PhysicalProperties enforceShuffleLeft = calAnotherSideRequired(
467-
ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
468-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
469-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()
470-
);
471-
updateChildEnforceAndCost(0, enforceShuffleLeft, shuffleToRight);
472-
return ImmutableList.of(shuffleToLeft, shuffleToRight);
473-
} else {
474-
// legacy coordinator could not do right be selection in this case,
475-
// since it always to check the left most node whether olap scan node.
476-
// so we can only shuffle right to left side to do normal shuffle join
428+
if (!bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
429+
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
430+
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())) {
431+
shouldCheckLeftBucketDownGrade = true;
477432
updatedForRight = Optional.of(calAnotherSideRequired(
478433
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
479434
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
480435
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
481436
}
482-
} else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
483-
&& rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) {
484-
if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
485-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
486-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())) {
487-
return ImmutableList.of(originChildrenProperties);
488-
}
489-
updatedForRight = Optional.of(calAnotherSideRequired(
490-
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
491-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
492-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
493437
} else if ((leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
494438
&& rightHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED)) {
495439
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
440+
shouldCheckrightBucketDownGrade = true;
496441
updatedForLeft = Optional.of(calAnotherSideRequired(
497442
ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
498443
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
499444
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
500445
} else {
446+
shouldCheckLeftBucketDownGrade = true;
501447
updatedForRight = Optional.of(calAnotherSideRequired(
502448
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
503449
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
504450
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
505451
}
506452
} else if ((leftHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED
507453
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
508-
// TODO: we must do shuffle on right because coordinator could not do right be selection in this case,
509-
// since it always to check the left most node whether olap scan node.
510-
// after we fix coordinator problem, we could do right to left bucket shuffle
454+
shouldCheckLeftBucketDownGrade = true;
511455
updatedForRight = Optional.of(calAnotherSideRequired(
512456
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
513457
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
514458
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
515459
} else if ((leftHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED
516460
&& rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED)) {
517461
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
462+
shouldCheckrightBucketDownGrade = true;
518463
updatedForLeft = Optional.of(calAnotherSideRequired(
519464
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
520465
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
521466
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
522467
} else {
468+
shouldCheckLeftBucketDownGrade = true;
523469
updatedForRight = Optional.of(calAnotherSideRequired(
524470
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
525471
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
@@ -528,24 +474,46 @@ public List<List<PhysicalProperties>> visitPhysicalHashJoin(
528474

529475
} else if ((leftHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED
530476
&& rightHashSpec.getShuffleType() == ShuffleType.STORAGE_BUCKETED)) {
531-
if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
477+
if (!bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
532478
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
533479
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())) {
534-
return ImmutableList.of(originChildrenProperties);
535-
}
536-
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
537-
updatedForLeft = Optional.of(calAnotherSideRequired(
538-
ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
539-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
540-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
541-
} else {
542-
updatedForRight = Optional.of(calAnotherSideRequired(
543-
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
544-
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
545-
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
480+
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
481+
shouldCheckrightBucketDownGrade = true;
482+
updatedForLeft = Optional.of(calAnotherSideRequired(
483+
ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
484+
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
485+
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
486+
} else {
487+
shouldCheckLeftBucketDownGrade = true;
488+
updatedForRight = Optional.of(calAnotherSideRequired(
489+
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
490+
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
491+
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
492+
}
546493
}
547494
}
548495

496+
if (shouldCheckLeftBucketDownGrade && isBucketShuffleDownGrade(leftChild, leftHashSpec)) {
497+
updatedForLeft = Optional.of(calAnotherSideRequired(
498+
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
499+
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
500+
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
501+
updatedForRight = Optional.of(calAnotherSideRequired(
502+
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
503+
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
504+
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
505+
}
506+
if (shouldCheckrightBucketDownGrade && isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
507+
updatedForLeft = Optional.of(calAnotherSideRequired(
508+
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
509+
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
510+
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
511+
updatedForRight = Optional.of(calAnotherSideRequired(
512+
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, rightHashSpec,
513+
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
514+
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
515+
}
516+
549517
updatedForLeft.ifPresent(physicalProperties -> updateChildEnforceAndCost(0, physicalProperties));
550518
updatedForRight.ifPresent(physicalProperties -> updateChildEnforceAndCost(1, physicalProperties));
551519

0 commit comments

Comments
 (0)