Skip to content

Commit ec4e207

Browse files
committed
batch insert
1 parent f7b5735 commit ec4e207

File tree

2 files changed

+112
-18
lines changed

2 files changed

+112
-18
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.dtstack.flink.sql.util;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.sql.DriverManager;
7+
8+
public class JDBCUtils {
9+
10+
private static final Logger LOG = LoggerFactory.getLogger(ClassUtil.class);
11+
12+
public final static String lock_str = "jdbc_lock_str";
13+
14+
public static void forName(String clazz, ClassLoader classLoader) {
15+
synchronized (lock_str){
16+
try {
17+
Class.forName(clazz, true, classLoader);
18+
DriverManager.setLoginTimeout(10);
19+
} catch (Exception e) {
20+
throw new RuntimeException(e);
21+
}
22+
}
23+
}
24+
25+
26+
public synchronized static void forName(String clazz) {
27+
try {
28+
Class<?> driverClass = Class.forName(clazz);
29+
driverClass.newInstance();
30+
} catch (Exception e) {
31+
throw new RuntimeException(e);
32+
}
33+
}
34+
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.sink.rdb.format;
2020

2121
import com.dtstack.flink.sql.sink.rdb.RdbSink;
22+
import com.dtstack.flink.sql.util.JDBCUtils;
2223
import org.apache.commons.lang3.StringUtils;
2324
import org.apache.flink.api.java.tuple.Tuple;
2425
import org.apache.flink.api.java.tuple.Tuple2;
@@ -31,6 +32,8 @@
3132
import java.sql.*;
3233
import java.util.*;
3334
import java.io.IOException;
35+
import java.util.concurrent.ScheduledThreadPoolExecutor;
36+
import java.util.concurrent.TimeUnit;
3437

3538
import com.dtstack.flink.sql.sink.MetricOutputFormat;
3639

@@ -53,20 +56,27 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
5356
private String tableName;
5457
private String dbType;
5558
private RdbSink dbSink;
59+
// trigger preparedStatement execute batch interval
60+
private long batchWaitInterval = 10000l;
61+
62+
// batchNum
5663
private int batchInterval = 5000;
5764
private String insertQuery;
5865
public int[] typesArray;
5966

6067
private Connection dbConn;
6168
private PreparedStatement upload;
6269

63-
private int batchCount = 0;
70+
/** 存储用于批量写入的数据 */
71+
protected List<Row> rows = new ArrayList();
6472

6573
//index field
6674
private Map<String, List<String>> realIndexes = Maps.newHashMap();
6775
//full field
6876
private List<String> fullField = Lists.newArrayList();
6977

78+
private transient ScheduledThreadPoolExecutor timerService;
79+
7080
public RetractJDBCOutputFormat() {
7181

7282
}
@@ -85,6 +95,7 @@ public void configure(Configuration parameters) {
8595
@Override
8696
public void open(int taskNumber, int numTasks) throws IOException {
8797
try {
98+
LOG.info("PreparedStatement execute batch num is {}", batchInterval);
8899
dbConn = establishConnection();
89100
initMetric();
90101
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
@@ -96,6 +107,17 @@ public void open(int taskNumber, int numTasks) throws IOException {
96107
throw new SQLException("Table " + tableName + " doesn't exist");
97108
}
98109

110+
if (batchWaitInterval > 0 && batchInterval > 1) {
111+
LOG.info("open batch wait interval scheduled, interval is {} ms", batchWaitInterval);
112+
113+
timerService = new ScheduledThreadPoolExecutor(1);
114+
timerService.scheduleAtFixedRate(() -> {
115+
submitExecuteBatch();
116+
}, 0, batchWaitInterval, TimeUnit.MILLISECONDS);
117+
118+
}
119+
120+
99121
} catch (SQLException sqe) {
100122
throw new IllegalArgumentException("open() failed.", sqe);
101123
} catch (ClassNotFoundException cnfe) {
@@ -106,12 +128,13 @@ public void open(int taskNumber, int numTasks) throws IOException {
106128

107129
private Connection establishConnection() throws SQLException, ClassNotFoundException {
108130
Connection connection ;
109-
Class.forName(drivername);
131+
JDBCUtils.forName(drivername, getClass().getClassLoader());
110132
if (username == null) {
111133
connection = DriverManager.getConnection(dbURL);
112134
} else {
113135
connection = DriverManager.getConnection(dbURL, username, password);
114136
}
137+
connection.setAutoCommit(false);
115138
return connection;
116139
}
117140

@@ -129,7 +152,7 @@ private Connection establishConnection() throws SQLException, ClassNotFoundExcep
129152
* @see PreparedStatement
130153
*/
131154
@Override
132-
public void writeRecord(Tuple2 tuple2) throws IOException {
155+
public void writeRecord(Tuple2 tuple2) {
133156

134157
Tuple2<Boolean, Row> tupleTrans = tuple2;
135158
Boolean retract = tupleTrans.getField(0);
@@ -139,27 +162,64 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
139162
if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
140163
LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
141164
}
165+
166+
if (retract) {
167+
insertWrite(row);
168+
outRecords.inc();
169+
} else {
170+
//do nothing
171+
}
172+
173+
}
174+
175+
176+
private void insertWrite(Row row) {
177+
System.out.println("接受到数据row:" +row );
178+
checkConnectionOpen(dbConn);
142179
try {
143-
if (retract) {
144-
insertWrite(row);
145-
outRecords.inc();
180+
if (batchInterval == 1) {
181+
writeSingleRecord(row);
146182
} else {
147-
//do nothing
183+
updatePreparedStmt(row, upload);
184+
rows.add(row);
185+
upload.addBatch();
186+
if (rows.size() >= batchInterval) {
187+
submitExecuteBatch();
188+
}
148189
}
149-
} catch (SQLException | IllegalArgumentException e) {
150-
throw new IllegalArgumentException("writeRecord() failed", e);
190+
} catch (SQLException e) {
191+
LOG.error("", e);
192+
}
193+
}
194+
195+
private void writeSingleRecord(Row row) {
196+
try {
197+
updatePreparedStmt(row, upload);
198+
upload.execute();
199+
System.out.println("单条插入成功:" + row);
200+
} catch (SQLException e) {
201+
System.out.println("单条插入失败:" + row);
202+
LOG.error("record insert failed ..", row.toString());
203+
LOG.error("", e);
151204
}
152205
}
153206

207+
private synchronized void submitExecuteBatch() {
208+
try {
209+
LOG.info("submitExecuteBatch start......");
210+
this.upload.executeBatch();
211+
dbConn.commit();
212+
rows.forEach(row -> System.out.println("批量插入成功:"+ row));
213+
} catch (SQLException e) {
214+
try {
215+
dbConn.rollback();
216+
} catch (SQLException e1) {
217+
LOG.error("rollback data error !", e);
218+
}
154219

155-
private void insertWrite(Row row) throws SQLException {
156-
checkConnectionOpen(dbConn);
157-
updatePreparedStmt(row, upload);
158-
upload.addBatch();
159-
batchCount++;
160-
if (batchCount >= batchInterval) {
161-
upload.executeBatch();
162-
batchCount = 0;
220+
rows.forEach(this::writeSingleRecord);
221+
} finally {
222+
rows.clear();
163223
}
164224
}
165225

@@ -282,7 +342,7 @@ public void close() throws IOException {
282342
LOG.info("Inputformat couldn't be closed - " + se.getMessage());
283343
} finally {
284344
upload = null;
285-
batchCount = 0;
345+
286346
}
287347

288348
try {

0 commit comments

Comments
 (0)