Skip to content

Commit 7d2c65e

Browse files
committed
Merge branch 'v1.5.0_dev_union' into 'v1.5.0_dev'
V1.5.0 dev union See merge request !60
2 parents 0e10e60 + 17b8931 commit 7d2c65e

File tree

3 files changed

+32
-17
lines changed

3 files changed

+32
-17
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
110110
aliasInfo.setAlias(alias.toString());
111111

112112
return aliasInfo;
113+
114+
case UNION:
115+
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
116+
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
117+
118+
parseSql(unionLeft, sideTableSet, queueInfo);
119+
120+
parseSql(unionRight, sideTableSet, queueInfo);
121+
122+
break;
113123
}
114124

115125
return "";

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,16 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
306306
throw new RuntimeException("---not deal type:" + sqlNode);
307307
}
308308

309+
break;
310+
case UNION:
311+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
312+
313+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
314+
315+
replaceFieldName(unionLeft, mappingTable, targetTableName, tableAlias);
316+
317+
replaceFieldName(unionRight, mappingTable, targetTableName, tableAlias);
318+
309319
break;
310320
default:
311321
break;
@@ -439,39 +449,33 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
439449
return sqlIdentifier;
440450
}else if(selectNode.getKind() == LITERAL || selectNode.getKind() == LITERAL_CHAIN){//字面含义
441451
return selectNode;
442-
}else if(selectNode.getKind() == OTHER_FUNCTION
452+
}else if( AGGREGATE.contains(selectNode.getKind())
453+
|| AVG_AGG_FUNCTIONS.contains(selectNode.getKind())
454+
|| COMPARISON.contains(selectNode.getKind())
455+
|| selectNode.getKind() == OTHER_FUNCTION
443456
|| selectNode.getKind() == DIVIDE
444457
|| selectNode.getKind() == CAST
445-
|| selectNode.getKind() == SUM
446-
|| selectNode.getKind() == AVG
447-
|| selectNode.getKind() == MAX
448-
|| selectNode.getKind() == MIN
449458
|| selectNode.getKind() == TRIM
450459
|| selectNode.getKind() == TIMES
451460
|| selectNode.getKind() == PLUS
452-
|| selectNode.getKind() == IN
461+
|| selectNode.getKind() == NOT_IN
453462
|| selectNode.getKind() == OR
454463
|| selectNode.getKind() == AND
455-
|| selectNode.getKind() == COUNT
456-
|| selectNode.getKind() == SUM0
457-
|| selectNode.getKind() == LEAD
458-
|| selectNode.getKind() == LAG
459-
|| selectNode.getKind() == EQUALS
460-
|| selectNode.getKind() == NOT_EQUALS
461464
|| selectNode.getKind() == MINUS
462465
|| selectNode.getKind() == TUMBLE
463466
|| selectNode.getKind() == TUMBLE_START
464467
|| selectNode.getKind() == TUMBLE_END
465468
|| selectNode.getKind() == SESSION
466469
|| selectNode.getKind() == SESSION_START
467470
|| selectNode.getKind() == SESSION_END
471+
|| selectNode.getKind() == HOP
472+
|| selectNode.getKind() == HOP_START
473+
|| selectNode.getKind() == HOP_END
468474
|| selectNode.getKind() == BETWEEN
469475
|| selectNode.getKind() == IS_NULL
470476
|| selectNode.getKind() == IS_NOT_NULL
471-
|| selectNode.getKind() == LESS_THAN
472-
|| selectNode.getKind() == GREATER_THAN
473-
|| selectNode.getKind() == LESS_THAN_OR_EQUAL
474-
|| selectNode.getKind() == GREATER_THAN_OR_EQUAL
477+
|| selectNode.getKind() == CONTAINS
478+
475479
){
476480
SqlBasicCall sqlBasicCall = (SqlBasicCall) selectNode;
477481
for(int i=0; i<sqlBasicCall.getOperands().length; i++){

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ private void insertWrite(Row row) {
197197
private void writeSingleRecord(Row row) {
198198
try {
199199
updatePreparedStmt(row, upload);
200-
upload.execute();
200+
upload.executeUpdate();
201+
dbConn.commit();
201202
} catch (SQLException e) {
202203
outDirtyRecords.inc();
203204
LOG.error("record insert failed ..", row.toString());

0 commit comments

Comments
 (0)