@@ -98,7 +98,7 @@ public void configure(Configuration parameters) {
9898 public void open (int taskNumber , int numTasks ) throws IOException {
9999 try {
100100 LOG .info ("PreparedStatement execute batch num is {}" , batchNum );
101- establishConnection ();
101+ dbConn = establishConnection ();
102102 initMetric ();
103103
104104 if (dbConn .getMetaData ().getTables (null , null , tableName , null ).next ()) {
@@ -130,14 +130,16 @@ public void open(int taskNumber, int numTasks) throws IOException {
130130 }
131131
132132
133- private void establishConnection () throws SQLException , ClassNotFoundException {
133+ private Connection establishConnection () throws SQLException , ClassNotFoundException {
134+ Connection connection ;
134135 Class .forName (drivername );
135136 if (username == null ) {
136- dbConn = DriverManager .getConnection (dbURL );
137+ connection = DriverManager .getConnection (dbURL );
137138 } else {
138- dbConn = DriverManager .getConnection (dbURL , username , password );
139+ connection = DriverManager .getConnection (dbURL , username , password );
139140 }
140- dbConn .setAutoCommit (false );
141+ connection .setAutoCommit (false );
142+ return connection ;
141143 }
142144
143145 /**
@@ -175,22 +177,21 @@ public void writeRecord(Tuple2 tuple2) {
175177
176178 private void insertWrite (Row row ) {
177179 checkConnectionOpen (dbConn );
178-
179- if (batchNum == 1 ) {
180- writeSingleRecord (row );
181- } else {
182- try {
183- rows .add (row );
180+ try {
181+ if (batchNum == 1 ) {
182+ writeSingleRecord (row );
183+ } else {
184184 updatePreparedStmt (row , upload );
185+ rows .add (row );
185186 upload .addBatch ();
186- } catch (SQLException e ) {
187- LOG .error ("" , e );
188- }
189-
190- if (rows .size () >= batchNum ) {
191- submitExecuteBatch ();
187+ if (rows .size () >= batchNum ) {
188+ submitExecuteBatch ();
189+ }
192190 }
191+ } catch (SQLException e ) {
192+ LOG .error ("" , e );
193193 }
194+
194195 }
195196
196197 private void writeSingleRecord (Row row ) {
@@ -305,6 +306,7 @@ private synchronized void submitExecuteBatch() {
305306 } catch (SQLException e1 ) {
306307 LOG .error ("rollback data error !" , e );
307308 }
309+
308310 rows .forEach (this ::writeSingleRecord );
309311 } finally {
310312 rows .clear ();
@@ -315,7 +317,7 @@ private void checkConnectionOpen(Connection dbConn) {
315317 try {
316318 if (dbConn .isClosed ()) {
317319 LOG .info ("db connection reconnect.." );
318- establishConnection ();
320+ dbConn = establishConnection ();
319321 upload = dbConn .prepareStatement (insertQuery );
320322 }
321323 } catch (SQLException e ) {
0 commit comments