Skip to content

Commit 14bc810

Browse files
author
xuchao
committed
初步重构 sql语句解析逻辑;
字段替换和重命名相关的逻辑在解析阶段完成; 目标:经过解析sql之后的所有sql是确定的。
1 parent f3ee3ce commit 14bc810

File tree

7 files changed

+397
-156
lines changed

7 files changed

+397
-156
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,13 @@ public static void main(String[] args) throws Exception {
149149
env.execute(name);
150150
}
151151

152-
private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache, StreamQueryConfig queryConfig) throws Exception {
152+
private static void sqlTranslation(String localSqlPluginPath,
153+
StreamTableEnvironment tableEnv,
154+
SqlTree sqlTree,
155+
Map<String, SideTableInfo> sideTableMap,
156+
Map<String, Table> registerTableCache,
157+
StreamQueryConfig queryConfig) throws Exception {
158+
153159
SideSqlExec sideSqlExec = new SideSqlExec();
154160
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
155161
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {

core/src/main/java/com/dtstack/flink/sql/side/FieldInfo.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,13 @@ public boolean equals(Object o) {
8585
public int hashCode() {
8686
return Objects.hash(table, fieldName);
8787
}
88+
89+
@Override
90+
public String toString() {
91+
return "FieldInfo{" +
92+
"table='" + table + '\'' +
93+
", fieldName='" + fieldName + '\'' +
94+
", typeInformation=" + typeInformation +
95+
'}';
96+
}
8897
}

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class JoinInfo implements Serializable {
4141
private static final long serialVersionUID = -1L;
4242

4343
//左表是否是维表
44-
private boolean leftIsSideTable;
44+
private boolean leftIsSideTable = false;
4545

4646
//右表是否是维表
4747
private boolean rightIsSideTable;

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public JoinNodeDealer(SideSQLParser sideSQLParser){
7474
* 解析 join 操作
7575
* @param joinNode
7676
* @param sideTableSet 标明哪些表名是维表
77-
* @param queueInfo
77+
* @param queueInfo sql执行队列
7878
* @param parentWhere join 关联的最上层的where 节点
7979
* @param parentSelectList join 关联的最上层的select 节点
8080
* @param joinFieldSet
@@ -99,18 +99,13 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
9999
String rightTableName = "";
100100
String rightTableAlias = "";
101101

102-
//TODO 含义需要更明确
103-
HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();
104-
105-
//如果是连续join 判断是否已经处理过添加到执行队列
102+
//抽取join中的的条件
106103
extractJoinField(joinNode.getCondition(), joinFieldSet);
107104

108-
if(leftNode.getKind() == IDENTIFIER){
109-
leftTbName = leftNode.toString();
110-
} else if (leftNode.getKind() == JOIN) {
105+
if (leftNode.getKind() == JOIN) {
111106
//处理连续join
112107
dealNestJoin(joinNode, sideTableSet,
113-
queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef, parentSelectList, fieldReplaceRef);
108+
queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef, parentSelectList);
114109
leftNode = joinNode.getLeft();
115110
}
116111

@@ -135,24 +130,16 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
135130
JoinInfo tableInfo = new JoinInfo();
136131
tableInfo.setLeftTableName(leftTbName);
137132
tableInfo.setRightTableName(rightTableName);
138-
if (StringUtils.isEmpty(leftTbAlias)){
139-
tableInfo.setLeftTableAlias(leftTbName);
140-
} else {
141-
tableInfo.setLeftTableAlias(leftTbAlias);
142-
}
143133

144-
if (StringUtils.isEmpty(rightTableAlias)){
145-
tableInfo.setRightTableAlias(rightTableName);
146-
} else {
147-
tableInfo.setRightTableAlias(rightTableAlias);
148-
}
134+
leftTbAlias = StringUtils.isEmpty(leftTbAlias) ? leftTbName : leftTbAlias;
135+
rightTableAlias = StringUtils.isEmpty(rightTableAlias) ? rightTableName : rightTableAlias;
149136

150-
tableInfo.setLeftIsSideTable(leftIsSide);
137+
tableInfo.setLeftTableAlias(leftTbAlias);
138+
tableInfo.setRightTableAlias(rightTableAlias);
151139
tableInfo.setRightIsSideTable(rightIsSide);
152140
tableInfo.setLeftNode(leftNode);
153141
tableInfo.setRightNode(rightNode);
154142
tableInfo.setJoinType(joinType);
155-
156143
tableInfo.setCondition(joinNode.getCondition());
157144
TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);
158145

@@ -216,6 +203,15 @@ public void extractJoinNeedSelectField(SqlNode leftNode,
216203
tableInfo.setRightSelectFieldInfo(rightTbSelectField);
217204
}
218205

206+
/**
207+
* 指定的节点关联到的 select 中的字段和 where中的字段
208+
* @param sqlNode
209+
* @param parentWhere
210+
* @param parentSelectList
211+
* @param tableRef
212+
* @param joinFieldSet
213+
* @return
214+
*/
219215
public Set<String> extractField(SqlNode sqlNode,
220216
SqlNode parentWhere,
221217
SqlNodeList parentSelectList,
@@ -250,8 +246,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
250246
Set<Tuple2<String, String>> joinFieldSet,
251247
Map<String, String> tableRef,
252248
Map<String, String> fieldRef,
253-
SqlNodeList parentSelectList,
254-
HashBiMap<String, String> fieldReplaceRef){
249+
SqlNodeList parentSelectList){
255250

256251
SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
257252
SqlNode parentRightJoinNode = joinNode.getRight();
@@ -267,7 +262,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
267262
SqlBasicCall buildAs = TableUtils.buildAsNodeByJoinInfo(joinInfo, null, null);
268263

269264
if(rightIsSide){
270-
addSideInfoToExeQueue(queueInfo, joinInfo, joinNode, parentSelectList, parentWhere, fieldReplaceRef, tableRef);
265+
addSideInfoToExeQueue(queueInfo, joinInfo, joinNode, parentSelectList, parentWhere, tableRef);
271266
}
272267

273268
SqlNode newLeftNode = joinNode.getLeft();
@@ -280,7 +275,7 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
280275

281276
//替换leftNode 为新的查询
282277
joinNode.setLeft(buildAs);
283-
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere, fieldReplaceRef);
278+
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere);
284279
}
285280

286281
return joinInfo;
@@ -294,15 +289,13 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
294289
* @param joinNode
295290
* @param parentSelectList
296291
* @param parentWhere
297-
* @param fieldReplaceRef
298292
* @param tableRef
299293
*/
300294
public void addSideInfoToExeQueue(Queue<Object> queueInfo,
301295
JoinInfo joinInfo,
302296
SqlJoin joinNode,
303297
SqlNodeList parentSelectList,
304298
SqlNode parentWhere,
305-
HashBiMap<String, String> fieldReplaceRef,
306299
Map<String, String> tableRef){
307300
//只处理维表
308301
if(!joinInfo.isRightIsSideTable()){
@@ -315,7 +308,7 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
315308
//替换左表为新的表名称
316309
joinNode.setLeft(buildAs);
317310

318-
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere, fieldReplaceRef);
311+
replaceSelectAndWhereField(buildAs, leftJoinNode, tableRef, parentSelectList, parentWhere);
319312
}
320313

321314
/**
@@ -325,14 +318,12 @@ public void addSideInfoToExeQueue(Queue<Object> queueInfo,
325318
* @param tableRef
326319
* @param parentSelectList
327320
* @param parentWhere
328-
* @param fieldReplaceRef
329321
*/
330322
public void replaceSelectAndWhereField(SqlBasicCall buildAs,
331323
SqlNode leftJoinNode,
332324
Map<String, String> tableRef,
333325
SqlNodeList parentSelectList,
334-
SqlNode parentWhere,
335-
HashBiMap<String, String> fieldReplaceRef){
326+
SqlNode parentWhere){
336327

337328
String newLeftTableName = buildAs.getOperands()[1].toString();
338329
Set<String> fromTableNameSet = Sets.newHashSet();
@@ -343,6 +334,7 @@ public void replaceSelectAndWhereField(SqlBasicCall buildAs,
343334
}
344335

345336
//替换select field 中的对应字段
337+
HashBiMap<String, String> fieldReplaceRef = HashBiMap.create();
346338
for(SqlNode sqlNode : parentSelectList.getList()){
347339
for(String tbTmp : fromTableNameSet) {
348340
TableUtils.replaceSelectFieldTable(sqlNode, tbTmp, newLeftTableName, fieldReplaceRef);

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 12 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -90,48 +90,16 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws
9090
return queueInfo;
9191
}
9292

93-
private void checkAndReplaceMultiJoin(SqlNode sqlNode, Set<String> sideTableSet) {
94-
SqlKind sqlKind = sqlNode.getKind();
95-
switch (sqlKind) {
96-
case WITH: {
97-
SqlWith sqlWith = (SqlWith) sqlNode;
98-
SqlNodeList sqlNodeList = sqlWith.withList;
99-
for (SqlNode withAsTable : sqlNodeList) {
100-
SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
101-
checkAndReplaceMultiJoin(sqlWithItem.query, sideTableSet);
102-
}
103-
checkAndReplaceMultiJoin(sqlWith.body, sideTableSet);
104-
break;
105-
}
106-
case INSERT:
107-
SqlNode sqlSource = ((SqlInsert) sqlNode).getSource();
108-
checkAndReplaceMultiJoin(sqlSource, sideTableSet);
109-
break;
110-
case SELECT:
111-
SqlNode sqlFrom = ((SqlSelect) sqlNode).getFrom();
112-
if (sqlFrom.getKind() != IDENTIFIER) {
113-
checkAndReplaceMultiJoin(sqlFrom, sideTableSet);
114-
}
115-
break;
116-
case JOIN:
117-
convertSideJoinToNewQuery((SqlJoin) sqlNode, sideTableSet);
118-
break;
119-
case AS:
120-
SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0];
121-
if (info.getKind() != IDENTIFIER) {
122-
checkAndReplaceMultiJoin(info, sideTableSet);
123-
}
124-
break;
125-
case UNION:
126-
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
127-
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
128-
checkAndReplaceMultiJoin(unionLeft, sideTableSet);
129-
checkAndReplaceMultiJoin(unionRight, sideTableSet);
130-
break;
131-
}
132-
}
133-
13493

94+
/**
95+
* 解析 sql 根据维表 join关系重新组装新的sql
96+
* @param sqlNode
97+
* @param sideTableSet
98+
* @param queueInfo
99+
* @param parentWhere
100+
* @param parentSelectList
101+
* @return
102+
*/
135103
public Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo, SqlNode parentWhere, SqlNodeList parentSelectList){
136104
SqlKind sqlKind = sqlNode.getKind();
137105
switch (sqlKind){
@@ -175,7 +143,9 @@ public Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
175143
JoinNodeDealer joinNodeDealer = new JoinNodeDealer(this);
176144
Set<Tuple2<String, String>> joinFieldSet = Sets.newHashSet();
177145
Map<String, String> tableRef = Maps.newHashMap();
178-
return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo, parentWhere, parentSelectList, joinFieldSet, tableRef);
146+
Map<String, String> fieldRef = Maps.newHashMap();
147+
return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo,
148+
parentWhere, parentSelectList, joinFieldSet, tableRef, fieldRef);
179149
case AS:
180150
SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0];
181151
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -207,26 +177,7 @@ public Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
207177
return "";
208178
}
209179

210-
private AliasInfo getSqlNodeAliasInfo(SqlNode sqlNode) {
211-
SqlNode info = ((SqlBasicCall) sqlNode).getOperands()[0];
212-
SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
213-
String infoStr = info.getKind() == IDENTIFIER ? info.toString() : null;
214-
215-
AliasInfo aliasInfo = new AliasInfo();
216-
aliasInfo.setName(infoStr);
217-
aliasInfo.setAlias(alias.toString());
218-
return aliasInfo;
219-
}
220180

221-
/**
222-
* 将和维表关联的join 替换为一个新的查询
223-
* @param sqlNode
224-
* @param sideTableSet
225-
*/
226-
private void convertSideJoinToNewQuery(SqlJoin sqlNode, Set<String> sideTableSet) {
227-
checkAndReplaceMultiJoin(sqlNode.getLeft(), sideTableSet);
228-
checkAndReplaceMultiJoin(sqlNode.getRight(), sideTableSet);
229-
}
230181

231182

232183
public void setLocalTableCache(Map<String, Table> localTableCache) {

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 6 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,12 @@ public class SideSqlExec {
8484

8585
private Map<String, Table> localTableCache = Maps.newHashMap();
8686

87-
public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
88-
Map<String, Table> tableCache, StreamQueryConfig queryConfig, CreateTmpTableParser.SqlParserResult createView) throws Exception {
87+
public void exec(String sql,
88+
Map<String, SideTableInfo> sideTableMap,
89+
StreamTableEnvironment tableEnv,
90+
Map<String, Table> tableCache,
91+
StreamQueryConfig queryConfig,
92+
CreateTmpTableParser.SqlParserResult createView) throws Exception {
8993
if(localSqlPluginPath == null){
9094
throw new RuntimeException("need to set localSqlPluginPath");
9195
}
@@ -192,66 +196,6 @@ private FieldReplaceInfo parseAsQuery(SqlBasicCall asSqlNode, Map<String, Table>
192196
}
193197

194198

195-
/**
196-
* 添加字段别名
197-
* @param pollSqlNode
198-
* @param fieldList
199-
* @param mappingTable
200-
*/
201-
private void addAliasForFieldNode(SqlNode pollSqlNode, List<String> fieldList, HashBasedTable<String, String, String> mappingTable) {
202-
SqlKind sqlKind = pollSqlNode.getKind();
203-
switch (sqlKind) {
204-
case INSERT:
205-
SqlNode source = ((SqlInsert) pollSqlNode).getSource();
206-
addAliasForFieldNode(source, fieldList, mappingTable);
207-
break;
208-
case AS:
209-
addAliasForFieldNode(((SqlBasicCall) pollSqlNode).getOperands()[0], fieldList, mappingTable);
210-
break;
211-
case SELECT:
212-
SqlNodeList selectList = ((SqlSelect) pollSqlNode).getSelectList();
213-
selectList.getList().forEach(node -> {
214-
if (node.getKind() == IDENTIFIER) {
215-
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
216-
if (sqlIdentifier.names.size() == 1) {
217-
return;
218-
}
219-
// save real field
220-
String fieldName = sqlIdentifier.names.get(1);
221-
if (!fieldName.endsWith("0") || fieldName.endsWith("0") && mappingTable.columnMap().containsKey(fieldName)) {
222-
fieldList.add(fieldName);
223-
}
224-
225-
}
226-
});
227-
for (int i = 0; i < selectList.getList().size(); i++) {
228-
SqlNode node = selectList.get(i);
229-
if (node.getKind() == IDENTIFIER) {
230-
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
231-
if (sqlIdentifier.names.size() == 1) {
232-
return;
233-
}
234-
String name = sqlIdentifier.names.get(1);
235-
// avoid real field pv0 convert pv
236-
if (name.endsWith("0") && !fieldList.contains(name) && !fieldList.contains(name.substring(0, name.length() - 1))) {
237-
SqlOperator operator = new SqlAsOperator();
238-
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
239-
240-
SqlIdentifier sqlIdentifierAlias = new SqlIdentifier(name.substring(0, name.length() - 1), null, sqlParserPos);
241-
SqlNode[] sqlNodes = new SqlNode[2];
242-
sqlNodes[0] = sqlIdentifier;
243-
sqlNodes[1] = sqlIdentifierAlias;
244-
SqlBasicCall sqlBasicCall = new SqlBasicCall(operator, sqlNodes, sqlParserPos);
245-
246-
selectList.set(i, sqlBasicCall);
247-
}
248-
}
249-
}
250-
break;
251-
}
252-
}
253-
254-
255199
public AliasInfo parseASNode(SqlNode sqlNode) throws SqlParseException {
256200
SqlKind sqlKind = sqlNode.getKind();
257201
if(sqlKind != AS){

0 commit comments

Comments
 (0)