Skip to content

Commit 410d618

Browse files
committed
rdb predicate push down
1 parent 0cbdd3f commit 410d618

File tree

10 files changed: +92 −330 lines changed

10 files changed: +92 −330 lines changed

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,20 @@
2727
import org.apache.calcite.sql.SqlJoin;
2828
import org.apache.calcite.sql.SqlKind;
2929
import org.apache.calcite.sql.SqlNode;
30-
import org.apache.calcite.sql.SqlNodeList;
3130
import org.apache.calcite.sql.SqlOperator;
32-
import org.apache.calcite.sql.SqlOrderBy;
3331
import org.apache.calcite.sql.SqlSelect;
34-
import org.apache.calcite.sql.SqlWith;
35-
import org.apache.calcite.sql.SqlWithItem;
3632
import org.apache.calcite.sql.parser.SqlParseException;
3733
import org.apache.calcite.sql.parser.SqlParser;
3834

3935
import java.util.List;
4036
import java.util.Map;
37+
import java.util.stream.Collectors;
4138

4239
import static org.apache.calcite.sql.SqlKind.*;
4340

4441
/**
4542
*
46-
* 将同级谓词下推到维表
43+
* 将同级谓词信息填充到维表
4744
* Date: 2019/12/11
4845
* Company: www.dtstack.com
4946
* @author maqi
@@ -55,6 +52,12 @@ public void fillPredicatesForSideTable(String exeSql, Map<String, SideTableInfo>
5552
parseSql(sqlNode, sideTableMap, Maps.newHashMap());
5653
}
5754

55+
/**
56+
* 将谓词信息填充到维表属性
57+
* @param sqlNode
58+
* @param sideTableMap
59+
* @param tabMapping 谓词属性中别名对应的真实维表名称
60+
*/
5861
private void parseSql(SqlNode sqlNode, Map<String, SideTableInfo> sideTableMap, Map<String, String> tabMapping) {
5962
SqlKind sqlKind = sqlNode.getKind();
6063
switch (sqlKind) {
@@ -67,15 +70,13 @@ private void parseSql(SqlNode sqlNode, Map<String, SideTableInfo> sideTableMap,
6770
SqlNode whereNode = ((SqlSelect) sqlNode).getWhere();
6871

6972
if (fromNode.getKind() != IDENTIFIER) {
70-
// 子查询或者AS
7173
parseSql(fromNode, sideTableMap, tabMapping);
7274
}
73-
75+
// 带or的不解析
7476
if (null != whereNode && whereNode.getKind() != OR) {
75-
List<PredicateInfo> predicateInfos = Lists.newArrayList();
76-
extractPredicateInfo(whereNode, predicateInfos);
77-
// tabMapping: <m,MyTable>, <s.sideTable>
78-
System.out.println(predicateInfos);
77+
List<PredicateInfo> predicateInfoList = Lists.newArrayList();
78+
extractPredicateInfo(whereNode, predicateInfoList);
79+
fillToSideTableInfo(sideTableMap, tabMapping, predicateInfoList);
7980
}
8081
break;
8182
case JOIN:
@@ -94,7 +95,6 @@ private void parseSql(SqlNode sqlNode, Map<String, SideTableInfo> sideTableMap,
9495
parseSql(info, sideTableMap, Maps.newHashMap());
9596
}
9697
break;
97-
9898
case UNION:
9999
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
100100
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -104,43 +104,39 @@ private void parseSql(SqlNode sqlNode, Map<String, SideTableInfo> sideTableMap,
104104
}
105105
}
106106

107+
private void fillToSideTableInfo(Map<String, SideTableInfo> sideTableMap, Map<String, String> tabMapping, List<PredicateInfo> predicateInfoList) {
108+
predicateInfoList.stream().filter(info -> sideTableMap.containsKey(tabMapping.getOrDefault(info.getOwnerTable(), info.getOwnerTable())))
109+
.map(info -> sideTableMap.get(tabMapping.getOrDefault(info.getOwnerTable(), info.getOwnerTable())).getPredicateInfoes().add(info))
110+
.count();
111+
}
107112

108113

109-
private void extractPredicateInfo(SqlNode whereNode, List<PredicateInfo> predicatesInfo) {
114+
private void extractPredicateInfo(SqlNode whereNode, List<PredicateInfo> predicatesInfoList) {
110115
SqlKind sqlKind = whereNode.getKind();
111116
if (sqlKind == SqlKind.AND && ((SqlBasicCall) whereNode).getOperandList().size() == 2) {
112-
extractPredicateInfo(((SqlBasicCall) whereNode).getOperands()[0], predicatesInfo);
113-
extractPredicateInfo(((SqlBasicCall) whereNode).getOperands()[1], predicatesInfo);
117+
extractPredicateInfo(((SqlBasicCall) whereNode).getOperands()[0], predicatesInfoList);
118+
extractPredicateInfo(((SqlBasicCall) whereNode).getOperands()[1], predicatesInfoList);
114119
} else {
115120
SqlOperator operator = ((SqlBasicCall) whereNode).getOperator();
116121
String operatorName = operator.getName();
117122
SqlKind operatorKind = operator.getKind();
118-
119-
if (operatorKind == SqlKind.BETWEEN) {
120-
SqlIdentifier fieldFullPath = (SqlIdentifier) ((SqlBasicCall) whereNode).getOperands()[0];
121-
if (fieldFullPath.names.size() == 2) {
122-
String ownerTable = fieldFullPath.names.get(0);
123-
String fieldName = fieldFullPath.names.get(1);
124-
String content = ((SqlBasicCall) whereNode).getOperands()[1].toString() + " and " + ((SqlBasicCall) whereNode).getOperands()[2].toString();
125-
PredicateInfo predicateInfo = PredicateInfo.builder().setOperatorName(operatorName).setOperatorKind(operatorKind.toString())
126-
.setOwnerTable(ownerTable).setFieldName(fieldName).setCondition(content).build();
127-
predicatesInfo.add(predicateInfo);
128-
}
129-
} else {
123+
// 跳过函数
124+
if ((((SqlBasicCall) whereNode).getOperands()[0] instanceof SqlIdentifier)) {
130125
SqlIdentifier fieldFullPath = (SqlIdentifier) ((SqlBasicCall) whereNode).getOperands()[0];
131-
// not table name not deal
132126
if (fieldFullPath.names.size() == 2) {
133127
String ownerTable = fieldFullPath.names.get(0);
134128
String fieldName = fieldFullPath.names.get(1);
135-
String content = ((SqlBasicCall) whereNode).getOperands()[1].toString();
129+
String content = (operatorKind == SqlKind.BETWEEN) ? ((SqlBasicCall) whereNode).getOperands()[1].toString() + " AND " +
130+
((SqlBasicCall) whereNode).getOperands()[2].toString() : ((SqlBasicCall) whereNode).getOperands()[1].toString();
131+
136132
PredicateInfo predicateInfo = PredicateInfo.builder().setOperatorName(operatorName).setOperatorKind(operatorKind.toString())
137133
.setOwnerTable(ownerTable).setFieldName(fieldName).setCondition(content).build();
138-
predicatesInfo.add(predicateInfo);
134+
135+
predicatesInfoList.add(predicateInfo);
139136
}
140137
}
141138

142139
}
143140
}
144141

145-
146142
}

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
235235
} else {
236236
tableInfo.setLeftTableAlias(leftTbAlias);
237237
}
238-
if (leftTbAlias.equals("")){
238+
if (rightTableAlias.equals("")){
239239
tableInfo.setRightTableAlias(rightTableName);
240240
} else {
241241
tableInfo.setRightTableAlias(rightTableAlias);

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,18 @@ public class SideSqlExec {
9090
private Map<String, Table> localTableCache = Maps.newHashMap();
9191

9292
public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
93-
Map<String, Table> tableCache, StreamQueryConfig queryConfig)
94-
throws Exception {
95-
93+
Map<String, Table> tableCache, StreamQueryConfig queryConfig) throws Exception {
9694
if(localSqlPluginPath == null){
9795
throw new RuntimeException("need to set localSqlPluginPath");
9896
}
9997

10098
localTableCache.putAll(tableCache);
101-
sidePredicatesParser.fillPredicatesForSideTable(sql, sideTableMap);
99+
try {
100+
sidePredicatesParser.fillPredicatesForSideTable(sql, sideTableMap);
101+
} catch (Exception e) {
102+
LOG.error("fill predicates for sideTable fail ", e);
103+
}
104+
102105
Queue<Object> exeQueue = sideSQLParser.getExeQueue(sql, sideTableMap.keySet());
103106
Object pollObj = null;
104107

core/src/main/java/com/dtstack/flink/sql/side/SideTableInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.table.TableInfo;
24+
import com.google.common.collect.Lists;
2425
import org.apache.flink.api.common.typeinfo.TypeInformation;
2526
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2627

2728
import java.io.Serializable;
29+
import java.util.List;
2830

2931
/**
3032
* Reason:
@@ -65,6 +67,8 @@ public abstract class SideTableInfo extends TableInfo implements Serializable {
6567

6668
private String cacheMode="ordered";
6769

70+
private List<PredicateInfo> predicateInfoes = Lists.newArrayList();
71+
6872
public RowTypeInfo getRowTypeInfo(){
6973
Class[] fieldClass = getFieldClasses();
7074
TypeInformation<?>[] types = new TypeInformation[fieldClass.length];
@@ -131,4 +135,12 @@ public int getAsyncTimeout() {
131135
public void setAsyncTimeout(int asyncTimeout) {
132136
this.asyncTimeout = asyncTimeout;
133137
}
138+
139+
public void setPredicateInfoes(List<PredicateInfo> predicateInfoes) {
140+
this.predicateInfoes = predicateInfoes;
141+
}
142+
143+
public List<PredicateInfo> getPredicateInfoes() {
144+
return predicateInfoes;
145+
}
134146
}

core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -53,64 +53,6 @@
5353
* @Description:
5454
*/
5555
public class ParseUtils {
56-
public static void parseSideWhere(SqlNode whereNode, Map<String, String> physicalFields, List<String> whereConditionList) {
57-
SqlKind sqlKind = whereNode.getKind();
58-
if ((sqlKind == SqlKind.OR || sqlKind == SqlKind.AND) && ((SqlBasicCall) whereNode).getOperandList().size() == 2) {
59-
SqlNode[] sqlOperandsList = ((SqlBasicCall) whereNode).getOperands();
60-
// whereNode是一颗先解析or再解析and的二叉树。二叉树中序遍历,先左子树,其次中间节点,最后右子树
61-
parseSideWhere(sqlOperandsList[0], physicalFields, whereConditionList);
62-
whereConditionList.add(sqlKind.name());
63-
parseSideWhere(sqlOperandsList[1], physicalFields, whereConditionList);
64-
} else {
65-
SqlIdentifier sqlIdentifier = (SqlIdentifier) ((SqlBasicCall) whereNode).getOperands()[0];
66-
String fieldName = null;
67-
if (sqlIdentifier.names.size() == 1) {
68-
fieldName = sqlIdentifier.getComponent(0).getSimple();
69-
} else {
70-
fieldName = sqlIdentifier.getComponent(1).getSimple();
71-
}
72-
if (physicalFields.containsKey(fieldName)) {
73-
String sideFieldName = physicalFields.get(fieldName);
74-
// clone SqlIdentifier node
75-
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
76-
SqlIdentifier sqlIdentifierClone = new SqlIdentifier("", null, sqlParserPos);
77-
List<String> namesClone = Lists.newArrayList();
78-
for(String name :sqlIdentifier.names){
79-
namesClone.add(name);
80-
}
81-
sqlIdentifierClone.setNames(namesClone,null);
82-
// clone SqlBasicCall node
83-
SqlBasicCall sqlBasicCall = (SqlBasicCall)whereNode;
84-
SqlNode[] sqlNodes = sqlBasicCall.getOperands();
85-
SqlNode[] sqlNodesClone = new SqlNode[sqlNodes.length];
86-
for (int i = 0; i < sqlNodes.length; i++) {
87-
sqlNodesClone[i] = sqlNodes[i];
88-
}
89-
SqlOperator sqlOperator = sqlBasicCall.getOperator();
90-
if (sqlOperator instanceof SqlBetweenOperator) {
91-
// Between(ASYMMETRIC) node can not resolve
92-
// SqlBetweenOperator sqlBetweenOperator = new SqlBetweenOperator(null, false);
93-
// sqlBasicCallClone = new SqlBasicCall(sqlBetweenOperator, sqlNodesClone, sqlParserPos);
94-
whereConditionList.clear();
95-
return;
96-
}
97-
SqlBasicCall sqlBasicCallClone = new SqlBasicCall(sqlOperator, sqlNodesClone, sqlParserPos);
98-
// 替换维表中真实字段名
99-
List<String> names = Lists.newArrayList();
100-
names.add(sideFieldName);
101-
sqlIdentifierClone.setNames(names, null);
102-
103-
sqlBasicCallClone.setOperand(0, sqlIdentifierClone);
104-
whereConditionList.add(sqlBasicCallClone.toString());
105-
} else {
106-
// 如果字段不是维表中字段,删除字段前的链接符
107-
if (whereConditionList.size() >= 1) {
108-
whereConditionList.remove(whereConditionList.size() - 1);
109-
}
110-
}
111-
}
112-
}
113-
11456
public static void parseAnd(SqlNode conditionNode, List<SqlNode> sqlNodeList){
11557
if(conditionNode.getKind() == SqlKind.AND && ((SqlBasicCall)conditionNode).getOperandList().size()==2){
11658
parseAnd(((SqlBasicCall)conditionNode).getOperands()[0], sqlNodeList);

kafka09/kafka09-source/src/test/java/com/dtstack/flinkx/AppTest.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

kafka10/kafka10-source/src/test/java/com/dtstack/flinkx/AppTest.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

0 commit comments

Comments (0)