Skip to content

Commit 13a8dd9

Browse files
committed
add scheduled
1 parent 235dcc8 commit 13a8dd9

File tree

5 files changed

+97
-30
lines changed

5 files changed

+97
-30
lines changed

pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
<module>kafka10</module>
1515
<module>kafka11</module>
1616
<module>mysql</module>
17-
<module>hbase</module>
18-
<module>elasticsearch5</module>
19-
<module>mongo</module>
20-
<module>redis5</module>
17+
<!--<module>hbase</module>-->
18+
<!--<module>elasticsearch5</module>-->
19+
<!--<module>mongo</module>-->
20+
<!--<module>redis5</module>-->
2121
<module>launcher</module>
2222
<module>rdb</module>
2323
<module>sqlserver</module>
2424
<module>oracle</module>
25-
<module>cassandra</module>
25+
<!--<module>cassandra</module>-->
2626
</modules>
2727

2828

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ public abstract class RdbSink implements RetractStreamTableSink<Row>, Serializab
6161

6262
protected String dbType;
6363

64-
protected int batchInterval = 1;
64+
protected int batchNum = 1;
65+
66+
protected long batchWaitInterval;
6567

6668
protected int[] sqlTypes;
6769

@@ -89,7 +91,8 @@ public RichSinkFunction createJdbcSinkFunc() {
8991
outputFormat.setUsername(userName);
9092
outputFormat.setPassword(password);
9193
outputFormat.setInsertQuery(sql);
92-
outputFormat.setBatchInterval(batchInterval);
94+
outputFormat.setBatchNum(batchNum);
95+
outputFormat.setBatchWaitInterval(batchWaitInterval);
9396
outputFormat.setTypesArray(sqlTypes);
9497
outputFormat.setTableName(tableName);
9598
outputFormat.setDbType(dbType);
@@ -112,7 +115,12 @@ public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
112115

113116
Integer tmpSqlBatchSize = rdbTableInfo.getBatchSize();
114117
if (tmpSqlBatchSize != null) {
115-
setBatchInterval(tmpSqlBatchSize);
118+
setBatchNum(tmpSqlBatchSize);
119+
}
120+
121+
long batchWaitInterval = rdbTableInfo.getBatchWaitInterval();
122+
if (batchWaitInterval > 0) {
123+
setBatchWaitInterval(batchWaitInterval);
116124
}
117125

118126
Integer tmpSinkParallelism = rdbTableInfo.getParallelism();
@@ -198,13 +206,13 @@ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInform
198206
return this;
199207
}
200208

201-
/**
202-
* Set the default frequency submit updated every submission
203-
*
204-
* @param batchInterval
205-
*/
206-
public void setBatchInterval(int batchInterval) {
207-
this.batchInterval = batchInterval;
209+
210+
public void setBatchNum(int batchNum) {
211+
this.batchNum = batchNum;
212+
}
213+
214+
public void setBatchWaitInterval(long batchWaitInterval) {
215+
this.batchWaitInterval = batchWaitInterval;
208216
}
209217

210218
@Override

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,32 @@
2020

2121
import com.dtstack.flink.sql.sink.rdb.RdbSink;
2222
import org.apache.commons.lang3.StringUtils;
23-
import org.apache.flink.api.java.tuple.Tuple;
2423
import org.apache.flink.api.java.tuple.Tuple2;
2524
import org.apache.flink.configuration.Configuration;
2625
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
2726
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2827
import org.apache.flink.types.Row;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
31-
import java.sql.*;
32-
import java.util.*;
3330
import java.io.IOException;
31+
import java.sql.Connection;
32+
import java.sql.DriverManager;
33+
import java.sql.PreparedStatement;
34+
import java.sql.SQLException;
35+
import java.util.Arrays;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.concurrent.ScheduledFuture;
39+
import java.util.concurrent.ScheduledThreadPoolExecutor;
40+
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicInteger;
3442

3543
import com.dtstack.flink.sql.sink.MetricOutputFormat;
3644

3745
/**
3846
* OutputFormat to write tuples into a database.
3947
* The OutputFormat has to be configured using the supplied OutputFormatBuilder.
4048
*
41-
* @see Tuple
42-
* @see DriverManager
4349
*/
4450
public class RetractJDBCOutputFormat extends MetricOutputFormat {
4551
private static final long serialVersionUID = 1L;
@@ -53,22 +59,25 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
5359
private String tableName;
5460
private String dbType;
5561
private RdbSink dbSink;
56-
private int batchInterval = 5000;
62+
private long batchWaitInterval;
63+
private int batchNum;
5764
private String insertQuery;
5865
public int[] typesArray;
5966

6067
private Connection dbConn;
6168
private PreparedStatement upload;
6269

63-
private int batchCount = 0;
70+
private AtomicInteger batchCount = new AtomicInteger(0);
6471

6572
//index field
6673
private Map<String, List<String>> realIndexes = Maps.newHashMap();
6774
//full field
6875
private List<String> fullField = Lists.newArrayList();
6976

70-
public RetractJDBCOutputFormat() {
77+
private transient ScheduledThreadPoolExecutor timerService;
7178

79+
public RetractJDBCOutputFormat() {
80+
this.timerService = new ScheduledThreadPoolExecutor(1);
7281
}
7382

7483
@Override
@@ -151,13 +160,23 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
151160

152161

153162
private void insertWrite(Row row) throws SQLException {
154-
155163
updatePreparedStmt(row, upload);
156164
upload.addBatch();
157-
batchCount++;
158-
if (batchCount >= batchInterval) {
165+
ScheduledFuture<?> scheduledFuture = null;
166+
167+
if (batchWaitInterval > 0 && batchCount.get() == 0) {
168+
scheduledFuture = registerTimer(batchWaitInterval, this);
169+
}
170+
171+
batchCount.incrementAndGet();
172+
173+
if (batchCount.get() >= batchNum) {
159174
upload.executeBatch();
160-
batchCount = 0;
175+
batchCount.set(0);
176+
177+
if (scheduledFuture != null) {
178+
scheduledFuture.cancel(true);
179+
}
161180
}
162181
}
163182

@@ -250,6 +269,28 @@ private void updatePreparedStmt(Row row, PreparedStatement pstmt) throws SQLExce
250269
}
251270
}
252271

272+
public ScheduledFuture<?> registerTimer(long delay, RetractJDBCOutputFormat outputFormat) {
273+
return timerService.schedule(new DelayExecuteBatch(outputFormat), delay, TimeUnit.MILLISECONDS);
274+
}
275+
276+
private final static class DelayExecuteBatch implements Runnable {
277+
RetractJDBCOutputFormat outputFormat;
278+
279+
private DelayExecuteBatch(RetractJDBCOutputFormat outputFormat) {
280+
this.outputFormat = outputFormat;
281+
}
282+
283+
@Override
284+
public void run() {
285+
try {
286+
outputFormat.upload.executeBatch();
287+
outputFormat.batchCount.set(0);
288+
} catch (SQLException e) {
289+
LOG.error("delay batch insert error...", e);
290+
}
291+
}
292+
}
293+
253294
/**
254295
* Executes prepared statement and closes all resources of this instance.
255296
*
@@ -266,7 +307,7 @@ public void close() throws IOException {
266307
LOG.info("Inputformat couldn't be closed - " + se.getMessage());
267308
} finally {
268309
upload = null;
269-
batchCount = 0;
310+
batchCount.set(0);
270311
}
271312

272313
try {
@@ -332,8 +373,8 @@ public void setDbSink(RdbSink dbSink) {
332373
this.dbSink = dbSink;
333374
}
334375

335-
public void setBatchInterval(int batchInterval) {
336-
this.batchInterval = batchInterval;
376+
public void setBatchNum(int batchNum) {
377+
this.batchNum = batchNum;
337378
}
338379

339380
public void setInsertQuery(String insertQuery) {
@@ -364,6 +405,11 @@ public Map<String, List<String>> getRealIndexes() {
364405
return realIndexes;
365406
}
366407

408+
409+
public void setBatchWaitInterval(long batchWaitInterval) {
410+
this.batchWaitInterval = batchWaitInterval;
411+
}
412+
367413
public List<String> getFullField() {
368414
return fullField;
369415
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbSinkParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4343
rdbTableInfo.setUserName(MathUtil.getString(props.get(RdbTableInfo.USER_NAME_KEY.toLowerCase())));
4444
rdbTableInfo.setPassword(MathUtil.getString(props.get(RdbTableInfo.PASSWORD_KEY.toLowerCase())));
4545
rdbTableInfo.setBatchSize(MathUtil.getIntegerVal(props.get(RdbTableInfo.BATCH_SIZE_KEY.toLowerCase())));
46+
rdbTableInfo.setBatchWaitInterval(MathUtil.getLongVal(props.get(RdbTableInfo.BATCH_WAIT_INTERVAL_KEY.toLowerCase())));
4647
rdbTableInfo.setBufferSize(MathUtil.getString(props.get(RdbTableInfo.BUFFER_SIZE_KEY.toLowerCase())));
4748
rdbTableInfo.setFlushIntervalMs(MathUtil.getString(props.get(RdbTableInfo.FLUSH_INTERVALMS_KEY.toLowerCase())));
4849

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class RdbTableInfo extends TargetTableInfo {
3939

4040
public static final String BATCH_SIZE_KEY = "batchSize";
4141

42+
public static final String BATCH_WAIT_INTERVAL_KEY = "batchWaitInterval";
43+
4244
public static final String BUFFER_SIZE_KEY = "bufferSize";
4345

4446
public static final String FLUSH_INTERVALMS_KEY = "flushIntervalMs";
@@ -53,6 +55,8 @@ public class RdbTableInfo extends TargetTableInfo {
5355

5456
private Integer batchSize;
5557

58+
private long batchWaitInterval;
59+
5660
private String bufferSize;
5761

5862
private String flushIntervalMs;
@@ -113,6 +117,14 @@ public void setFlushIntervalMs(String flushIntervalMs) {
113117
this.flushIntervalMs = flushIntervalMs;
114118
}
115119

120+
public long getBatchWaitInterval() {
121+
return batchWaitInterval;
122+
}
123+
124+
public void setBatchWaitInterval(long batchWaitInterval) {
125+
this.batchWaitInterval = batchWaitInterval;
126+
}
127+
116128
@Override
117129
public boolean check() {
118130
Preconditions.checkNotNull(url, "rdb field of URL is required");

0 commit comments

Comments (0)