Skip to content

Commit 96ac939

Browse files
committed
Merge remote-tracking branch 'origin/1.5_v3.8.0_beta_1.0' into 1.5_v3.8.0
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java
2 parents 2f3b0c3 + 98aa150 commit 96ac939

File tree

6 files changed

+170
-28
lines changed

6 files changed

+170
-28
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,12 @@
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
2424
import com.dtstack.flink.sql.enums.ECacheType;
25-
import com.dtstack.flink.sql.parser.*;
25+
import com.dtstack.flink.sql.exec.FlinkSQLExec;
26+
import com.dtstack.flink.sql.parser.CreateFuncParser;
27+
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
28+
import com.dtstack.flink.sql.parser.InsertSqlParser;
29+
import com.dtstack.flink.sql.parser.SqlParser;
30+
import com.dtstack.flink.sql.parser.SqlTree;
2631
import com.dtstack.flink.sql.side.SideSqlExec;
2732
import com.dtstack.flink.sql.side.SideTableInfo;
2833
import com.dtstack.flink.sql.table.SourceTableInfo;
@@ -204,7 +209,7 @@ public static void main(String[] args) throws Exception {
204209
//sql-dimensional table contains the dimension table of execution
205210
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
206211
}else{
207-
tableEnv.sqlUpdate(result.getExecSql());
212+
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
208213
if(LOG.isInfoEnabled()){
209214
LOG.info("exec sql: " + result.getExecSql());
210215
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.dtstack.flink.sql.exec;
2+
3+
import org.apache.calcite.sql.SqlIdentifier;
4+
import org.apache.calcite.sql.SqlInsert;
5+
import org.apache.calcite.sql.SqlNode;
6+
import org.apache.flink.table.api.Table;
7+
import org.apache.flink.table.api.TableEnvironment;
8+
import org.apache.flink.table.api.TableException;
9+
import org.apache.flink.table.api.ValidationException;
10+
import org.apache.flink.table.api.java.StreamTableEnvironment;
11+
import org.apache.flink.table.calcite.FlinkPlannerImpl;
12+
import org.apache.flink.table.plan.logical.LogicalRelNode;
13+
import org.apache.flink.table.plan.schema.TableSinkTable;
14+
15+
import java.lang.reflect.Method;
16+
17+
/**
18+
* @description: mapping by name when insert into sink table
19+
* @author: maqi
20+
* @create: 2019/08/15 11:09
21+
*/
22+
public class FlinkSQLExec {
23+
24+
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
25+
26+
FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
27+
SqlNode insert = planner.parse(stmt);
28+
29+
if (!(insert instanceof SqlInsert)) {
30+
throw new TableException(
31+
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.");
32+
}
33+
SqlNode query = ((SqlInsert) insert).getSource();
34+
35+
SqlNode validatedQuery = planner.validate(query);
36+
37+
Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
38+
String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
39+
40+
Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
41+
method.setAccessible(true);
42+
43+
TableSinkTable targetTable = (TableSinkTable) method.invoke(tableEnv, targetTableName);
44+
String[] fieldNames = targetTable.tableSink().getFieldNames();
45+
46+
Table newTable = null;
47+
48+
try {
49+
newTable = queryResult.select(String.join(",", fieldNames));
50+
} catch (Exception e) {
51+
throw new ValidationException(
52+
"Field name of query result and registered TableSink "+targetTableName +" do not match.\n" +
53+
"Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
54+
"TableSink schema: " + String.join(",", fieldNames));
55+
}
56+
57+
tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
58+
}
59+
}

core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void parseSelectFields(JoinInfo joinInfo){
8989
for( int i=0; i<outFieldInfoList.size(); i++){
9090
FieldInfo fieldInfo = outFieldInfoList.get(i);
9191
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
92-
fields.add(fieldInfo.getFieldName());
92+
fields.add(sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName()));
9393
sideFieldIndex.put(i, sideIndex);
9494
sideFieldNameIndex.put(i, fieldInfo.getFieldName());
9595
sideIndex++;

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 101 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.enums.ECacheType;
24+
import com.dtstack.flink.sql.exec.FlinkSQLExec;
2425
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
2526
import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
2627
import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
2728
import com.dtstack.flink.sql.util.ClassUtil;
2829
import com.dtstack.flink.sql.util.ParseUtils;
30+
import org.apache.calcite.sql.SqlAsOperator;
2931
import org.apache.calcite.sql.SqlBasicCall;
3032
import org.apache.calcite.sql.SqlDataTypeSpec;
3133
import org.apache.calcite.sql.SqlIdentifier;
@@ -35,7 +37,7 @@
3537
import org.apache.calcite.sql.SqlLiteral;
3638
import org.apache.calcite.sql.SqlNode;
3739
import org.apache.calcite.sql.SqlNodeList;
38-
import org.apache.calcite.sql.SqlOrderBy;
40+
import org.apache.calcite.sql.SqlOperator;
3941
import org.apache.calcite.sql.SqlSelect;
4042
import org.apache.calcite.sql.fun.SqlCase;
4143
import org.apache.calcite.sql.parser.SqlParseException;
@@ -100,13 +102,16 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
100102

101103
if(preIsSideJoin){
102104
preIsSideJoin = false;
105+
List<String> fieldNames = null;
103106
for(FieldReplaceInfo replaceInfo : replaceInfoList){
107+
fieldNames = Lists.newArrayList();
104108
replaceFieldName(pollSqlNode, replaceInfo.getMappingTable(), replaceInfo.getTargetTableName(), replaceInfo.getTargetTableAlias());
109+
addAliasForFieldNode(pollSqlNode, fieldNames, replaceInfo.getMappingTable());
105110
}
106111
}
107112

108113
if(pollSqlNode.getKind() == INSERT){
109-
tableEnv.sqlUpdate(pollSqlNode.toString());
114+
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
110115
if(LOG.isInfoEnabled()){
111116
LOG.info("exec sql: " + pollSqlNode.toString());
112117
}
@@ -125,6 +130,68 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
125130

126131
}
127132

133+
134+
private void addAliasForFieldNode(SqlNode pollSqlNode, List<String> fieldList, HashBasedTable<String, String, String> mappingTable) {
135+
SqlKind sqlKind = pollSqlNode.getKind();
136+
switch (sqlKind) {
137+
case INSERT:
138+
SqlNode source = ((SqlInsert) pollSqlNode).getSource();
139+
addAliasForFieldNode(source, fieldList, mappingTable);
140+
break;
141+
142+
case AS:
143+
addAliasForFieldNode(((SqlBasicCall) pollSqlNode).getOperands()[0], fieldList, mappingTable);
144+
break;
145+
146+
case SELECT:
147+
148+
SqlNodeList selectList = ((SqlSelect) pollSqlNode).getSelectList();
149+
150+
selectList.getList().forEach(node -> {
151+
if (node.getKind() == IDENTIFIER) {
152+
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
153+
if (sqlIdentifier.names.size() == 1) {
154+
return;
155+
}
156+
// save real field
157+
String fieldName = sqlIdentifier.names.get(1);
158+
if (!fieldName.endsWith("0") || fieldName.endsWith("0") && mappingTable.columnMap().containsKey(fieldName)) {
159+
fieldList.add(fieldName);
160+
}
161+
162+
}
163+
});
164+
165+
for (int i = 0; i < selectList.getList().size(); i++) {
166+
SqlNode node = selectList.get(i);
167+
if (node.getKind() == IDENTIFIER) {
168+
SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
169+
if (sqlIdentifier.names.size() == 1) {
170+
return;
171+
}
172+
173+
String name = sqlIdentifier.names.get(1);
174+
// avoid real field pv0 convert pv
175+
if (name.endsWith("0") && !fieldList.contains(name) && !fieldList.contains(name.substring(0, name.length() - 1))) {
176+
SqlOperator operator = new SqlAsOperator();
177+
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
178+
179+
SqlIdentifier sqlIdentifierAlias = new SqlIdentifier(name.substring(0, name.length() - 1), null, sqlParserPos);
180+
SqlNode[] sqlNodes = new SqlNode[2];
181+
sqlNodes[0] = sqlIdentifier;
182+
sqlNodes[1] = sqlIdentifierAlias;
183+
SqlBasicCall sqlBasicCall = new SqlBasicCall(operator, sqlNodes, sqlParserPos);
184+
185+
selectList.set(i, sqlBasicCall);
186+
}
187+
188+
}
189+
}
190+
break;
191+
}
192+
}
193+
194+
128195
public AliasInfo parseASNode(SqlNode sqlNode) throws SqlParseException {
129196
SqlKind sqlKind = sqlNode.getKind();
130197
if(sqlKind != AS){
@@ -305,7 +372,7 @@ private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String
305372
}
306373
}
307374

308-
public SqlNode filterNodeWithTargetName(SqlNode sqlNode, String targetTableName){
375+
public SqlNode filterNodeWithTargetName(SqlNode sqlNode, String targetTableName) {
309376

310377
SqlKind sqlKind = sqlNode.getKind();
311378
switch (sqlKind){
@@ -342,7 +409,7 @@ public SqlNode filterNodeWithTargetName(SqlNode sqlNode, String targetTableName)
342409
}
343410

344411

345-
public void setLocalSqlPluginPath(String localSqlPluginPath){
412+
public void setLocalSqlPluginPath(String localSqlPluginPath) {
346413
this.localSqlPluginPath = localSqlPluginPath;
347414
}
348415

@@ -386,12 +453,12 @@ private List<SqlNode> replaceSelectStarFieldName(SqlNode selectNode, HashBasedTa
386453
}
387454
}
388455

389-
private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String, String, String> mappingTable, String tableAlias){
390-
if(selectNode.getKind() == AS){
391-
SqlNode leftNode = ((SqlBasicCall)selectNode).getOperands()[0];
456+
private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String, String, String> mappingTable, String tableAlias) {
457+
if (selectNode.getKind() == AS) {
458+
SqlNode leftNode = ((SqlBasicCall) selectNode).getOperands()[0];
392459
SqlNode replaceNode = replaceSelectFieldName(leftNode, mappingTable, tableAlias);
393-
if(replaceNode != null){
394-
((SqlBasicCall)selectNode).getOperands()[0] = replaceNode;
460+
if (replaceNode != null) {
461+
((SqlBasicCall) selectNode).getOperands()[0] = replaceNode;
395462
}
396463

397464
return selectNode;
@@ -470,15 +537,15 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
470537
for(int i=0; i<whenOperands.size(); i++){
471538
SqlNode oneOperand = whenOperands.get(i);
472539
SqlNode replaceNode = replaceSelectFieldName(oneOperand, mappingTable, tableAlias);
473-
if(replaceNode != null){
540+
if (replaceNode != null) {
474541
whenOperands.set(i, replaceNode);
475542
}
476543
}
477544

478545
for(int i=0; i<thenOperands.size(); i++){
479546
SqlNode oneOperand = thenOperands.get(i);
480547
SqlNode replaceNode = replaceSelectFieldName(oneOperand, mappingTable, tableAlias);
481-
if(replaceNode != null){
548+
if (replaceNode != null) {
482549
thenOperands.set(i, replaceNode);
483550
}
484551
}
@@ -495,19 +562,27 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
495562

496563
/**
497564
* Analyzing conditions are very join the dimension tables include all equivalent conditions (i.e., dimension table is the primary key definition
565+
*
498566
* @return
499567
*/
500-
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, List<String> primaryKeys){
501-
502-
List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias);
503-
if(CollectionUtils.isEqualCollection(conditionFields, primaryKeys)){
568+
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, SideTableInfo sideTableInfo) {
569+
List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias, sideTableInfo);
570+
if(CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo))){
504571
return true;
505572
}
506573

507574
return false;
508575
}
509576

510-
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName){
577+
private List<String> convertPrimaryAlias(SideTableInfo sideTableInfo) {
578+
List<String> res = Lists.newArrayList();
579+
sideTableInfo.getPrimaryKeys().forEach(field -> {
580+
res.add(sideTableInfo.getPhysicalFields().getOrDefault(field, field));
581+
});
582+
return res;
583+
}
584+
585+
public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName, SideTableInfo sideTableInfo){
511586
List<SqlNode> sqlNodeList = Lists.newArrayList();
512587
ParseUtils.parseAnd(conditionNode, sqlNodeList);
513588
List<String> conditionFields = Lists.newArrayList();
@@ -530,7 +605,7 @@ public List<String> getConditionFields(SqlNode conditionNode, String specifyTabl
530605
}else{
531606
throw new RuntimeException(String.format("side table:%s join condition is wrong", specifyTableName));
532607
}
533-
608+
tableCol = sideTableInfo.getPhysicalFields().getOrDefault(tableCol, tableCol);
534609
conditionFields.add(tableCol);
535610
}
536611

@@ -561,8 +636,11 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
561636

562637
if(preIsSideJoin){
563638
preIsSideJoin = false;
564-
for(FieldReplaceInfo replaceInfo : replaceInfoList){
639+
List<String> fieldNames = null;
640+
for (FieldReplaceInfo replaceInfo : replaceInfoList) {
641+
fieldNames = Lists.newArrayList();
565642
replaceFieldName(pollSqlNode, replaceInfo.getMappingTable(), replaceInfo.getTargetTableName(), replaceInfo.getTargetTableAlias());
643+
addAliasForFieldNode(pollSqlNode, fieldNames, replaceInfo.getMappingTable());
566644
}
567645
}
568646

@@ -598,6 +676,7 @@ public void registerTmpTable(CreateTmpTableParser.SqlParserResult result,
598676
}
599677
}
600678
}
679+
601680
private void joinFun(Object pollObj, Map<String, Table> localTableCache,
602681
Map<String, SideTableInfo> sideTableMap, StreamTableEnvironment tableEnv,
603682
List<FieldReplaceInfo> replaceInfoList) throws Exception{
@@ -624,7 +703,7 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
624703
throw new RuntimeException("can't not find side table:" + joinInfo.getRightTableName());
625704
}
626705

627-
if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo.getPrimaryKeys())){
706+
if(!checkJoinCondition(joinInfo.getCondition(), joinInfo.getRightTableAlias(), sideTableInfo)){
628707
throw new RuntimeException("ON condition must contain all equal fields!!!");
629708
}
630709

@@ -650,7 +729,7 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
650729

651730
//join side table before keyby ===> Reducing the size of each dimension table cache of async
652731
if(sideTableInfo.isPartitionedJoin()){
653-
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias());
732+
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias(), sideTableInfo);
654733
String[] leftJoinColArr = new String[leftJoinColList.size()];
655734
leftJoinColArr = leftJoinColList.toArray(leftJoinColArr);
656735
adaptStream = adaptStream.keyBy(leftJoinColArr);
@@ -681,12 +760,11 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
681760
}
682761
}
683762

684-
private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table){
763+
private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
685764
List<String> fieldNames = new LinkedList<>();
686765
String fieldsInfo = result.getFieldsInfoStr();
687766
String[] fields = fieldsInfo.split(",");
688-
for (int i=0; i < fields.length; i++)
689-
{
767+
for (int i = 0; i < fields.length; i++) {
690768
String[] filed = fields[i].split("\\s");
691769
if (filed.length < 2 || fields.length != table.getSchema().getColumnNames().length){
692770
return false;

oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
5656

5757
sqlCondition = "select ${selectField} from ${tableName} where ";
5858
for (int i = 0; i < equalFieldList.size(); i++) {
59-
String equalField = equalFieldList.get(i);
59+
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
6060

6161
sqlCondition += dealLowerFiled(equalField) + "=? ";
6262
if (i != equalFieldList.size() - 1) {

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
6666

6767
sqlCondition = "select ${selectField} from ${tableName} where ";
6868
for (int i = 0; i < equalFieldList.size(); i++) {
69-
String equalField = equalFieldList.get(i);
69+
String equalField = sideTableInfo.getPhysicalFields().getOrDefault(equalFieldList.get(i), equalFieldList.get(i));
7070

7171
sqlCondition += equalField + "=? ";
7272
if (i != equalFieldList.size() - 1) {

0 commit comments

Comments
 (0)