Skip to content

Commit 0cbdd3f

Browse files
committed
add predicate parser
1 parent 1fa8aef commit 0cbdd3f

File tree

3 files changed

+290
-0
lines changed

3 files changed

+290
-0
lines changed
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side;
20+
21+
import java.io.Serializable;
22+
23+
/**
24+
* Predicate base info
25+
*
26+
* Date: 2019/12/11
27+
* Company: www.dtstack.com
28+
* @author maqi
29+
*/
30+
public class PredicateInfo implements Serializable {
31+
32+
private String operatorName;
33+
private String operatorKind;
34+
private String ownerTable;
35+
private String fieldName;
36+
private String condition;
37+
38+
public PredicateInfo(String operatorName, String operatorKind, String ownerTable, String fieldName, String condition) {
39+
this.operatorName = operatorName;
40+
this.operatorKind = operatorKind;
41+
this.ownerTable = ownerTable;
42+
this.fieldName = fieldName;
43+
this.condition = condition;
44+
}
45+
46+
public String getOperatorName() {
47+
return operatorName;
48+
}
49+
50+
public void setOperatorName(String operatorName) {
51+
this.operatorName = operatorName;
52+
}
53+
54+
public String getOperatorKind() {
55+
return operatorKind;
56+
}
57+
58+
public void setOperatorKind(String operatorKind) {
59+
this.operatorKind = operatorKind;
60+
}
61+
62+
public String getOwnerTable() {
63+
return ownerTable;
64+
}
65+
66+
public void setOwnerTable(String ownerTable) {
67+
this.ownerTable = ownerTable;
68+
}
69+
70+
public String getFieldName() {
71+
return fieldName;
72+
}
73+
74+
public void setFieldName(String fieldName) {
75+
this.fieldName = fieldName;
76+
}
77+
78+
public String getCondition() {
79+
return condition;
80+
}
81+
82+
public void setCondition(String condition) {
83+
this.condition = condition;
84+
}
85+
86+
@Override
87+
public String toString() {
88+
return "PredicateInfo{" +
89+
"operatorName='" + operatorName + '\'' +
90+
", operatorKind='" + operatorKind + '\'' +
91+
", ownerTable='" + ownerTable + '\'' +
92+
", fieldName='" + fieldName + '\'' +
93+
", condition='" + condition + '\'' +
94+
'}';
95+
}
96+
97+
public static Builder builder() {
98+
return new Builder();
99+
}
100+
101+
102+
public static class Builder {
103+
104+
private String operatorName;
105+
private String operatorKind;
106+
private String ownerTable;
107+
private String fieldName;
108+
private String condition;
109+
110+
public Builder setOperatorName(String operatorName) {
111+
this.operatorName = operatorName;
112+
return this;
113+
}
114+
115+
public Builder setOperatorKind(String operatorKind) {
116+
this.operatorKind = operatorKind;
117+
return this;
118+
}
119+
120+
public Builder setOwnerTable(String ownerTable) {
121+
this.ownerTable = ownerTable;
122+
return this;
123+
}
124+
125+
public Builder setFieldName(String fieldName) {
126+
this.fieldName = fieldName;
127+
return this;
128+
}
129+
130+
public Builder setCondition(String condition) {
131+
this.condition = condition;
132+
return this;
133+
}
134+
135+
public PredicateInfo build() {
136+
return new PredicateInfo(
137+
operatorName, operatorKind, ownerTable, fieldName, condition);
138+
}
139+
}
140+
141+
142+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side;
20+
21+
import com.dtstack.flink.sql.config.CalciteConfig;
22+
import com.google.common.collect.Lists;
23+
import com.google.common.collect.Maps;
24+
import org.apache.calcite.sql.SqlBasicCall;
25+
import org.apache.calcite.sql.SqlIdentifier;
26+
import org.apache.calcite.sql.SqlInsert;
27+
import org.apache.calcite.sql.SqlJoin;
28+
import org.apache.calcite.sql.SqlKind;
29+
import org.apache.calcite.sql.SqlNode;
30+
import org.apache.calcite.sql.SqlNodeList;
31+
import org.apache.calcite.sql.SqlOperator;
32+
import org.apache.calcite.sql.SqlOrderBy;
33+
import org.apache.calcite.sql.SqlSelect;
34+
import org.apache.calcite.sql.SqlWith;
35+
import org.apache.calcite.sql.SqlWithItem;
36+
import org.apache.calcite.sql.parser.SqlParseException;
37+
import org.apache.calcite.sql.parser.SqlParser;
38+
39+
import java.util.List;
40+
import java.util.Map;
41+
42+
import static org.apache.calcite.sql.SqlKind.*;
43+
44+
/**
45+
*
46+
* 将同级谓词下推到维表
47+
* Date: 2019/12/11
48+
* Company: www.dtstack.com
49+
* @author maqi
50+
*/
51+
public class SidePredicatesParser {
52+
public void fillPredicatesForSideTable(String exeSql, Map<String, SideTableInfo> sideTableMap) throws SqlParseException {
53+
SqlParser sqlParser = SqlParser.create(exeSql, CalciteConfig.MYSQL_LEX_CONFIG);
54+
SqlNode sqlNode = sqlParser.parseStmt();
55+
parseSql(sqlNode, sideTableMap, Maps.newHashMap());
56+
}
57+
58+
private void parseSql(SqlNode sqlNode, Map<String, SideTableInfo> sideTableMap, Map<String, String> tabMapping) {
59+
SqlKind sqlKind = sqlNode.getKind();
60+
switch (sqlKind) {
61+
case INSERT:
62+
SqlNode sqlSource = ((SqlInsert) sqlNode).getSource();
63+
parseSql(sqlSource, sideTableMap, tabMapping);
64+
break;
65+
case SELECT:
66+
SqlNode fromNode = ((SqlSelect) sqlNode).getFrom();
67+
SqlNode whereNode = ((SqlSelect) sqlNode).getWhere();
68+
69+
if (fromNode.getKind() != IDENTIFIER) {
70+
// 子查询或者AS
71+
parseSql(fromNode, sideTableMap, tabMapping);
72+
}
73+
74+
if (null != whereNode && whereNode.getKind() != OR) {
75+
List<PredicateInfo> predicateInfos = Lists.newArrayList();
76+
extractPredicateInfo(whereNode, predicateInfos);
77+
// tabMapping: <m,MyTable>, <s.sideTable>
78+
System.out.println(predicateInfos);
79+
}
80+
break;
81+
case JOIN:
82+
SqlNode leftNode = ((SqlJoin) sqlNode).getLeft();
83+
SqlNode rightNode = ((SqlJoin) sqlNode).getRight();
84+
parseSql(leftNode, sideTableMap, tabMapping);
85+
parseSql(rightNode, sideTableMap, tabMapping);
86+
break;
87+
case AS:
88+
SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0];
89+
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
90+
if (info.getKind() == IDENTIFIER) {
91+
tabMapping.put(alias.toString(), info.toString());
92+
} else {
93+
// 为子查询创建一个同级map
94+
parseSql(info, sideTableMap, Maps.newHashMap());
95+
}
96+
break;
97+
98+
case UNION:
99+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
100+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
101+
parseSql(unionLeft, sideTableMap, tabMapping);
102+
parseSql(unionRight, sideTableMap, tabMapping);
103+
break;
104+
}
105+
}
106+
107+
108+
109+
private void extractPredicateInfo(SqlNode whereNode, List<PredicateInfo> predicatesInfo) {
110+
SqlKind sqlKind = whereNode.getKind();
111+
if (sqlKind == SqlKind.AND && ((SqlBasicCall) whereNode).getOperandList().size() == 2) {
112+
extractPredicateInfo(((SqlBasicCall) whereNode).getOperands()[0], predicatesInfo);
113+
extractPredicateInfo(((SqlBasicCall) whereNode).getOperands()[1], predicatesInfo);
114+
} else {
115+
SqlOperator operator = ((SqlBasicCall) whereNode).getOperator();
116+
String operatorName = operator.getName();
117+
SqlKind operatorKind = operator.getKind();
118+
119+
if (operatorKind == SqlKind.BETWEEN) {
120+
SqlIdentifier fieldFullPath = (SqlIdentifier) ((SqlBasicCall) whereNode).getOperands()[0];
121+
if (fieldFullPath.names.size() == 2) {
122+
String ownerTable = fieldFullPath.names.get(0);
123+
String fieldName = fieldFullPath.names.get(1);
124+
String content = ((SqlBasicCall) whereNode).getOperands()[1].toString() + " and " + ((SqlBasicCall) whereNode).getOperands()[2].toString();
125+
PredicateInfo predicateInfo = PredicateInfo.builder().setOperatorName(operatorName).setOperatorKind(operatorKind.toString())
126+
.setOwnerTable(ownerTable).setFieldName(fieldName).setCondition(content).build();
127+
predicatesInfo.add(predicateInfo);
128+
}
129+
} else {
130+
SqlIdentifier fieldFullPath = (SqlIdentifier) ((SqlBasicCall) whereNode).getOperands()[0];
131+
// not table name not deal
132+
if (fieldFullPath.names.size() == 2) {
133+
String ownerTable = fieldFullPath.names.get(0);
134+
String fieldName = fieldFullPath.names.get(1);
135+
String content = ((SqlBasicCall) whereNode).getOperands()[1].toString();
136+
PredicateInfo predicateInfo = PredicateInfo.builder().setOperatorName(operatorName).setOperatorKind(operatorKind.toString())
137+
.setOwnerTable(ownerTable).setFieldName(fieldName).setCondition(content).build();
138+
predicatesInfo.add(predicateInfo);
139+
}
140+
}
141+
142+
}
143+
}
144+
145+
146+
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public class SideSqlExec {
8585
private String tmpFields = null;
8686

8787
private SideSQLParser sideSQLParser = new SideSQLParser();
88+
private SidePredicatesParser sidePredicatesParser = new SidePredicatesParser();
8889

8990
private Map<String, Table> localTableCache = Maps.newHashMap();
9091

@@ -97,6 +98,7 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
9798
}
9899

99100
localTableCache.putAll(tableCache);
101+
sidePredicatesParser.fillPredicatesForSideTable(sql, sideTableMap);
100102
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
101103
Object pollObj = null;
102104

0 commit comments

Comments
 (0)