Skip to content

Commit 35d1d8a

Browse files
committed
[FLINK-38624][table] Convert FlinkLogicalOverAggregate to java
1 parent 421ec89 commit 35d1d8a

File tree

2 files changed

+135
-121
lines changed

2 files changed

+135
-121
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.nodes.logical;
20+
21+
import org.apache.flink.table.api.ValidationException;
22+
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
23+
24+
import org.apache.calcite.plan.Convention;
25+
import org.apache.calcite.plan.RelOptCluster;
26+
import org.apache.calcite.plan.RelOptRule;
27+
import org.apache.calcite.plan.RelTraitSet;
28+
import org.apache.calcite.rel.RelCollationTraitDef;
29+
import org.apache.calcite.rel.RelNode;
30+
import org.apache.calcite.rel.convert.ConverterRule;
31+
import org.apache.calcite.rel.core.Window;
32+
import org.apache.calcite.rel.hint.RelHint;
33+
import org.apache.calcite.rel.logical.LogicalWindow;
34+
import org.apache.calcite.rel.metadata.RelMdCollation;
35+
import org.apache.calcite.rel.metadata.RelMetadataQuery;
36+
import org.apache.calcite.rel.type.RelDataType;
37+
import org.apache.calcite.rex.RexLiteral;
38+
import org.apache.calcite.sql.SqlRankFunction;
39+
import org.checkerframework.checker.nullness.qual.Nullable;
40+
41+
import java.util.List;
42+
43+
/**
44+
* Sub-class of {@link Window} that is a relational expression which represents a set of over window
45+
* aggregates in Flink.
46+
*/
47+
public class FlinkLogicalOverAggregate extends Window implements FlinkLogicalRel {
48+
public static final ConverterRule CONVERTER =
49+
new FlinkLogicalOverAggregateConverter(
50+
ConverterRule.Config.INSTANCE.withConversion(
51+
LogicalWindow.class,
52+
Convention.NONE,
53+
FlinkConventions.LOGICAL(),
54+
"FlinkLogicalOverAggregateConverter"));
55+
56+
protected FlinkLogicalOverAggregate(
57+
RelOptCluster cluster,
58+
RelTraitSet traitSet,
59+
List<RelHint> hints,
60+
RelNode input,
61+
List<RexLiteral> constants,
62+
RelDataType rowType,
63+
List<Group> groups) {
64+
super(cluster, traitSet, hints, input, constants, rowType, groups);
65+
}
66+
67+
public FlinkLogicalOverAggregate(
68+
RelOptCluster cluster,
69+
RelTraitSet traitSet,
70+
RelNode input,
71+
List<RexLiteral> constants,
72+
RelDataType rowType,
73+
List<Group> groups) {
74+
super(cluster, traitSet, input, constants, rowType, groups);
75+
}
76+
77+
@Override
78+
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
79+
return copy(traitSet, inputs, rowType, groups);
80+
}
81+
82+
public RelNode copy(
83+
RelTraitSet traitSet,
84+
List<RelNode> inputs,
85+
RelDataType rowType,
86+
List<Window.Group> groups) {
87+
return new FlinkLogicalOverAggregate(
88+
getCluster(), traitSet, inputs.get(0), constants, rowType, groups);
89+
}
90+
91+
private static class FlinkLogicalOverAggregateConverter extends ConverterRule {
92+
93+
protected FlinkLogicalOverAggregateConverter(Config config) {
94+
super(config);
95+
}
96+
97+
@Override
98+
public @Nullable RelNode convert(RelNode rel) {
99+
LogicalWindow window = (LogicalWindow) rel;
100+
RelMetadataQuery mq = rel.getCluster().getMetadataQuery();
101+
RelTraitSet traitSet =
102+
rel.getCluster()
103+
.traitSetOf(FlinkConventions.LOGICAL())
104+
.replaceIfs(
105+
RelCollationTraitDef.INSTANCE,
106+
() ->
107+
RelMdCollation.window(
108+
mq, window.getInput(), window.groups))
109+
.simplify();
110+
RelNode newInput = RelOptRule.convert(window.getInput(), FlinkConventions.LOGICAL());
111+
112+
window.groups.forEach(
113+
group -> {
114+
final int orderKeySize = group.orderKeys.getFieldCollations().size();
115+
group.aggCalls.forEach(
116+
winAggCall -> {
117+
if (orderKeySize == 0
118+
&& winAggCall.op instanceof SqlRankFunction) {
119+
throw new ValidationException(
120+
"Over Agg: The window rank function requires order by clause with non-constant fields. "
121+
+ "please re-check the over window statement.");
122+
}
123+
});
124+
});
125+
126+
return new FlinkLogicalOverAggregate(
127+
rel.getCluster(),
128+
traitSet,
129+
newInput,
130+
window.constants,
131+
window.getRowType(),
132+
window.groups);
133+
}
134+
}
135+
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala

Lines changed: 0 additions & 121 deletions
This file was deleted.

0 commit comments

Comments
 (0)