1919package com .dtstack .flink .sql .sink .rdb .format ;
2020
2121import com .dtstack .flink .sql .sink .rdb .RdbSink ;
22+ import com .dtstack .flink .sql .util .JDBCUtils ;
2223import org .apache .commons .lang3 .StringUtils ;
2324import org .apache .flink .api .java .tuple .Tuple ;
2425import org .apache .flink .api .java .tuple .Tuple2 ;
3132import java .sql .*;
3233import java .util .*;
3334import java .io .IOException ;
35+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
36+ import java .util .concurrent .TimeUnit ;
3437
3538import com .dtstack .flink .sql .sink .MetricOutputFormat ;
3639
@@ -53,20 +56,27 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
5356 private String tableName ;
5457 private String dbType ;
5558 private RdbSink dbSink ;
59+ // trigger preparedStatement execute batch interval
60+ private long batchWaitInterval = 10000l ;
61+
62+ // batchNum
5663 private int batchInterval = 5000 ;
5764 private String insertQuery ;
5865 public int [] typesArray ;
5966
6067 private Connection dbConn ;
6168 private PreparedStatement upload ;
6269
63- private int batchCount = 0 ;
70+ /** 存储用于批量写入的数据 */
71+ protected List <Row > rows = new ArrayList ();
6472
6573 //index field
6674 private Map <String , List <String >> realIndexes = Maps .newHashMap ();
6775 //full field
6876 private List <String > fullField = Lists .newArrayList ();
6977
78+ private transient ScheduledThreadPoolExecutor timerService ;
79+
7080 public RetractJDBCOutputFormat () {
7181
7282 }
@@ -85,7 +95,8 @@ public void configure(Configuration parameters) {
8595 @ Override
8696 public void open (int taskNumber , int numTasks ) throws IOException {
8797 try {
88- establishConnection ();
98+ LOG .info ("PreparedStatement execute batch num is {}" , batchInterval );
99+ dbConn = establishConnection ();
89100 initMetric ();
90101 if (dbConn .getMetaData ().getTables (null , null , tableName , null ).next ()) {
91102 if (isReplaceInsertQuery ()) {
@@ -96,6 +107,17 @@ public void open(int taskNumber, int numTasks) throws IOException {
96107 throw new SQLException ("Table " + tableName + " doesn't exist" );
97108 }
98109
110+ if (batchWaitInterval > 0 && batchInterval > 1 ) {
111+ LOG .info ("open batch wait interval scheduled, interval is {} ms" , batchWaitInterval );
112+
113+ timerService = new ScheduledThreadPoolExecutor (1 );
114+ timerService .scheduleAtFixedRate (() -> {
115+ submitExecuteBatch ();
116+ }, 0 , batchWaitInterval , TimeUnit .MILLISECONDS );
117+
118+ }
119+
120+
99121 } catch (SQLException sqe ) {
100122 throw new IllegalArgumentException ("open() failed." , sqe );
101123 } catch (ClassNotFoundException cnfe ) {
@@ -104,13 +126,16 @@ public void open(int taskNumber, int numTasks) throws IOException {
104126 }
105127
106128
107- private void establishConnection () throws SQLException , ClassNotFoundException {
108- Class .forName (drivername );
129+ private Connection establishConnection () throws SQLException , ClassNotFoundException {
130+ Connection connection ;
131+ JDBCUtils .forName (drivername , getClass ().getClassLoader ());
109132 if (username == null ) {
110- dbConn = DriverManager .getConnection (dbURL );
133+ connection = DriverManager .getConnection (dbURL );
111134 } else {
112- dbConn = DriverManager .getConnection (dbURL , username , password );
135+ connection = DriverManager .getConnection (dbURL , username , password );
113136 }
137+ connection .setAutoCommit (false );
138+ return connection ;
114139 }
115140
116141 /**
@@ -127,7 +152,7 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
127152 * @see PreparedStatement
128153 */
129154 @ Override
130- public void writeRecord (Tuple2 tuple2 ) throws IOException {
155+ public void writeRecord (Tuple2 tuple2 ) {
131156
132157 Tuple2 <Boolean , Row > tupleTrans = tuple2 ;
133158 Boolean retract = tupleTrans .getField (0 );
@@ -137,27 +162,78 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
137162 if (typesArray != null && typesArray .length > 0 && typesArray .length != row .getArity ()) {
138163 LOG .warn ("Column SQL types array doesn't match arity of passed Row! Check the passed array..." );
139164 }
165+
166+ if (retract ) {
167+ insertWrite (row );
168+ outRecords .inc ();
169+ } else {
170+ //do nothing
171+ }
172+
173+ }
174+
175+
176+ private void insertWrite (Row row ) {
177+ System .out .println ("接受到数据row:" +row );
178+ checkConnectionOpen (dbConn );
140179 try {
141- if (retract ) {
142- insertWrite (row );
143- outRecords .inc ();
180+ if (batchInterval == 1 ) {
181+ writeSingleRecord (row );
144182 } else {
145- //do nothing
183+ updatePreparedStmt (row , upload );
184+ rows .add (row );
185+ upload .addBatch ();
186+ if (rows .size () >= batchInterval ) {
187+ submitExecuteBatch ();
188+ }
146189 }
147- } catch (SQLException | IllegalArgumentException e ) {
148- throw new IllegalArgumentException ( "writeRecord() failed " , e );
190+ } catch (SQLException e ) {
191+ LOG . error ( " " , e );
149192 }
150193 }
151194
195+ private void writeSingleRecord (Row row ) {
196+ try {
197+ updatePreparedStmt (row , upload );
198+ upload .execute ();
199+ System .out .println ("单条插入成功:" + row );
200+ } catch (SQLException e ) {
201+ System .out .println ("单条插入失败:" + row );
202+ LOG .error ("record insert failed .." , row .toString ());
203+ LOG .error ("" , e );
204+ }
205+ }
206+
207+ private synchronized void submitExecuteBatch () {
208+ try {
209+ LOG .info ("submitExecuteBatch start......" );
210+ this .upload .executeBatch ();
211+ dbConn .commit ();
212+ rows .forEach (row -> System .out .println ("批量插入成功:" + row ));
213+ } catch (SQLException e ) {
214+ try {
215+ dbConn .rollback ();
216+ } catch (SQLException e1 ) {
217+ LOG .error ("rollback data error !" , e );
218+ }
152219
153- private void insertWrite (Row row ) throws SQLException {
220+ rows .forEach (this ::writeSingleRecord );
221+ } finally {
222+ rows .clear ();
223+ }
224+ }
154225
155- updatePreparedStmt (row , upload );
156- upload .addBatch ();
157- batchCount ++;
158- if (batchCount >= batchInterval ) {
159- upload .executeBatch ();
160- batchCount = 0 ;
226+ private void checkConnectionOpen (Connection dbConn ) {
227+ try {
228+ if (dbConn .isClosed ()) {
229+ LOG .info ("db connection reconnect.." );
230+ dbConn = establishConnection ();
231+ upload = dbConn .prepareStatement (insertQuery );
232+ }
233+ } catch (SQLException e ) {
234+ LOG .error ("check connection open failed.." , e );
235+ } catch (ClassNotFoundException e ) {
236+ LOG .error ("load jdbc class error when reconnect db.." , e );
161237 }
162238 }
163239
@@ -266,7 +342,7 @@ public void close() throws IOException {
266342 LOG .info ("Inputformat couldn't be closed - " + se .getMessage ());
267343 } finally {
268344 upload = null ;
269- batchCount = 0 ;
345+
270346 }
271347
272348 try {
0 commit comments