Skip to content

Commit c149f0b

Browse files
committed
mysql log
1 parent 05bdaf0 commit c149f0b

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ public void open(int taskNumber, int numTasks) throws IOException {
9898
establishConnection();
9999
initMetric();
100100

101+
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
102+
if (isReplaceInsertQuery()) {
103+
insertQuery = dbSink.buildUpdateSql(tableName, Arrays.asList(dbSink.getFieldNames()), realIndexes, fullField);
104+
}
105+
upload = dbConn.prepareStatement(insertQuery);
106+
} else {
107+
throw new SQLException("Table " + tableName + " doesn't exist");
108+
}
109+
101110
if (batchWaitInterval > 0) {
102111
LOG.info("open batch wait interval scheduled, interval is {} ms", batchWaitInterval);
103112

@@ -108,15 +117,6 @@ public void open(int taskNumber, int numTasks) throws IOException {
108117

109118
}
110119

111-
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
112-
if (isReplaceInsertQuery()) {
113-
insertQuery = dbSink.buildUpdateSql(tableName, Arrays.asList(dbSink.getFieldNames()), realIndexes, fullField);
114-
}
115-
upload = dbConn.prepareStatement(insertQuery);
116-
} else {
117-
throw new SQLException("Table " + tableName + " doesn't exist");
118-
}
119-
120120
} catch (SQLException sqe) {
121121
LOG.error("", sqe);
122122
throw new IllegalArgumentException("open() failed.", sqe);

0 commit comments

Comments
 (0)