3232import java .sql .DriverManager ;
3333import java .sql .PreparedStatement ;
3434import java .sql .SQLException ;
35+ import java .util .ArrayList ;
3536import java .util .Arrays ;
3637import java .util .List ;
3738import java .util .Map ;
@@ -66,9 +67,11 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
6667 private String insertQuery ;
6768 public int [] typesArray ;
6869
70+ /** 存储用于批量写入的数据 */
71+ protected List <Row > rows = new ArrayList ();
72+
6973 private Connection dbConn ;
7074 private PreparedStatement upload ;
71- private AtomicInteger batchCount = new AtomicInteger (0 );
7275 private transient ScheduledThreadPoolExecutor timerService ;
7376
7477
@@ -107,7 +110,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
107110 throw new SQLException ("Table " + tableName + " doesn't exist" );
108111 }
109112
110- if (batchWaitInterval > 0 ) {
113+ if (batchWaitInterval > 0 && batchNum > 1 ) {
111114 LOG .info ("open batch wait interval scheduled, interval is {} ms" , batchWaitInterval );
112115
113116 timerService = new ScheduledThreadPoolExecutor (1 );
@@ -134,6 +137,7 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
134137 } else {
135138 dbConn = DriverManager .getConnection (dbURL , username , password );
136139 }
140+ dbConn .setAutoCommit (false );
137141 }
138142
139143 /**
@@ -150,35 +154,53 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
150154 * @see PreparedStatement
151155 */
152156 @ Override
153- public void writeRecord (Tuple2 tuple2 ) throws IOException {
157+ public void writeRecord (Tuple2 tuple2 ) {
154158
155159 Tuple2 <Boolean , Row > tupleTrans = tuple2 ;
156160 Boolean retract = tupleTrans .getField (0 );
157161 Row row = tupleTrans .getField (1 );
158162
159-
160163 if (typesArray != null && typesArray .length > 0 && typesArray .length != row .getArity ()) {
161164 LOG .warn ("Column SQL types array doesn't match arity of passed Row! Check the passed array..." );
162165 }
163- try {
164- if (retract ) {
165- insertWrite (row );
166- outRecords .inc ();
167- } else {
168- //do nothing
169- }
170- } catch (SQLException | IllegalArgumentException e ) {
171- throw new IllegalArgumentException ("writeRecord() failed" , e );
166+
167+ if (retract ) {
168+ insertWrite (row );
169+ outRecords .inc ();
170+ } else {
171+ //do nothing
172172 }
173173 }
174174
175175
176- private void insertWrite (Row row ) throws SQLException {
177- updatePreparedStmt (row , upload );
178- upload .addBatch ();
179- batchCount .incrementAndGet ();
180- if (batchCount .get () >= batchNum ) {
181- submitExecuteBatch ();
176+ private void insertWrite (Row row ) {
177+ checkConnectionOpen (dbConn );
178+
179+ if (batchNum == 1 ) {
180+ writeSingleRecord (row );
181+ } else {
182+ try {
183+ rows .add (row );
184+ updatePreparedStmt (row , upload );
185+ upload .addBatch ();
186+ } catch (SQLException e ) {
187+ LOG .error ("" , e );
188+ }
189+
190+ if (rows .size () >= batchNum ) {
191+ submitExecuteBatch ();
192+ }
193+ }
194+ }
195+
196+ private void writeSingleRecord (Row row ) {
197+ try {
198+ updatePreparedStmt (row , upload );
199+ upload .execute ();
200+ } catch (SQLException e ) {
201+ outDirtyRecords .inc ();
202+ LOG .error ("record insert failed .." , row .toString ());
203+ LOG .error ("" , e );
182204 }
183205 }
184206
@@ -276,9 +298,30 @@ private synchronized void submitExecuteBatch() {
276298 try {
277299 LOG .info ("submitExecuteBatch start......" );
278300 this .upload .executeBatch ();
279- this . batchCount . set ( 0 );
301+ dbConn . commit ( );
280302 } catch (SQLException e ) {
281- LOG .error ("" , e );
303+ try {
304+ dbConn .rollback ();
305+ } catch (SQLException e1 ) {
306+ LOG .error ("rollback data error !" , e );
307+ }
308+ rows .forEach (this ::writeSingleRecord );
309+ } finally {
310+ rows .clear ();
311+ }
312+ }
313+
314+ private void checkConnectionOpen (Connection dbConn ) {
315+ try {
316+ if (dbConn .isClosed ()) {
317+ LOG .info ("db connection reconnect.." );
318+ establishConnection ();
319+ upload = dbConn .prepareStatement (insertQuery );
320+ }
321+ } catch (SQLException e ) {
322+ LOG .error ("check connection open failed.." , e );
323+ } catch (ClassNotFoundException e ) {
324+ LOG .error ("load jdbc class error when reconnect db.." , e );
282325 }
283326 }
284327
@@ -302,7 +345,7 @@ public void close() throws IOException {
302345 LOG .info ("Inputformat couldn't be closed - " , se );
303346 } finally {
304347 upload = null ;
305- batchCount . set ( 0 );
348+ rows . clear ( );
306349 }
307350
308351 try {
0 commit comments