Skip to content

Commit 73993fc

Browse files
authored
[FLINK-38885][table] Migrate StreamPhysicalJoinRuleBase and its children
1 parent 08648c5 commit 73993fc

File tree

9 files changed

+537
-395
lines changed

9 files changed

+537
-395
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.rules.physical.stream;
20+
21+
import org.apache.flink.api.java.tuple.Tuple2;
22+
import org.apache.flink.table.api.TableException;
23+
import org.apache.flink.table.api.ValidationException;
24+
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
25+
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
26+
import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec;
27+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
28+
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntervalJoin;
29+
import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
30+
31+
import org.apache.calcite.plan.RelOptRule;
32+
import org.apache.calcite.plan.RelOptRuleCall;
33+
import org.apache.calcite.plan.RelTraitSet;
34+
import org.apache.calcite.rel.RelNode;
35+
import org.apache.calcite.rel.type.RelDataType;
36+
import org.apache.calcite.rex.RexNode;
37+
import org.immutables.value.Value;
38+
39+
import java.util.Collection;
40+
import java.util.function.Function;
41+
import java.util.stream.Collectors;
42+
43+
import scala.Option;
44+
45+
/**
46+
* Rule that converts non-SEMI/ANTI {@link FlinkLogicalJoin} with window bounds in join condition to
47+
* {@link StreamPhysicalIntervalJoin}.
48+
*/
49+
@Value.Enclosing
50+
public class StreamPhysicalIntervalJoinRule
51+
extends StreamPhysicalJoinRuleBase<
52+
StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig> {
53+
public static final RelOptRule INSTANCE = StreamPhysicalIntervalJoinRuleConfig.DEFAULT.toRule();
54+
55+
public StreamPhysicalIntervalJoinRule(StreamPhysicalIntervalJoinRuleConfig config) {
56+
super(config);
57+
}
58+
59+
@Override
60+
public boolean matches(RelOptRuleCall call) {
61+
FlinkLogicalJoin join = call.rel(0);
62+
63+
if (!IntervalJoinUtil.satisfyIntervalJoin(join)) {
64+
return false;
65+
}
66+
67+
// validate the join
68+
IntervalJoinSpec.WindowBounds windowBounds = extractWindowBounds(join).f0.get();
69+
70+
if (windowBounds.isEventTime()) {
71+
RelDataType leftTimeAttributeType =
72+
join.getLeft()
73+
.getRowType()
74+
.getFieldList()
75+
.get(windowBounds.getLeftTimeIdx())
76+
.getType();
77+
RelDataType rightTimeAttributeType =
78+
join.getRight()
79+
.getRowType()
80+
.getFieldList()
81+
.get(windowBounds.getRightTimeIdx())
82+
.getType();
83+
if (leftTimeAttributeType.getSqlTypeName() != rightTimeAttributeType.getSqlTypeName()) {
84+
throw new ValidationException(
85+
String.format(
86+
"Interval join with rowtime attribute requires same rowtime types,"
87+
+ " but the types are %s and %s.",
88+
leftTimeAttributeType, rightTimeAttributeType));
89+
}
90+
} else {
91+
// Check that no event-time attributes are in the input because the processing time
92+
// window
93+
// join does not correctly hold back watermarks.
94+
// We rely on projection pushdown to remove unused attributes before the join.
95+
RelDataType joinRowType = join.getRowType();
96+
boolean containsRowTime =
97+
joinRowType.getFieldList().stream()
98+
.anyMatch(f -> FlinkTypeFactory.isRowtimeIndicatorType(f.getType()));
99+
if (containsRowTime) {
100+
throw new TableException(
101+
"Interval join with proctime attribute requires no event-time attributes are in the "
102+
+ "join inputs.");
103+
}
104+
}
105+
return true;
106+
}
107+
108+
@Override
109+
public Collection<Integer> computeJoinLeftKeys(FlinkLogicalJoin join) {
110+
Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> tuple2 =
111+
extractWindowBounds(join);
112+
return join.analyzeCondition().leftKeys.stream()
113+
.filter(k -> tuple2.f0.get().getLeftTimeIdx() != k)
114+
.collect(Collectors.toList());
115+
}
116+
117+
@Override
118+
public Collection<Integer> computeJoinRightKeys(FlinkLogicalJoin join) {
119+
Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> tuple2 =
120+
extractWindowBounds(join);
121+
return join.analyzeCondition().rightKeys.stream()
122+
.filter(k -> tuple2.f0.get().getRightTimeIdx() != k)
123+
.collect(Collectors.toList());
124+
}
125+
126+
@Override
127+
public FlinkRelNode transform(
128+
FlinkLogicalJoin join,
129+
FlinkRelNode leftInput,
130+
Function<RelNode, RelNode> leftConversion,
131+
FlinkRelNode rightInput,
132+
Function<RelNode, RelNode> rightConversion,
133+
RelTraitSet providedTraitSet) {
134+
Tuple2<Option<IntervalJoinSpec.WindowBounds>, Option<RexNode>> tuple2 =
135+
extractWindowBounds(join);
136+
return new StreamPhysicalIntervalJoin(
137+
join.getCluster(),
138+
providedTraitSet,
139+
leftConversion.apply(leftInput),
140+
rightConversion.apply(rightInput),
141+
join.getJoinType(),
142+
join.getCondition(),
143+
tuple2.f1.getOrElse(() -> join.getCluster().getRexBuilder().makeLiteral(true)),
144+
tuple2.f0.get());
145+
}
146+
147+
/** Configuration for {@link StreamPhysicalIntervalJoinRule}. */
148+
@Value.Immutable
149+
public interface StreamPhysicalIntervalJoinRuleConfig
150+
extends StreamPhysicalJoinRuleBaseRuleConfig {
151+
StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig DEFAULT =
152+
ImmutableStreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig
153+
.builder()
154+
.build()
155+
.withOperandSupplier(StreamPhysicalJoinRuleBaseRuleConfig.OPERAND_TRANSFORM)
156+
.withDescription("StreamPhysicalJoinRuleBase")
157+
.as(
158+
StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig
159+
.class);
160+
161+
@Override
162+
default StreamPhysicalIntervalJoinRule toRule() {
163+
return new StreamPhysicalIntervalJoinRule(this);
164+
}
165+
}
166+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.rules.physical.stream;
20+
21+
import org.apache.flink.table.api.TableException;
22+
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
23+
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode;
24+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
25+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
26+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
27+
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
28+
import org.apache.flink.table.planner.plan.utils.JoinUtil;
29+
import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
30+
import org.apache.flink.util.Preconditions;
31+
32+
import org.apache.calcite.plan.RelOptRuleCall;
33+
import org.apache.calcite.plan.RelTraitSet;
34+
import org.apache.calcite.rel.RelNode;
35+
import org.immutables.value.Value;
36+
37+
import java.util.function.Function;
38+
39+
/**
40+
* Rule that converts {@link FlinkLogicalJoin} without window bounds in join condition to {@link
41+
* StreamPhysicalJoin}.
42+
*/
43+
@Value.Enclosing
44+
public class StreamPhysicalJoinRule
45+
extends StreamPhysicalJoinRuleBase<StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig> {
46+
public static final StreamPhysicalJoinRule INSTANCE =
47+
StreamPhysicalJoinRuleConfig.DEFAULT.toRule();
48+
49+
public StreamPhysicalJoinRule(StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig config) {
50+
super(config);
51+
}
52+
53+
@Override
54+
public boolean matches(RelOptRuleCall call) {
55+
final FlinkLogicalJoin join = call.rel(0);
56+
final FlinkLogicalRel left = call.rel(1);
57+
final FlinkLogicalRel right = call.rel(2);
58+
59+
if (!JoinUtil.satisfyRegularJoin(join, left, right)) {
60+
return false;
61+
}
62+
63+
// validate the join
64+
if (left instanceof FlinkLogicalSnapshot) {
65+
throw new TableException(
66+
"Temporal table join only support apply FOR SYSTEM_TIME AS OF on the right table.");
67+
}
68+
69+
// INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase in case which
70+
// fallback
71+
// to regular join
72+
Preconditions.checkState(
73+
!TemporalJoinUtil.containsInitialTemporalJoinCondition(join.getCondition()));
74+
75+
// Time attributes must not be in the output type of a regular join
76+
boolean timeAttrInOutput =
77+
join.getRowType().getFieldList().stream()
78+
.anyMatch(f -> FlinkTypeFactory.isTimeIndicatorType(f.getType()));
79+
Preconditions.checkState(!timeAttrInOutput);
80+
81+
// Join condition must not access time attributes
82+
boolean remainingPredsAccessTime =
83+
JoinUtil.accessesTimeAttribute(
84+
join.getCondition(), JoinUtil.combineJoinInputsRowType(join));
85+
Preconditions.checkState(!remainingPredsAccessTime);
86+
return true;
87+
}
88+
89+
@Override
90+
public FlinkRelNode transform(
91+
FlinkLogicalJoin join,
92+
FlinkRelNode leftInput,
93+
Function<RelNode, RelNode> leftConversion,
94+
FlinkRelNode rightInput,
95+
Function<RelNode, RelNode> rightConversion,
96+
RelTraitSet providedTraitSet) {
97+
return new StreamPhysicalJoin(
98+
join.getCluster(),
99+
providedTraitSet,
100+
leftConversion.apply(leftInput),
101+
rightConversion.apply(rightInput),
102+
join.getCondition(),
103+
join.getJoinType(),
104+
join.getHints());
105+
}
106+
107+
/** Configuration for {@link StreamPhysicalIntervalJoinRule}. */
108+
@Value.Immutable
109+
public interface StreamPhysicalJoinRuleConfig
110+
extends StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig {
111+
StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig DEFAULT =
112+
ImmutableStreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig.builder()
113+
.build()
114+
.withOperandSupplier(OPERAND_TRANSFORM)
115+
.withDescription("StreamPhysicalJoinRule")
116+
.as(StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig.class);
117+
118+
@Override
119+
default StreamPhysicalJoinRule toRule() {
120+
return new StreamPhysicalJoinRule(this);
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)