Skip to content

Commit 2a3d701

Browse files
liuyongvsyongliu
andauthored
[FLINK-36986][table] Migrate SplitRemoteConditionFromJoinRule to java
Co-authored-by: yongliu <yongen.ly@antgroup.com>
1 parent 73993fc commit 2a3d701

File tree

5 files changed

+204
-131
lines changed

5 files changed

+204
-131
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,12 @@ public class AsyncCalcSplitRule {
6262
public static final RelOptRule ONE_PER_CALC_SPLIT =
6363
new AsyncCalcSplitOnePerCalcRule(ASYNC_CALL_FINDER);
6464
public static final RelOptRule NO_ASYNC_JOIN_CONDITIONS =
65-
new SplitRemoteConditionFromJoinRule(
66-
ASYNC_CALL_FINDER,
67-
JavaScalaConversionUtil.toScala(
65+
SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig.DEFAULT
66+
.withRemoteCallFinder(ASYNC_CALL_FINDER)
67+
.withErrorOnUnsplittableRemoteCall(
6868
Optional.of(
69-
"AsyncScalarFunction not supported for non inner join condition")));
69+
"AsyncScalarFunction not supported for non inner join condition"))
70+
.toRule();
7071

7172
private static boolean hasNestedCalls(List<RexNode> projects) {
7273
return projects.stream()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.table.planner.plan.rules.logical;
19+
20+
import org.apache.calcite.plan.RelOptRule;
21+
22+
public class SplitPythonConditionFromJoinRule {
23+
24+
private static final RemoteCallFinder callFinder = new PythonRemoteCallFinder();
25+
26+
public static final RelOptRule INSTANCE =
27+
SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig.DEFAULT
28+
.withRemoteCallFinder(callFinder)
29+
.toRule();
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.table.planner.plan.rules.logical;
19+
20+
import org.apache.flink.table.api.TableException;
21+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
22+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
23+
24+
import org.apache.calcite.plan.RelOptRuleCall;
25+
import org.apache.calcite.plan.RelOptUtil;
26+
import org.apache.calcite.plan.RelRule;
27+
import org.apache.calcite.rel.core.JoinRelType;
28+
import org.apache.calcite.rex.RexBuilder;
29+
import org.apache.calcite.rex.RexNode;
30+
import org.apache.calcite.rex.RexProgram;
31+
import org.apache.calcite.rex.RexProgramBuilder;
32+
import org.apache.calcite.rex.RexUtil;
33+
import org.immutables.value.Value;
34+
35+
import java.util.List;
36+
import java.util.Optional;
37+
import java.util.stream.Collectors;
38+
39+
/**
40+
* Rule will split the {@link FlinkLogicalJoin} which contains Remote Functions in join condition
41+
* into a {@link FlinkLogicalJoin} and a {@link FlinkLogicalCalc} with Remote Functions. Currently,
42+
* only inner join is supported.
43+
*
44+
* <p>After this rule is applied, there will be no Remote Functions in the condition of the {@link
45+
* FlinkLogicalJoin}.
46+
*/
47+
@Value.Enclosing
48+
public class SplitRemoteConditionFromJoinRule
49+
extends RelRule<SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig> {
50+
51+
protected SplitRemoteConditionFromJoinRule(
52+
SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig config) {
53+
super(config);
54+
}
55+
56+
@Override
57+
public boolean matches(RelOptRuleCall call) {
58+
FlinkLogicalJoin join = call.rel(0);
59+
JoinRelType joinType = join.getJoinType();
60+
// matches if it is inner join and it contains Remote functions in condition
61+
if (join.getCondition() != null
62+
&& config.remoteCallFinder().containsRemoteCall(join.getCondition())) {
63+
if (joinType == JoinRelType.INNER) {
64+
return true;
65+
} else if (config.errorOnUnsplittableRemoteCall().isPresent()) {
66+
throw new TableException(config.errorOnUnsplittableRemoteCall().get());
67+
}
68+
}
69+
return false;
70+
}
71+
72+
public void onMatch(RelOptRuleCall call) {
73+
FlinkLogicalJoin join = call.rel(0);
74+
RexBuilder rexBuilder = join.getCluster().getRexBuilder();
75+
76+
List<RexNode> joinFilters = RelOptUtil.conjunctions(join.getCondition());
77+
List<RexNode> remoteFilters =
78+
joinFilters.stream()
79+
.filter(config.remoteCallFinder()::containsRemoteCall)
80+
.collect(Collectors.toList());
81+
List<RexNode> remainingFilters =
82+
joinFilters.stream()
83+
.filter(f -> !config.remoteCallFinder().containsRemoteCall(f))
84+
.collect(Collectors.toList());
85+
86+
RexNode newJoinCondition = RexUtil.composeConjunction(rexBuilder, remainingFilters);
87+
FlinkLogicalJoin bottomJoin =
88+
new FlinkLogicalJoin(
89+
join.getCluster(),
90+
join.getTraitSet(),
91+
join.getLeft(),
92+
join.getRight(),
93+
newJoinCondition,
94+
join.getHints(),
95+
join.getJoinType());
96+
97+
RexProgramBuilder rexProgramBuilder =
98+
new RexProgramBuilder(bottomJoin.getRowType(), rexBuilder);
99+
RexProgram rexProgram = rexProgramBuilder.getProgram();
100+
RexNode topCalcCondition = RexUtil.composeConjunction(rexBuilder, remoteFilters);
101+
102+
FlinkLogicalCalc topCalc =
103+
new FlinkLogicalCalc(
104+
join.getCluster(),
105+
join.getTraitSet(),
106+
bottomJoin,
107+
RexProgram.create(
108+
bottomJoin.getRowType(),
109+
rexProgram.getExprList(),
110+
topCalcCondition,
111+
bottomJoin.getRowType(),
112+
rexBuilder));
113+
114+
call.transformTo(topCalc);
115+
}
116+
117+
// Consider the rules to be equal if they are the same class and their call finders are the same
118+
// class.
119+
@Override
120+
public boolean equals(Object obj) {
121+
if (obj == this) {
122+
return true;
123+
}
124+
if (!(obj instanceof SplitRemoteConditionFromJoinRule)) {
125+
return false;
126+
}
127+
128+
SplitRemoteConditionFromJoinRule rule = (SplitRemoteConditionFromJoinRule) obj;
129+
return super.equals(rule)
130+
&& config.remoteCallFinder()
131+
.getClass()
132+
.equals(rule.config.remoteCallFinder().getClass())
133+
&& config.errorOnUnsplittableRemoteCall()
134+
.equals(rule.config.errorOnUnsplittableRemoteCall());
135+
}
136+
137+
/** Rule configuration. */
138+
@Value.Immutable(singleton = false)
139+
public interface SplitRemoteConditionFromJoinRuleConfig extends RelRule.Config {
140+
SplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig DEFAULT =
141+
ImmutableSplitRemoteConditionFromJoinRule.SplitRemoteConditionFromJoinRuleConfig
142+
.builder()
143+
.operandSupplier(b0 -> b0.operand(FlinkLogicalJoin.class).anyInputs())
144+
.description("SplitRemoteConditionFromJoinRule")
145+
.build();
146+
147+
@Value.Default
148+
default RemoteCallFinder remoteCallFinder() {
149+
return new PythonRemoteCallFinder();
150+
}
151+
152+
/** Sets {@link #remoteCallFinder()}. */
153+
SplitRemoteConditionFromJoinRuleConfig withRemoteCallFinder(RemoteCallFinder callFinder);
154+
155+
@Value.Default
156+
default Optional<String> errorOnUnsplittableRemoteCall() {
157+
return Optional.empty();
158+
}
159+
160+
/** Sets {@link #errorOnUnsplittableRemoteCall()}. */
161+
SplitRemoteConditionFromJoinRuleConfig withErrorOnUnsplittableRemoteCall(
162+
Optional<String> errorOnUnsplittableRemoteCall);
163+
164+
@Override
165+
default SplitRemoteConditionFromJoinRule toRule() {
166+
return new SplitRemoteConditionFromJoinRule(this);
167+
}
168+
}
169+
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromJoinRule.scala

Lines changed: 0 additions & 25 deletions
This file was deleted.

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitRemoteConditionFromJoinRule.scala

Lines changed: 0 additions & 102 deletions
This file was deleted.

0 commit comments

Comments
 (0)