Skip to content

Commit add0cab

Browse files
committed
Merge branch 'v1.5.0_dev_mysql_log' into 'v1.5.0_dev'
V1.5.0 dev mysql log See merge request !51
2 parents eacbf12 + c149f0b commit add0cab

File tree

1 file changed

+13
-9
lines changed

1 file changed

+13
-9
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.atomic.AtomicInteger;
4141

4242
import com.dtstack.flink.sql.sink.MetricOutputFormat;
43+
import sun.rmi.runtime.Log;
4344

4445
/**
4546
* OutputFormat to write tuples into a database.
@@ -97,6 +98,15 @@ public void open(int taskNumber, int numTasks) throws IOException {
9798
establishConnection();
9899
initMetric();
99100

101+
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
102+
if (isReplaceInsertQuery()) {
103+
insertQuery = dbSink.buildUpdateSql(tableName, Arrays.asList(dbSink.getFieldNames()), realIndexes, fullField);
104+
}
105+
upload = dbConn.prepareStatement(insertQuery);
106+
} else {
107+
throw new SQLException("Table " + tableName + " doesn't exist");
108+
}
109+
100110
if (batchWaitInterval > 0) {
101111
LOG.info("open batch wait interval scheduled, interval is {} ms", batchWaitInterval);
102112

@@ -107,18 +117,11 @@ public void open(int taskNumber, int numTasks) throws IOException {
107117

108118
}
109119

110-
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
111-
if (isReplaceInsertQuery()) {
112-
insertQuery = dbSink.buildUpdateSql(tableName, Arrays.asList(dbSink.getFieldNames()), realIndexes, fullField);
113-
}
114-
upload = dbConn.prepareStatement(insertQuery);
115-
} else {
116-
throw new SQLException("Table " + tableName + " doesn't exist");
117-
}
118-
119120
} catch (SQLException sqe) {
121+
LOG.error("", sqe);
120122
throw new IllegalArgumentException("open() failed.", sqe);
121123
} catch (ClassNotFoundException cnfe) {
124+
LOG.error("", cnfe);
122125
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
123126
}
124127
}
@@ -271,6 +274,7 @@ private void updatePreparedStmt(Row row, PreparedStatement pstmt) throws SQLExce
271274

272275
private synchronized void submitExecuteBatch() {
273276
try {
277+
LOG.info("submitExecuteBatch start......");
274278
this.upload.executeBatch();
275279
this.batchCount.set(0);
276280
} catch (SQLException e) {

0 commit comments

Comments
 (0)