3535import java .util .Arrays ;
3636import java .util .List ;
3737import java .util .Map ;
38- import java .util .concurrent .ScheduledFuture ;
3938import java .util .concurrent .ScheduledThreadPoolExecutor ;
4039import java .util .concurrent .TimeUnit ;
4140import java .util .concurrent .atomic .AtomicInteger ;
@@ -59,7 +58,7 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
5958 private String tableName ;
6059 private String dbType ;
6160 private RdbSink dbSink ;
62- private long batchWaitInterval ;
61+ private long batchWaitInterval = 10000l ;
6362 private int batchNum ;
6463 private String insertQuery ;
6564 public int [] typesArray ;
@@ -69,13 +68,13 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
6968
7069 private AtomicInteger batchCount = new AtomicInteger (0 );
7170
71+ private transient ScheduledThreadPoolExecutor timerService ;
72+
7273 //index field
7374 private Map <String , List <String >> realIndexes = Maps .newHashMap ();
7475 //full field
7576 private List <String > fullField = Lists .newArrayList ();
7677
77- private transient ScheduledThreadPoolExecutor timerService ;
78-
7978 public RetractJDBCOutputFormat () {
8079 }
8180
@@ -96,7 +95,14 @@ public void open(int taskNumber, int numTasks) throws IOException {
9695 establishConnection ();
9796 initMetric ();
9897
99- this .timerService = new ScheduledThreadPoolExecutor (1 );
98+ if (batchWaitInterval > 0 ) {
99+ timerService = new ScheduledThreadPoolExecutor (1 );
100+ timerService .scheduleAtFixedRate (() -> {
101+ submitExecuteBatch ();
102+ }, 0 , batchWaitInterval , TimeUnit .MILLISECONDS );
103+
104+ }
105+
100106 if (dbConn .getMetaData ().getTables (null , null , tableName , null ).next ()) {
101107 if (isReplaceInsertQuery ()) {
102108 insertQuery = dbSink .buildUpdateSql (tableName , Arrays .asList (dbSink .getFieldNames ()), realIndexes , fullField );
@@ -163,21 +169,9 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
163169 private void insertWrite (Row row ) throws SQLException {
164170 updatePreparedStmt (row , upload );
165171 upload .addBatch ();
166- ScheduledFuture <?> scheduledFuture = null ;
167-
168- if (batchWaitInterval > 0 && batchCount .get () == 0 ) {
169- scheduledFuture = registerTimer (batchWaitInterval , this );
170- }
171-
172172 batchCount .incrementAndGet ();
173-
174173 if (batchCount .get () >= batchNum ) {
175- upload .executeBatch ();
176- batchCount .set (0 );
177-
178- if (scheduledFuture != null && !scheduledFuture .isCancelled ()) {
179- scheduledFuture .cancel (true );
180- }
174+ submitExecuteBatch ();
181175 }
182176 }
183177
@@ -270,25 +264,13 @@ private void updatePreparedStmt(Row row, PreparedStatement pstmt) throws SQLExce
270264 }
271265 }
272266
273- public ScheduledFuture <?> registerTimer (long delay , RetractJDBCOutputFormat outputFormat ) {
274- return timerService .schedule (new DelayExecuteBatch (outputFormat ), delay , TimeUnit .MILLISECONDS );
275- }
276-
277- private final static class DelayExecuteBatch implements Runnable {
278- RetractJDBCOutputFormat outputFormat ;
279267
280- private DelayExecuteBatch (RetractJDBCOutputFormat outputFormat ) {
281- this .outputFormat = outputFormat ;
282- }
283-
284- @ Override
285- public void run () {
286- try {
287- outputFormat .upload .executeBatch ();
288- outputFormat .batchCount .set (0 );
289- } catch (SQLException e ) {
290- LOG .error ("delay batch insert error..." , e );
291- }
268+ private synchronized void submitExecuteBatch () {
269+ try {
270+ this .upload .executeBatch ();
271+ this .batchCount .set (0 );
272+ } catch (SQLException e ) {
273+ LOG .error ("" , e );
292274 }
293275 }
294276
@@ -304,6 +286,9 @@ public void close() throws IOException {
304286 upload .executeBatch ();
305287 upload .close ();
306288 }
289+ if (null != timerService ) {
290+ timerService .shutdown ();
291+ }
307292 } catch (SQLException se ) {
308293 LOG .info ("Inputformat couldn't be closed - " + se .getMessage ());
309294 } finally {
0 commit comments