Skip to content

Commit d335b24

Browse files
authored
[FLINK-38086][table] Make ChangelogNormalize reusable if possible
1 parent b1fd0b8 commit d335b24

File tree

7 files changed

+848
-114
lines changed

7 files changed

+848
-114
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.rules.physical.stream;
20+
21+
import org.apache.flink.table.planner.calcite.FlinkRexBuilder;
22+
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
23+
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
24+
import org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeProgram;
25+
import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
26+
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
27+
28+
import org.apache.calcite.rel.RelNode;
29+
import org.apache.calcite.rel.core.TableScan;
30+
import org.apache.calcite.rex.RexNode;
31+
32+
import java.util.ArrayList;
33+
import java.util.Comparator;
34+
import java.util.HashMap;
35+
import java.util.HashSet;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Set;
39+
40+
/**
41+
* A {@link FlinkOptimizeProgram} that marks ChangelogNormalize nodes using the same source and
42+
* determines common filters if any. This program is a preparation step for {@link
43+
* PushCalcPastChangelogNormalizeRule}.
44+
*
45+
* <p>There might be several scenarios:
46+
*
47+
* <p>1. Same conditions for ChangelogNormalize nodes example of the query
48+
*
49+
* <pre>
50+
* {@code SELECT * FROM T WHERE f1 < 0
51+
* UNION ALL
52+
* SELECT * FROM T WHERE f1 < 0}
53+
* </pre>
54+
*
55+
* <p>The plan before {@link PushCalcPastChangelogNormalizeRule}
56+
*
57+
* <pre>
58+
* {@code Union(all=[true], union=[f0, f1])
59+
* :- Calc(select=[f0, f1], where=[(f1 < 0)])(reuse_id=[1])
60+
* : +- ChangelogNormalize(key=[f1])
61+
* : +- Exchange(distribution=[hash[f1]])
62+
* : +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1])
63+
* +- Reused(reference_id=[1])}
64+
* </pre>
65+
*
66+
* <p>Since the filter condition is same for both, it will be pushed down. ChangelogNormalize node
67+
* will be reused.
68+
*
69+
* <pre>
70+
* {@code Union(all=[true], union=[f0, f1])
71+
* :- ChangelogNormalize(key=[f1])(reuse_id=[1])
72+
* : +- Exchange(distribution=[hash[f1]])
73+
* : +- Calc(select=[f0, f1], where=[(f1 < 0)])
74+
* : +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1])
75+
* +- Reused(reference_id=[1])}
76+
* </pre>
77+
*
78+
* <p>2. Conditions are different
79+
*
80+
* <pre>
81+
* {@code SELECT * FROM T WHERE f1 < 0
82+
* UNION ALL
83+
* SELECT * FROM T WHERE f1 < 10}
84+
* </pre>
85+
*
86+
* <p>The plans before and after are the same {@link PushCalcPastChangelogNormalizeRule}. Conditions
87+
* are different, thus, to keep reusing ChangelogNormalize they will not be pushed down.
88+
*
89+
* <pre>
90+
* {@code Union(all=[true], union=[f0, f1])
91+
* :- Calc(select=[f0, f1], where=[(f1 < 0)])
92+
* : +- ChangelogNormalize(key=[f1])(reuse_id=[1])
93+
* : +- Exchange(distribution=[hash[f1]])
94+
* : +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1])
95+
* +- Calc(select=[f0, f1], where=[(f1 < 10)])
96+
* +- Reused(reference_id=[1])}
97+
* </pre>
98+
*
99+
* <p>3. Conditions are partially overlapping
100+
*
101+
* <pre>{@code SELECT * FROM T WHERE f1 < 10 AND f1 > 0
102+
* UNION ALL
103+
* SELECT * FROM T WHERE f1 > 0 AND f1 < 20}</pre>
104+
*
105+
* <p>In the plan before {@link PushCalcPastChangelogNormalizeRule} the conditions above
106+
* ChangelogNormalize.
107+
*
108+
* <pre>
109+
* {@code Union(all=[true], union=[f0, f1])
110+
* :- Calc(select=[f0, f1], where=[SEARCH(f1, Sarg[(0..10)])])
111+
* : +- ChangelogNormalize(key=[f1])(reuse_id=[1])
112+
* : +- Exchange(distribution=[hash[f1]])
113+
* : +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1])
114+
* +- Calc(select=[f0, f1], where=[SEARCH(f1, Sarg[(0..20)])])
115+
* +- Reused(reference_id=[1])}
116+
* </pre>
117+
*
118+
* <p>After applying {@link PushCalcPastChangelogNormalizeRule} the condition should be splitted
119+
* into common and not common parts. Common part should be pushed down as below.
120+
*
121+
* <pre>{@code Union(all=[true], union=[f0, f1])
122+
* :- Calc(select=[f0, f1], where=[(f1 < 10)])
123+
* : +- ChangelogNormalize(key=[f1])(reuse_id=[1])
124+
* : +- Exchange(distribution=[hash[f1]])
125+
* : +- Calc(select=[f0, f1], where=[(f1 > 0)])
126+
* : +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1])
127+
* +- Calc(select=[f0, f1], where=[(f1 < 20)])
128+
* +- Reused(reference_id=[1])}</pre>
129+
*/
130+
public class FlinkMarkChangelogNormalizeProgram
131+
implements FlinkOptimizeProgram<StreamOptimizeContext> {
132+
@Override
133+
public RelNode optimize(RelNode root, StreamOptimizeContext context) {
134+
final Map<TableScan, List<ChangelogNormalizeContext>> tableScansToChangelogNormalize =
135+
new HashMap<>();
136+
final FlinkRexBuilder rexBuilder =
137+
new FlinkRexBuilder(context.getFlinkRelBuilder().getTypeFactory());
138+
for (RelNode relNode : root.getInputs()) {
139+
gatherTableScanToChangelogNormalizeMap(
140+
relNode, tableScansToChangelogNormalize, rexBuilder);
141+
}
142+
143+
for (Map.Entry<TableScan, List<ChangelogNormalizeContext>> entry :
144+
tableScansToChangelogNormalize.entrySet()) {
145+
final List<ChangelogNormalizeContext> changelogNormalizeContexts = entry.getValue();
146+
if (changelogNormalizeContexts.size() <= 1) {
147+
// we are interested only in cases with at least 2 changelog normalize nodes having
148+
// the same source
149+
continue;
150+
}
151+
152+
final Set<RexNode> common = calculateCommonCondition(changelogNormalizeContexts);
153+
for (ChangelogNormalizeContext ctx : changelogNormalizeContexts) {
154+
ctx.getChangelogNormalize().markSourceReuse();
155+
if (!common.isEmpty()) {
156+
ctx.getChangelogNormalize().setCommonFilter(common.toArray(new RexNode[0]));
157+
}
158+
}
159+
}
160+
return root;
161+
}
162+
163+
private Set<RexNode> calculateCommonCondition(
164+
List<ChangelogNormalizeContext> changelogNormalizeContexts) {
165+
changelogNormalizeContexts.sort(Comparator.comparingInt(o -> o.getConditions().size()));
166+
final Set<RexNode> common =
167+
new HashSet<>(changelogNormalizeContexts.get(0).getConditions());
168+
169+
for (int i = 1; i < changelogNormalizeContexts.size() && !common.isEmpty(); i++) {
170+
common.retainAll(changelogNormalizeContexts.get(i).getConditions());
171+
}
172+
return common;
173+
}
174+
175+
private void gatherTableScanToChangelogNormalizeMap(
176+
RelNode curRelNode,
177+
Map<TableScan, List<ChangelogNormalizeContext>> map,
178+
FlinkRexBuilder rexBuilder) {
179+
for (RelNode input : curRelNode.getInputs()) {
180+
if (input instanceof StreamPhysicalChangelogNormalize) {
181+
StreamPhysicalChangelogNormalize changelogNormalize =
182+
(StreamPhysicalChangelogNormalize) input;
183+
if (curRelNode instanceof StreamPhysicalCalc) {
184+
StreamPhysicalCalc calc = (StreamPhysicalCalc) curRelNode;
185+
final List<RexNode> conditions =
186+
FlinkRexUtil.extractConjunctiveConditions(
187+
rexBuilder, calc.getProgram());
188+
gatherTableScanToChangelogNormalizeMap(
189+
input,
190+
ChangelogNormalizeContext.of(changelogNormalize, conditions),
191+
map);
192+
}
193+
} else {
194+
gatherTableScanToChangelogNormalizeMap(input, map, rexBuilder);
195+
}
196+
}
197+
}
198+
199+
private void gatherTableScanToChangelogNormalizeMap(
200+
RelNode cur,
201+
ChangelogNormalizeContext context,
202+
Map<TableScan, List<ChangelogNormalizeContext>> currentMap) {
203+
if (cur instanceof TableScan) {
204+
currentMap.computeIfAbsent((TableScan) cur, (k) -> new ArrayList<>()).add(context);
205+
} else {
206+
for (RelNode relNode : cur.getInputs()) {
207+
gatherTableScanToChangelogNormalizeMap(relNode, context, currentMap);
208+
}
209+
}
210+
}
211+
212+
private static class ChangelogNormalizeContext {
213+
private final StreamPhysicalChangelogNormalize changelogNormalize;
214+
private final List<RexNode> conditions;
215+
216+
public ChangelogNormalizeContext(
217+
StreamPhysicalChangelogNormalize changelogNormalize, List<RexNode> conditions) {
218+
this.changelogNormalize = changelogNormalize;
219+
this.conditions = conditions;
220+
}
221+
222+
public static ChangelogNormalizeContext of(
223+
StreamPhysicalChangelogNormalize changelogNormalize, List<RexNode> conditions) {
224+
return new ChangelogNormalizeContext(changelogNormalize, conditions);
225+
}
226+
227+
public StreamPhysicalChangelogNormalize getChangelogNormalize() {
228+
return changelogNormalize;
229+
}
230+
231+
public List<RexNode> getConditions() {
232+
return conditions;
233+
}
234+
}
235+
}

0 commit comments

Comments
 (0)