Skip to content

Commit b3fd289

Browse files
committed
Merge branch 'v1.5.0_dev_sink_batch_num' into 'v1.5.0_dev'
rdb sink batch and interval rdb写入时支持:batchSize 和 batchWaitInterval 参数设置。 See merge request !42
2 parents a7d94df + f582684 commit b3fd289

File tree

4 files changed

+87
-32
lines changed

4 files changed

+87
-32
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab
6161

6262
protected String dbType;
6363

64-
protected int batchInterval = 1;
64+
protected int batchNum = 1;
65+
66+
protected long batchWaitInterval = 10000;
6567

6668
protected int[] sqlTypes;
6769

@@ -89,7 +91,8 @@ public RichSinkFunction createJdbcSinkFunc() {
8991
outputFormat.setUsername(userName);
9092
outputFormat.setPassword(password);
9193
outputFormat.setInsertQuery(sql);
92-
outputFormat.setBatchInterval(batchInterval);
94+
outputFormat.setBatchNum(batchNum);
95+
outputFormat.setBatchWaitInterval(batchWaitInterval);
9396
outputFormat.setTypesArray(sqlTypes);
9497
outputFormat.setTableName(tableName);
9598
outputFormat.setDbType(dbType);
@@ -112,7 +115,12 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
112115

113116
Integer tmpSqlBatchSize = rdbTableInfo.getBatchSize();
114117
if (tmpSqlBatchSize != null) {
115-
setBatchInterval(tmpSqlBatchSize);
118+
setBatchNum(tmpSqlBatchSize);
119+
}
120+
121+
Long batchWaitInterval = rdbTableInfo.getBatchWaitInterval();
122+
if (batchWaitInterval != null) {
123+
setBatchWaitInterval(batchWaitInterval);
116124
}
117125

118126
Integer tmpSinkParallelism = rdbTableInfo.getParallelism();
@@ -198,13 +206,13 @@ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInform
198206
return this;
199207
}
200208

201-
/**
202-
* Set the default frequency submit updated every submission
203-
*
204-
* @param batchInterval
205-
*/
206-
public void setBatchInterval(int batchInterval) {
207-
this.batchInterval = batchInterval;
209+
210+
public void setBatchNum(int batchNum) {
211+
this.batchNum = batchNum;
212+
}
213+
214+
public void setBatchWaitInterval(long batchWaitInterval) {
215+
this.batchWaitInterval = batchWaitInterval;
208216
}
209217

210218
@Override

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,31 @@
2020

2121
import com.dtstack.flink.sql.sink.rdb.RdbSink;
2222
import org.apache.commons.lang3.StringUtils;
23-
import org.apache.flink.api.java.tuple.Tuple;
2423
import org.apache.flink.api.java.tuple.Tuple2;
2524
import org.apache.flink.configuration.Configuration;
2625
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
2726
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2827
import org.apache.flink.types.Row;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
31-
import java.sql.*;
32-
import java.util.*;
3330
import java.io.IOException;
31+
import java.sql.Connection;
32+
import java.sql.DriverManager;
33+
import java.sql.PreparedStatement;
34+
import java.sql.SQLException;
35+
import java.util.Arrays;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.concurrent.ScheduledThreadPoolExecutor;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.atomic.AtomicInteger;
3441

3542
import com.dtstack.flink.sql.sink.MetricOutputFormat;
3643

3744
/**
3845
* OutputFormat to write tuples into a database.
3946
* The OutputFormat has to be configured using the supplied OutputFormatBuilder.
4047
*
41-
* @see Tuple
42-
* @see DriverManager
4348
*/
4449
public class RetractJDBCOutputFormat extends MetricOutputFormat {
4550
private static final long serialVersionUID = 1L;
@@ -53,23 +58,25 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
5358
private String tableName;
5459
private String dbType;
5560
private RdbSink dbSink;
56-
private int batchInterval = 5000;
61+
// trigger preparedStatement execute batch interval
62+
private long batchWaitInterval = 10000l;
63+
// PreparedStatement execute batch num
64+
private int batchNum = 1;
5765
private String insertQuery;
5866
public int[] typesArray;
5967

6068
private Connection dbConn;
6169
private PreparedStatement upload;
70+
private AtomicInteger batchCount = new AtomicInteger(0);
71+
private transient ScheduledThreadPoolExecutor timerService;
6272

63-
private int batchCount = 0;
64-
private static int rowLenth = 1000;
6573

6674
//index field
6775
private Map<String, List<String>> realIndexes = Maps.newHashMap();
6876
//full field
6977
private List<String> fullField = Lists.newArrayList();
7078

7179
public RetractJDBCOutputFormat() {
72-
7380
}
7481

7582
@Override
@@ -86,8 +93,20 @@ public void configure(Configuration parameters) {
8693
@Override
8794
public void open(int taskNumber, int numTasks) throws IOException {
8895
try {
96+
LOG.info("PreparedStatement execute batch num is {}", batchNum);
8997
establishConnection();
9098
initMetric();
99+
100+
if (batchWaitInterval > 0) {
101+
LOG.info("open batch wait interval scheduled, interval is {} ms", batchWaitInterval);
102+
103+
timerService = new ScheduledThreadPoolExecutor(1);
104+
timerService.scheduleAtFixedRate(() -> {
105+
submitExecuteBatch();
106+
}, 0, batchWaitInterval, TimeUnit.MILLISECONDS);
107+
108+
}
109+
91110
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
92111
if (isReplaceInsertQuery()) {
93112
insertQuery = dbSink.buildUpdateSql(tableName, Arrays.asList(dbSink.getFieldNames()), realIndexes, fullField);
@@ -141,9 +160,6 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
141160
try {
142161
if (retract) {
143162
insertWrite(row);
144-
if (outRecords.getCount()%rowLenth == 0){
145-
LOG.info(row.toString());
146-
}
147163
outRecords.inc();
148164
} else {
149165
//do nothing
@@ -155,13 +171,11 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
155171

156172

157173
private void insertWrite(Row row) throws SQLException {
158-
159174
updatePreparedStmt(row, upload);
160175
upload.addBatch();
161-
batchCount++;
162-
if (batchCount >= batchInterval) {
163-
upload.executeBatch();
164-
batchCount = 0;
176+
batchCount.incrementAndGet();
177+
if (batchCount.get() >= batchNum) {
178+
submitExecuteBatch();
165179
}
166180
}
167181

@@ -254,6 +268,16 @@ private void updatePreparedStmt(Row row, PreparedStatement pstmt) throws SQLExce
254268
}
255269
}
256270

271+
272+
private synchronized void submitExecuteBatch() {
273+
try {
274+
this.upload.executeBatch();
275+
this.batchCount.set(0);
276+
} catch (SQLException e) {
277+
LOG.error("", e);
278+
}
279+
}
280+
257281
/**
258282
* Executes prepared statement and closes all resources of this instance.
259283
*
@@ -266,19 +290,23 @@ public void close() throws IOException {
266290
upload.executeBatch();
267291
upload.close();
268292
}
293+
if (null != timerService) {
294+
timerService.shutdown();
295+
LOG.info("batch wait interval scheduled service closed ");
296+
}
269297
} catch (SQLException se) {
270-
LOG.info("Inputformat couldn't be closed - " + se.getMessage());
298+
LOG.info("Inputformat couldn't be closed - ", se);
271299
} finally {
272300
upload = null;
273-
batchCount = 0;
301+
batchCount.set(0);
274302
}
275303

276304
try {
277305
if (dbConn != null) {
278306
dbConn.close();
279307
}
280308
} catch (SQLException se) {
281-
LOG.info("Inputformat couldn't be closed - " + se.getMessage());
309+
LOG.info("Inputformat couldn't be closed - ", se);
282310
} finally {
283311
dbConn = null;
284312
}
@@ -336,8 +364,8 @@ public void setDbSink(RdbSink dbSink) {
336364
this.dbSink = dbSink;
337365
}
338366

339-
public void setBatchInterval(int batchInterval) {
340-
this.batchInterval = batchInterval;
367+
public void setBatchNum(int batchNum) {
368+
this.batchNum = batchNum;
341369
}
342370

343371
public void setInsertQuery(String insertQuery) {
@@ -368,6 +396,11 @@ public Map<String, List<String>> getRealIndexes() {
368396
return realIndexes;
369397
}
370398

399+
400+
public void setBatchWaitInterval(long batchWaitInterval) {
401+
this.batchWaitInterval = batchWaitInterval;
402+
}
403+
371404
public List<String> getFullField() {
372405
return fullField;
373406
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbSinkParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4343
rdbTableInfo.setUserName(MathUtil.getString(props.get(RdbTableInfo.USER_NAME_KEY.toLowerCase())));
4444
rdbTableInfo.setPassword(MathUtil.getString(props.get(RdbTableInfo.PASSWORD_KEY.toLowerCase())));
4545
rdbTableInfo.setBatchSize(MathUtil.getIntegerVal(props.get(RdbTableInfo.BATCH_SIZE_KEY.toLowerCase())));
46+
rdbTableInfo.setBatchWaitInterval(MathUtil.getLongVal(props.get(RdbTableInfo.BATCH_WAIT_INTERVAL_KEY.toLowerCase())));
4647
rdbTableInfo.setBufferSize(MathUtil.getString(props.get(RdbTableInfo.BUFFER_SIZE_KEY.toLowerCase())));
4748
rdbTableInfo.setFlushIntervalMs(MathUtil.getString(props.get(RdbTableInfo.FLUSH_INTERVALMS_KEY.toLowerCase())));
4849

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class RdbTableInfo extends TargetTableInfo {
3939

4040
public static final String BATCH_SIZE_KEY = "batchSize";
4141

42+
public static final String BATCH_WAIT_INTERVAL_KEY = "batchWaitInterval";
43+
4244
public static final String BUFFER_SIZE_KEY = "bufferSize";
4345

4446
public static final String FLUSH_INTERVALMS_KEY = "flushIntervalMs";
@@ -53,6 +55,8 @@ public class RdbTableInfo extends TargetTableInfo {
5355

5456
private Integer batchSize;
5557

58+
private Long batchWaitInterval;
59+
5660
private String bufferSize;
5761

5862
private String flushIntervalMs;
@@ -113,6 +117,15 @@ public void setFlushIntervalMs(String flushIntervalMs) {
113117
this.flushIntervalMs = flushIntervalMs;
114118
}
115119

120+
121+
public Long getBatchWaitInterval() {
122+
return batchWaitInterval;
123+
}
124+
125+
public void setBatchWaitInterval(Long batchWaitInterval) {
126+
this.batchWaitInterval = batchWaitInterval;
127+
}
128+
116129
@Override
117130
public boolean check() {
118131
Preconditions.checkNotNull(url, "rdb field of URL is required");

0 commit comments

Comments
 (0)