Skip to content

Commit ccc6e1a

Browse files
committed
实时计算谓词下推,关系型数据库异步方式
1 parent 4700955 commit ccc6e1a

File tree

2 files changed

+76
-2
lines changed

2 files changed

+76
-2
lines changed

core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,70 @@
3939
import org.apache.calcite.sql.SqlBasicCall;
4040
import org.apache.calcite.sql.SqlKind;
4141
import org.apache.calcite.sql.SqlNode;
42+
import org.apache.calcite.sql.*;
43+
import org.apache.calcite.sql.parser.SqlParserPos;
4244
import org.apache.commons.lang3.StringUtils;
45+
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
4346

4447
import java.util.List;
48+
import java.util.Map;
4549

4650
/**
4751
* @Auther: jiangjunjie
4852
* @Date: 2019-06-30 14:57
4953
* @Description:
5054
*/
5155
public class ParseUtils {
56+
public static void parseSideWhere(SqlNode whereNode, Map<String, String> physicalFields, List<String> whereConditionList) {
57+
SqlKind sqlKind = whereNode.getKind();
58+
if ((sqlKind == SqlKind.OR || sqlKind == SqlKind.AND) && ((SqlBasicCall) whereNode).getOperandList().size() == 2) {
59+
SqlNode[] sqlOperandsList = ((SqlBasicCall) whereNode).getOperands();
60+
// whereNode是一颗先解析or再解析and的二叉树。二叉树中序遍历,先左子树,其次中间节点,最后右子树
61+
parseSideWhere(sqlOperandsList[0], physicalFields, whereConditionList);
62+
whereConditionList.add(sqlKind.name());
63+
parseSideWhere(sqlOperandsList[1], physicalFields, whereConditionList);
64+
} else {
65+
SqlIdentifier sqlIdentifier = (SqlIdentifier) ((SqlBasicCall) whereNode).getOperands()[0];
66+
String fieldName = null;
67+
if (sqlIdentifier.names.size() == 1) {
68+
fieldName = sqlIdentifier.getComponent(0).getSimple();
69+
} else {
70+
fieldName = sqlIdentifier.getComponent(1).getSimple();
71+
}
72+
if (physicalFields.containsKey(fieldName)) {
73+
String sideFieldName = physicalFields.get(fieldName);
74+
// clone SqlIdentifier node
75+
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
76+
SqlIdentifier sqlIdentifierClone = new SqlIdentifier("", null, sqlParserPos);
77+
List<String> namesClone = Lists.newArrayList();
78+
for(String name :sqlIdentifier.names){
79+
namesClone.add(name);
80+
}
81+
sqlIdentifierClone.setNames(namesClone,null);
82+
// clone SqlBasicCall node
83+
SqlBasicCall sqlBasicCall = (SqlBasicCall)whereNode;
84+
SqlNode[] sqlNodes = sqlBasicCall.getOperands();
85+
SqlNode[] sqlNodesClone = new SqlNode[sqlNodes.length];
86+
for (int i = 0; i < sqlNodes.length; i++) {
87+
sqlNodesClone[i] = sqlNodes[i];
88+
}
89+
SqlBasicCall sqlBasicCallClone = new SqlBasicCall(sqlBasicCall.getOperator(), sqlNodesClone, sqlParserPos);
90+
// 替换维表中真实字段名
91+
List<String> names = Lists.newArrayList();
92+
names.add(sideFieldName);
93+
sqlIdentifierClone.setNames(names, null);
94+
95+
sqlBasicCallClone.setOperand(0, sqlIdentifierClone);
96+
whereConditionList.add(sqlBasicCallClone.toString());
97+
} else {
98+
// 如果字段不是维表中字段,删除字段前的链接符
99+
if (whereConditionList.size() >= 1) {
100+
whereConditionList.remove(whereConditionList.size() - 1);
101+
}
102+
}
103+
}
104+
}
105+
52106
public static void parseAnd(SqlNode conditionNode, List<SqlNode> sqlNodeList){
53107
if(conditionNode.getKind() == SqlKind.AND && ((SqlBasicCall)conditionNode).getOperandList().size()==2){
54108
parseAnd(((SqlBasicCall)conditionNode).getOperands()[0], sqlNodeList);
@@ -78,4 +132,4 @@ public static String parseOperator(SqlKind sqlKind) {
78132
return sqlKind.sql;
79133
}
80134

81-
}
135+
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,14 @@
2828
import org.apache.calcite.sql.SqlIdentifier;
2929
import org.apache.calcite.sql.SqlKind;
3030
import org.apache.calcite.sql.SqlNode;
31+
import org.apache.calcite.sql.*;
3132
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3233
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
3334

35+
import java.util.Arrays;
3436
import java.util.List;
37+
import java.util.Map;
38+
3539

3640
/**
3741
* Reason:
@@ -68,6 +72,14 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
6872
dealOneEqualCon(sqlNode, sideTableName);
6973
}
7074

75+
List<String> whereConditionList = Lists.newArrayList();;
76+
Map<String, String> physicalFields = rdbSideTableInfo.getPhysicalFields();
77+
SqlNode whereNode = ((SqlSelect) joinInfo.getSelectNode()).getWhere();
78+
if (whereNode != null) {
79+
// 解析维表中的过滤条件
80+
ParseUtils.parseSideWhere(whereNode, physicalFields, whereConditionList);
81+
}
82+
7183
sqlCondition = "select ${selectField} from ${tableName} where ";
7284
for (int i = 0; i < equalFieldList.size(); i++) {
7385
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
@@ -77,6 +89,14 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
7789
sqlCondition += " and ";
7890
}
7991
}
92+
if (0 != whereConditionList.size()) {
93+
// 如果where条件中第一个符合条件的是维表中的条件
94+
String firstCondition = whereConditionList.get(0);
95+
if (!"and".equalsIgnoreCase(firstCondition) && !"or".equalsIgnoreCase(firstCondition)) {
96+
sqlCondition += " and ";
97+
}
98+
sqlCondition += String.join(" ", whereConditionList);
99+
}
80100

81101
sqlCondition = sqlCondition.replace("${tableName}", rdbSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
82102
}
@@ -134,4 +154,4 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
134154

135155
}
136156

137-
}
157+
}

0 commit comments

Comments
 (0)