Skip to content

Commit 17b8931

Browse files
committed
union parse
1 parent 36f838c commit 17b8931

File tree

3 files changed

+22
-1
lines changed

3 files changed

+22
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
110110
aliasInfo.setAlias(alias.toString());
111111

112112
return aliasInfo;
113+
114+
case UNION:
115+
SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
116+
SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];
117+
118+
parseSql(unionLeft, sideTableSet, queueInfo);
119+
120+
parseSql(unionRight, sideTableSet, queueInfo);
121+
122+
break;
113123
}
114124

115125
return "";

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,16 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
306306
throw new RuntimeException("---not deal type:" + sqlNode);
307307
}
308308

309+
break;
310+
case UNION:
311+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
312+
313+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
314+
315+
replaceFieldName(unionLeft, mappingTable, targetTableName, tableAlias);
316+
317+
replaceFieldName(unionRight, mappingTable, targetTableName, tableAlias);
318+
309319
break;
310320
default:
311321
break;

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ private void insertWrite(Row row) {
197197
private void writeSingleRecord(Row row) {
198198
try {
199199
updatePreparedStmt(row, upload);
200-
upload.execute();
200+
upload.executeUpdate();
201+
dbConn.commit();
201202
} catch (SQLException e) {
202203
outDirtyRecords.inc();
203204
LOG.error("record insert failed ..", row.toString());

0 commit comments

Comments
 (0)