Skip to content

Commit 7937ecb

Browse files
committed
Merge remote-tracking branch 'origin/v1.8.0_dev_feature_predicate-pushdown' into v1.8.0_dev
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java
2 parents 56165d8 + c7f2f10 commit 7937ecb

File tree

2 files changed

+77
-12
lines changed

2 files changed

+77
-12
lines changed

core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,31 +36,73 @@
3636

3737
package com.dtstack.flink.sql.util;
3838

39-
import com.dtstack.flink.sql.side.JoinInfo;
40-
import org.apache.calcite.sql.SqlBasicCall;
41-
import org.apache.calcite.sql.SqlDataTypeSpec;
42-
import org.apache.calcite.sql.SqlIdentifier;
43-
import org.apache.calcite.sql.SqlJoin;
44-
import org.apache.calcite.sql.SqlKind;
45-
import org.apache.calcite.sql.SqlLiteral;
46-
import org.apache.calcite.sql.SqlNode;
47-
import org.apache.calcite.sql.SqlNodeList;
39+
import org.apache.calcite.sql.*;
4840
import org.apache.calcite.sql.fun.SqlCase;
41+
import org.apache.calcite.sql.parser.SqlParserPos;
4942
import org.apache.commons.lang3.StringUtils;
50-
import org.apache.flink.calcite.shaded.com.google.common.collect.HashBasedTable;
43+
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
5144

5245
import java.util.List;
5346
import java.util.Map;
5447

5548
import static org.apache.calcite.sql.SqlKind.*;
56-
import static org.apache.calcite.sql.SqlKind.OTHER;
5749

5850
/**
5951
* @Auther: jiangjunjie
6052
* @Date: 2019-06-30 14:57
6153
* @Description:
6254
*/
6355
public class ParseUtils {
56+
public static void parseSideWhere(SqlNode whereNode, Map<String, String> physicalFields, List<String> whereConditionList) {
57+
SqlKind sqlKind = whereNode.getKind();
58+
if ((sqlKind == SqlKind.OR || sqlKind == SqlKind.AND) && ((SqlBasicCall) whereNode).getOperandList().size() == 2) {
59+
SqlNode[] sqlOperandsList = ((SqlBasicCall) whereNode).getOperands();
60+
// whereNode是一颗先解析or再解析and的二叉树。二叉树中序遍历,先左子树,其次中间节点,最后右子树
61+
parseSideWhere(sqlOperandsList[0], physicalFields, whereConditionList);
62+
whereConditionList.add(sqlKind.name());
63+
parseSideWhere(sqlOperandsList[1], physicalFields, whereConditionList);
64+
} else {
65+
SqlIdentifier sqlIdentifier = (SqlIdentifier) ((SqlBasicCall) whereNode).getOperands()[0];
66+
String fieldName = null;
67+
if (sqlIdentifier.names.size() == 1) {
68+
fieldName = sqlIdentifier.getComponent(0).getSimple();
69+
} else {
70+
fieldName = sqlIdentifier.getComponent(1).getSimple();
71+
}
72+
if (physicalFields.containsKey(fieldName)) {
73+
String sideFieldName = physicalFields.get(fieldName);
74+
// clone SqlIdentifier node
75+
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
76+
SqlIdentifier sqlIdentifierClone = new SqlIdentifier("", null, sqlParserPos);
77+
List<String> namesClone = Lists.newArrayList();
78+
for(String name :sqlIdentifier.names){
79+
namesClone.add(name);
80+
}
81+
sqlIdentifierClone.setNames(namesClone,null);
82+
// clone SqlBasicCall node
83+
SqlBasicCall sqlBasicCall = (SqlBasicCall)whereNode;
84+
SqlNode[] sqlNodes = sqlBasicCall.getOperands();
85+
SqlNode[] sqlNodesClone = new SqlNode[sqlNodes.length];
86+
for (int i = 0; i < sqlNodes.length; i++) {
87+
sqlNodesClone[i] = sqlNodes[i];
88+
}
89+
SqlBasicCall sqlBasicCallClone = new SqlBasicCall(sqlBasicCall.getOperator(), sqlNodesClone, sqlParserPos);
90+
// 替换维表中真实字段名
91+
List<String> names = Lists.newArrayList();
92+
names.add(sideFieldName);
93+
sqlIdentifierClone.setNames(names, null);
94+
95+
sqlBasicCallClone.setOperand(0, sqlIdentifierClone);
96+
whereConditionList.add(sqlBasicCallClone.toString());
97+
} else {
98+
// 如果字段不是维表中字段,删除字段前的链接符
99+
if (whereConditionList.size() >= 1) {
100+
whereConditionList.remove(whereConditionList.size() - 1);
101+
}
102+
}
103+
}
104+
}
105+
64106
public static void parseAnd(SqlNode conditionNode, List<SqlNode> sqlNodeList){
65107
if(conditionNode.getKind() == SqlKind.AND && ((SqlBasicCall)conditionNode).getOperandList().size()==2){
66108
parseAnd(((SqlBasicCall)conditionNode).getOperands()[0], sqlNodeList);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,14 @@
2828
import org.apache.calcite.sql.SqlIdentifier;
2929
import org.apache.calcite.sql.SqlKind;
3030
import org.apache.calcite.sql.SqlNode;
31+
import org.apache.calcite.sql.*;
3132
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3233
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
3334

35+
import java.util.Arrays;
3436
import java.util.List;
37+
import java.util.Map;
38+
3539

3640
/**
3741
* Reason:
@@ -68,6 +72,14 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
6872
dealOneEqualCon(sqlNode, sideTableName);
6973
}
7074

75+
List<String> whereConditionList = Lists.newArrayList();;
76+
Map<String, String> physicalFields = rdbSideTableInfo.getPhysicalFields();
77+
SqlNode whereNode = ((SqlSelect) joinInfo.getSelectNode()).getWhere();
78+
if (whereNode != null) {
79+
// 解析维表中的过滤条件
80+
ParseUtils.parseSideWhere(whereNode, physicalFields, whereConditionList);
81+
}
82+
7183
sqlCondition = "select ${selectField} from ${tableName} where ";
7284
for (int i = 0; i < equalFieldList.size(); i++) {
7385
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
@@ -77,6 +89,17 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
7789
sqlCondition += " and ";
7890
}
7991
}
92+
if (0 != whereConditionList.size()) {
93+
// 如果where条件中第一个符合条件的是维表中的条件
94+
String firstCondition = whereConditionList.get(0);
95+
if (!"and".equalsIgnoreCase(firstCondition) && !"or".equalsIgnoreCase(firstCondition)) {
96+
whereConditionList.add(0, "and (");
97+
} else {
98+
whereConditionList.add(1, "(");
99+
}
100+
whereConditionList.add(whereConditionList.size(), ")");
101+
sqlCondition += String.join(" ", whereConditionList);
102+
}
80103

81104
sqlCondition = sqlCondition.replace("${tableName}", rdbSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
82105

@@ -137,4 +160,4 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
137160

138161
}
139162

140-
}
163+
}

0 commit comments

Comments
 (0)