Skip to content

Commit e9a39a8

Browse files
committed
union parse
1 parent f4ee386 commit e9a39a8

File tree

3 files changed

+22
-4
lines changed

3 files changed

+22
-4
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
110110
aliasInfo.setAlias(alias.toString());
111111

112112
return aliasInfo;
113+
114+
case UNION:
115+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
116+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
117+
118+
parseSql(unionLeft, sideTableSet, queueInfo);
119+
120+
parseSql(unionRight, sideTableSet, queueInfo);
121+
122+
break;
113123
}
114124

115125
return "";

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,17 @@ private void replaceFieldName(SqlNode sqlNode, HashBasedTable<String, String, St
232232
throw new RuntimeException("---not deal type:" + sqlNode);
233233
}
234234

235+
break;
236+
237+
case UNION:
238+
SqlNode unionLeft = ((SqlBasicCall) sqlNode).getOperands()[0];
239+
240+
SqlNode unionRight = ((SqlBasicCall) sqlNode).getOperands()[1];
241+
242+
replaceFieldName(unionLeft, mappingTable, targetTableName, tableAlias);
243+
244+
replaceFieldName(unionRight, mappingTable, targetTableName, tableAlias);
245+
235246
break;
236247
default:
237248
break;

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ public void writeRecord(Tuple2 tuple2) {
174174

175175

176176
private void insertWrite(Row row) {
177-
System.out.println("接受到数据row:" +row );
178177
checkConnectionOpen(dbConn);
179178
try {
180179
if (batchInterval == 1) {
@@ -196,9 +195,8 @@ private void writeSingleRecord(Row row) {
196195
try {
197196
updatePreparedStmt(row, upload);
198197
upload.execute();
199-
System.out.println("单条插入成功:" + row);
198+
dbConn.commit();
200199
} catch (SQLException e) {
201-
System.out.println("单条插入失败:" + row);
202200
LOG.error("record insert failed ..", row.toString());
203201
LOG.error("", e);
204202
}
@@ -209,7 +207,6 @@ private synchronized void submitExecuteBatch() {
209207
LOG.info("submitExecuteBatch start......");
210208
this.upload.executeBatch();
211209
dbConn.commit();
212-
rows.forEach(row -> System.out.println("批量插入成功:"+ row));
213210
} catch (SQLException e) {
214211
try {
215212
dbConn.rollback();

0 commit comments

Comments
 (0)