Skip to content

Commit fc035e7

Browse files
committed
Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.8.0_dev
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
2 parents 51ff665 + e506733 commit fc035e7

File tree

3 files changed

+47
-6
lines changed

3 files changed

+47
-6
lines changed

core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,13 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import org.apache.calcite.config.Lex;
24-
import org.apache.calcite.sql.*;
24+
import org.apache.calcite.sql.SqlBasicCall;
25+
import org.apache.calcite.sql.SqlInsert;
26+
import org.apache.calcite.sql.SqlJoin;
27+
import org.apache.calcite.sql.SqlKind;
28+
import org.apache.calcite.sql.SqlNode;
29+
import org.apache.calcite.sql.SqlOrderBy;
30+
import org.apache.calcite.sql.SqlSelect;
2531
import org.apache.calcite.sql.parser.SqlParseException;
2632
import org.apache.calcite.sql.parser.SqlParser;
2733
import org.apache.commons.lang3.StringUtils;
@@ -32,7 +38,8 @@
3238
import static org.apache.calcite.sql.SqlKind.IDENTIFIER;
3339

3440
/**
35-
* parser flink sql
41+
* 解析flink sql
42+
* sql 只支持 insert 开头的
3643
* Date: 2018/6/22
3744
* Company: www.dtstack.com
3845
* @author xuchao
@@ -112,10 +119,6 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
112119
sqlParseResult.addSourceTable(identifierNode.toString());
113120
}
114121
break;
115-
// case MATCH_RECOGNIZE:
116-
// SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
117-
// sqlParseResult.addSourceTable(node.getTableRef().toString());
118-
// break;
119122
case UNION:
120123
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
121124
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
@@ -130,6 +133,10 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
130133
parseNode(unionRight, sqlParseResult);
131134
}
132135
break;
136+
case ORDER_BY:
137+
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
138+
parseNode(sqlOrderBy.query, sqlParseResult);
139+
break;
133140
default:
134141
//do nothing
135142
break;

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.calcite.sql.SqlKind;
3131
import org.apache.calcite.sql.SqlNode;
3232
import org.apache.calcite.sql.SqlOperator;
33+
import org.apache.calcite.sql.SqlOrderBy;
3334
import org.apache.calcite.sql.SqlSelect;
3435
import org.apache.calcite.sql.parser.SqlParseException;
3536
import org.apache.calcite.sql.parser.SqlParser;
@@ -119,6 +120,10 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
119120
parseSql(unionRight, sideTableSet, queueInfo);
120121

121122
break;
123+
124+
case ORDER_BY:
125+
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
126+
parseSql(sqlOrderBy.query, sideTableSet, queueInfo);
122127
}
123128

124129
return "";

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.calcite.sql.SqlNode;
3838
import org.apache.calcite.sql.SqlNodeList;
3939
import org.apache.calcite.sql.SqlOperator;
40+
import org.apache.calcite.sql.SqlOrderBy;
4041
import org.apache.calcite.sql.SqlSelect;
4142
import org.apache.calcite.sql.fun.SqlCase;
4243
import org.apache.calcite.sql.parser.SqlParseException;
@@ -317,11 +318,39 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
317318
replaceFieldName(unionRight, mappingTable, targetTableName, tableAlias);
318319

319320
break;
321+
case ORDER_BY:
322+
SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
323+
replaceFieldName(sqlOrderBy.query, mappingTable, targetTableName, tableAlias);
324+
SqlNodeList orderFiledList = sqlOrderBy.orderList;
325+
for (int i=0 ;i<orderFiledList.size();i++) {
326+
SqlNode replaceNode = replaceOrderByTableName(orderFiledList.get(i), tableAlias);
327+
orderFiledList.set(i, replaceNode);
328+
}
329+
320330
default:
321331
break;
322332
}
323333
}
324334

335+
private SqlNode replaceOrderByTableName(SqlNode orderNode, String tableAlias) {
336+
if(orderNode.getKind() == IDENTIFIER){
337+
SqlIdentifier sqlIdentifier = (SqlIdentifier) orderNode;
338+
if (sqlIdentifier.names.size() == 1) {
339+
return orderNode;
340+
}
341+
return sqlIdentifier.setName(0, tableAlias);
342+
} else if (orderNode instanceof SqlBasicCall) {
343+
SqlBasicCall sqlBasicCall = (SqlBasicCall) orderNode;
344+
for(int i=0; i<sqlBasicCall.getOperandList().size(); i++){
345+
SqlNode sqlNode = sqlBasicCall.getOperandList().get(i);
346+
sqlBasicCall.getOperands()[i] = replaceOrderByTableName(sqlNode , tableAlias);
347+
}
348+
return sqlBasicCall;
349+
} else {
350+
return orderNode;
351+
}
352+
}
353+
325354
private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String, String> mappingTable, String tableAlias){
326355
if(groupNode.getKind() == IDENTIFIER){
327356
SqlIdentifier sqlIdentifier = (SqlIdentifier) groupNode;

0 commit comments

Comments
 (0)