Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ APPEND: 'APPEND';
ARRAY: 'ARRAY';
AS: 'AS';
ASC: 'ASC';
ASOF: 'ASOF';
AT: 'AT';
AUTHORS: 'AUTHORS';
AUTO: 'AUTO';
Expand Down Expand Up @@ -342,6 +343,7 @@ MATCH: 'MATCH';
MATCHED: 'MATCHED';
MATCH_ALL: 'MATCH_ALL';
MATCH_ANY: 'MATCH_ANY';
MATCH_CONDITION: 'MATCH_CONDITION';
MATCH_NAME: 'MATCH_NAME';
MATCH_NAME_GLOB: 'MATCH_NAME_GLOB';
MATCH_PHRASE: 'MATCH_PHRASE';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,11 @@ relation
;

joinRelation
: (joinType) JOIN distributeType? right=relationPrimary joinCriteria?
: (joinType) JOIN distributeType? right=relationPrimary matchCondition? joinCriteria?
;

matchCondition
: MATCH_CONDITION LEFT_PAREN valueExpression RIGHT_PAREN
;

// Just like `opt_plan_hints` in legacy CUP parser.
Expand Down Expand Up @@ -1388,6 +1392,8 @@ joinType
| RIGHT SEMI
| LEFT ANTI
| RIGHT ANTI
| ASOF LEFT?
| ASOF INNER
;

joinCriteria
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ public enum JoinOperator {
NULL_AWARE_LEFT_ANTI_JOIN("NULL AWARE LEFT ANTI JOIN",
TJoinOp.NULL_AWARE_LEFT_ANTI_JOIN),
NULL_AWARE_LEFT_SEMI_JOIN("NULL AWARE LEFT SEMI JOIN",
TJoinOp.NULL_AWARE_LEFT_SEMI_JOIN);
TJoinOp.NULL_AWARE_LEFT_SEMI_JOIN),
ASOF_LEFT_INNER_JOIN("ASOF_LEFT_INNER_JOIN", TJoinOp.ASOF_LEFT_INNER_JOIN),
ASOF_RIGHT_INNER_JOIN("ASOF_RIGHT_INNER_JOIN", TJoinOp.ASOF_RIGHT_INNER_JOIN),
ASOF_LEFT_OUTER_JOIN("ASOF_LEFT_OUTER_JOIN", TJoinOp.ASOF_LEFT_OUTER_JOIN),
ASOF_RIGHT_OUTER_JOIN("ASOF_LEFT_INNER_JOIN", TJoinOp.ASOF_RIGHT_OUTER_JOIN);

private final String description;
private final TJoinOp thriftJoinOp;
Expand All @@ -57,66 +61,4 @@ public String toString() {
public TJoinOp toThrift() {
return thriftJoinOp;
}

public boolean isOuterJoin() {
return this == LEFT_OUTER_JOIN || this == RIGHT_OUTER_JOIN || this == FULL_OUTER_JOIN;
}

public boolean isSemiAntiJoin() {
return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN
|| this == NULL_AWARE_LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
}

public boolean isSemiJoin() {
return this == JoinOperator.LEFT_SEMI_JOIN || this == JoinOperator.LEFT_ANTI_JOIN
|| this == JoinOperator.RIGHT_SEMI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN
|| this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
}

public boolean isSemiOrAntiJoinNoNullAware() {
return this == JoinOperator.LEFT_SEMI_JOIN || this == JoinOperator.LEFT_ANTI_JOIN
|| this == JoinOperator.RIGHT_SEMI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN;
}

public boolean isAntiJoinNullAware() {
return this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
}

public boolean isAntiJoinNoNullAware() {
return this == JoinOperator.LEFT_ANTI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN;
}

public boolean isLeftSemiJoin() {
return this.thriftJoinOp == TJoinOp.LEFT_SEMI_JOIN;
}

public boolean isInnerJoin() {
return this.thriftJoinOp == TJoinOp.INNER_JOIN;
}

public boolean isAntiJoin() {
return this == JoinOperator.LEFT_ANTI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN
|| this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
}

public boolean supportMarkJoin() {
return this == JoinOperator.LEFT_ANTI_JOIN || this == JoinOperator.LEFT_SEMI_JOIN
|| this == JoinOperator.CROSS_JOIN || this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
}

public boolean isCrossJoin() {
return this == CROSS_JOIN;
}

public boolean isFullOuterJoin() {
return this == FULL_OUTER_JOIN;
}

public boolean isLeftOuterJoin() {
return this == LEFT_OUTER_JOIN;
}

public boolean isRightOuterJoin() {
return this == RIGHT_OUTER_JOIN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1705,15 +1705,17 @@ public PlanFragment visitPhysicalHashJoin(
}

// set slots as nullable for outer join
if (joinType == JoinType.LEFT_OUTER_JOIN || joinType == JoinType.FULL_OUTER_JOIN) {
if (joinType == JoinType.LEFT_OUTER_JOIN || joinType == JoinType.ASOF_LEFT_OUTER_JOIN
|| joinType == JoinType.FULL_OUTER_JOIN) {
for (SlotDescriptor sd : rightIntermediateSlotDescriptor) {
sd.setIsNullable(true);
SlotRef slotRef = new SlotRef(sd);
ExprId exprId = context.findExprId(sd.getId());
context.addExprIdSlotRefPair(exprId, slotRef);
}
}
if (joinType == JoinType.RIGHT_OUTER_JOIN || joinType == JoinType.FULL_OUTER_JOIN) {
if (joinType == JoinType.RIGHT_OUTER_JOIN || joinType == JoinType.ASOF_RIGHT_OUTER_JOIN
|| joinType == JoinType.FULL_OUTER_JOIN) {
for (SlotDescriptor sd : leftIntermediateSlotDescriptor) {
sd.setIsNullable(true);
SlotRef slotRef = new SlotRef(sd);
Expand Down Expand Up @@ -1878,15 +1880,17 @@ public PlanFragment visitPhysicalNestedLoopJoin(
}

// set slots as nullable for outer join
if (joinType == JoinType.LEFT_OUTER_JOIN || joinType == JoinType.FULL_OUTER_JOIN) {
if (joinType == JoinType.LEFT_OUTER_JOIN || joinType == JoinType.ASOF_LEFT_OUTER_JOIN
|| joinType == JoinType.FULL_OUTER_JOIN) {
for (SlotDescriptor sd : rightIntermediateSlotDescriptor) {
sd.setIsNullable(true);
SlotRef slotRef = new SlotRef(sd);
ExprId exprId = context.findExprId(sd.getId());
context.addExprIdSlotRefPair(exprId, slotRef);
}
}
if (joinType == JoinType.RIGHT_OUTER_JOIN || joinType == JoinType.FULL_OUTER_JOIN) {
if (joinType == JoinType.RIGHT_OUTER_JOIN || joinType == JoinType.ASOF_RIGHT_OUTER_JOIN
|| joinType == JoinType.FULL_OUTER_JOIN) {
for (SlotDescriptor sd : leftIntermediateSlotDescriptor) {
sd.setIsNullable(true);
SlotRef slotRef = new SlotRef(sd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4304,6 +4304,7 @@ protected LogicalPlan withSelectQuerySpecification(
private LogicalPlan withJoinRelations(LogicalPlan input, RelationContext ctx) {
LogicalPlan last = input;
for (JoinRelationContext join : ctx.joinRelation()) {
boolean isAsofJoin = false;
JoinType joinType;
if (join.joinType().CROSS() != null) {
joinType = JoinType.CROSS_JOIN;
Expand All @@ -4321,6 +4322,10 @@ private LogicalPlan withJoinRelations(LogicalPlan input, RelationContext ctx) {
} else {
joinType = JoinType.RIGHT_ANTI_JOIN;
}
} else if (join.joinType().ASOF() != null) {
joinType = join.joinType().INNER() != null
? JoinType.ASOF_LEFT_INNER_JOIN : JoinType.ASOF_LEFT_OUTER_JOIN;
isAsofJoin = true;
} else if (join.joinType().LEFT() != null) {
joinType = JoinType.LEFT_OUTER_JOIN;
} else if (join.joinType().RIGHT() != null) {
Expand All @@ -4332,6 +4337,23 @@ private LogicalPlan withJoinRelations(LogicalPlan input, RelationContext ctx) {
} else {
joinType = JoinType.CROSS_JOIN;
}
Expression matchCondition = null;
if (join.matchCondition() != null) {
if (!isAsofJoin) {
throw new ParseException("only ASOF JOIN support MATCH_CONDITION", join);
}
matchCondition = typedVisit(join.matchCondition().valueExpression());
if (!(matchCondition instanceof LessThan
|| matchCondition instanceof LessThanEqual
|| matchCondition instanceof GreaterThan
|| matchCondition instanceof GreaterThanEqual)) {
throw new ParseException("ASOF JOIN's MATCH_CONDITION must be <, <=, >, >=", join);
}
} else {
if (isAsofJoin) {
throw new ParseException("ASOF JOIN must specify MATCH_CONDITION", join);
}
}
DistributeHint distributeHint = new DistributeHint(DistributeType.NONE);
if (join.distributeType() != null) {
distributeHint = visitDistributeType(join.distributeType());
Expand All @@ -4352,21 +4374,48 @@ private LogicalPlan withJoinRelations(LogicalPlan input, RelationContext ctx) {
.collect(ImmutableList.toImmutableList());
}
} else {
if (isAsofJoin) {
throw new ParseException("ASOF JOIN must have on or using clause", join);
}
// keep same with original planner, allow cross/inner join
if (!joinType.isInnerOrCrossJoin()) {
throw new ParseException("on mustn't be empty except for cross/inner join", join);
}
}

if (ids == null) {
last = new LogicalJoin<>(joinType, ExpressionUtils.EMPTY_CONDITION,
condition.map(ExpressionUtils::extractConjunction)
.orElse(ExpressionUtils.EMPTY_CONDITION),
distributeHint,
Optional.empty(),
last,
plan(join.relationPrimary()), null);
if (isAsofJoin) {
if (!condition.isPresent()) {
throw new ParseException("ASOF JOIN can't be used without ON clause", join);
}
List<Expression> conjuncts = ExpressionUtils.extractConjunction(condition.get());
for (Expression expression : conjuncts) {
if (!(expression instanceof EqualTo)) {
throw new ParseException("ASOF JOIN's ON clause must be one or more EQUAL(=) conjuncts",
join);
}
if (expression.child(0).isConstant() || expression.child(1).isConstant()) {
throw new ParseException("ASOF JOIN's EQUAL conjunct's children must not be constant");
}
}
last = new LogicalJoin<>(joinType, conjuncts,
ImmutableList.of(matchCondition),
distributeHint,
Optional.empty(),
last,
plan(join.relationPrimary()), null);
} else {
last = new LogicalJoin<>(joinType, ExpressionUtils.EMPTY_CONDITION,
condition.map(ExpressionUtils::extractConjunction)
.orElse(ExpressionUtils.EMPTY_CONDITION),
distributeHint,
Optional.empty(),
last,
plan(join.relationPrimary()), null);
}
} else {
last = new LogicalUsingJoin<>(joinType, last, plan(join.relationPrimary()), ids, distributeHint);
last = new LogicalUsingJoin<>(joinType, last, plan(join.relationPrimary()), ids,
matchCondition != null ? Optional.of(matchCondition) : Optional.empty(), distributeHint);

}
if (distributeHint.distributeType != DistributeType.NONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
JoinType.LEFT_ANTI_JOIN,
JoinType.FULL_OUTER_JOIN,
JoinType.LEFT_OUTER_JOIN,
JoinType.ASOF_LEFT_OUTER_JOIN,
JoinType.NULL_AWARE_LEFT_ANTI_JOIN
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitors;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan;
Expand Down Expand Up @@ -280,7 +279,7 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
PhysicalRelation target2 = null;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin) {
if (!join.equals(ctx.builderNode)
&& (join.getJoinType() == JoinType.INNER_JOIN || join.getJoinType().isSemiJoin())) {
&& (join.getJoinType().isInnerJoin() || join.getJoinType().isSemiJoin())) {
for (Expression expr : join.getHashJoinConjuncts()) {
EqualPredicate equalTo = (EqualPredicate) expr;
if (ctx.probeExpr.equals(equalTo.left())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ private PhysicalProperties computeShuffleJoinOutputProperties(
switch (hashJoin.getJoinType()) {
case INNER_JOIN:
case CROSS_JOIN:
case ASOF_LEFT_INNER_JOIN:
case ASOF_RIGHT_INNER_JOIN:
if (shuffleSide == ShuffleSide.LEFT) {
return new PhysicalProperties(
DistributionSpecHash.merge(rightHashSpec, leftHashSpec, outputShuffleType)
Expand All @@ -597,6 +599,7 @@ private PhysicalProperties computeShuffleJoinOutputProperties(
case LEFT_ANTI_JOIN:
case NULL_AWARE_LEFT_ANTI_JOIN:
case LEFT_OUTER_JOIN:
case ASOF_LEFT_OUTER_JOIN:
if (shuffleSide == ShuffleSide.LEFT || shuffleSide == ShuffleSide.BOTH) {
return new PhysicalProperties(
leftHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType)
Expand All @@ -609,6 +612,7 @@ private PhysicalProperties computeShuffleJoinOutputProperties(
case RIGHT_SEMI_JOIN:
case RIGHT_ANTI_JOIN:
case RIGHT_OUTER_JOIN:
case ASOF_RIGHT_OUTER_JOIN:
if (shuffleSide == ShuffleSide.RIGHT || shuffleSide == ShuffleSide.BOTH) {
return new PhysicalProperties(
rightHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType)
Expand Down Expand Up @@ -645,17 +649,21 @@ private PhysicalProperties legacyComputeShuffleJoinOutputProperties(
DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec) {
switch (hashJoin.getJoinType()) {
case INNER_JOIN:
case ASOF_LEFT_INNER_JOIN:
case ASOF_RIGHT_INNER_JOIN:
case CROSS_JOIN:
return new PhysicalProperties(DistributionSpecHash.merge(
leftHashSpec, rightHashSpec, leftHashSpec.getShuffleType()));
case LEFT_SEMI_JOIN:
case LEFT_ANTI_JOIN:
case NULL_AWARE_LEFT_ANTI_JOIN:
case LEFT_OUTER_JOIN:
case ASOF_LEFT_OUTER_JOIN:
return new PhysicalProperties(leftHashSpec);
case RIGHT_SEMI_JOIN:
case RIGHT_ANTI_JOIN:
case RIGHT_OUTER_JOIN:
case ASOF_RIGHT_OUTER_JOIN:
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
return new PhysicalProperties(rightHashSpec);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp
DistributionSpecHash rightHashSpec) {
boolean isJoinTypeInScope = (joinType == JoinType.RIGHT_ANTI_JOIN
|| joinType == JoinType.RIGHT_OUTER_JOIN
|| joinType == JoinType.ASOF_RIGHT_OUTER_JOIN
|| joinType == JoinType.FULL_OUTER_JOIN);
boolean isSpecInScope = (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
|| rightHashSpec.getShuffleType() == ShuffleType.NATURAL);
Expand Down
Loading
Loading