Skip to content

Commit 5bf27e8

Browse files
committed
delay init scheduled
1 parent 13a8dd9 commit 5bf27e8

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
7777
private transient ScheduledThreadPoolExecutor timerService;
7878

7979
public RetractJDBCOutputFormat() {
80-
this.timerService = new ScheduledThreadPoolExecutor(1);
8180
}
8281

8382
@Override
@@ -96,6 +95,8 @@ public void open(int taskNumber, int numTasks) throws IOException {
9695
try {
9796
establishConnection();
9897
initMetric();
98+
99+
this.timerService = new ScheduledThreadPoolExecutor(1);
99100
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
100101
if (isReplaceInsertQuery()) {
101102
insertQuery = dbSink.buildUpdateSql(tableName, Arrays.asList(dbSink.getFieldNames()), realIndexes, fullField);
@@ -174,7 +175,7 @@ private void insertWrite(Row row) throws SQLException {
174175
upload.executeBatch();
175176
batchCount.set(0);
176177

177-
if (scheduledFuture != null) {
178+
if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
178179
scheduledFuture.cancel(true);
179180
}
180181
}

0 commit comments

Comments
 (0)