Skip to content

Commit d5cabbb

Browse files
committed
Merge remote-tracking branch 'origin/v1.5.0_dev' into 1.8_merge_1.5
# Conflicts: # kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java # kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java # kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
2 parents 4a87626 + 0e10e60 commit d5cabbb

File tree

3 files changed

+74
-24
lines changed

3 files changed

+74
-24
lines changed

core/src/main/java/com/dtstack/flink/sql/metric/MetricConstant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class MetricConstant {
4545

4646
public static final String DT_NUM_RECORDS_OUT = "dtNumRecordsOut";
4747

48+
public static final String DT_NUM_DIRTY_RECORDS_OUT = "dtNumDirtyRecordsOut";
49+
4850
public static final String DT_NUM_RECORDS_OUT_RATE = "dtNumRecordsOutRate";
4951

5052
public static final String DT_EVENT_DELAY_GAUGE = "dtEventDelay";

core/src/main/java/com/dtstack/flink/sql/sink/MetricOutputFormat.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@ public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2>{
3232

3333
protected transient Counter outRecords;
3434

35+
protected transient Counter outDirtyRecords;
36+
3537
protected transient Meter outRecordsRate;
3638

3739
public void initMetric() {
3840
outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
41+
outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
3942
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
4043
}
4144

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.sql.DriverManager;
3333
import java.sql.PreparedStatement;
3434
import java.sql.SQLException;
35+
import java.util.ArrayList;
3536
import java.util.Arrays;
3637
import java.util.List;
3738
import java.util.Map;
@@ -66,9 +67,11 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
6667
private String insertQuery;
6768
public int[] typesArray;
6869

70+
/** 存储用于批量写入的数据 */
71+
protected List<Row> rows = new ArrayList();
72+
6973
private Connection dbConn;
7074
private PreparedStatement upload;
71-
private AtomicInteger batchCount = new AtomicInteger(0);
7275
private transient ScheduledThreadPoolExecutor timerService;
7376

7477

@@ -95,7 +98,7 @@ public void configure(Configuration parameters) {
9598
public void open(int taskNumber, int numTasks) throws IOException {
9699
try {
97100
LOG.info("PreparedStatement execute batch num is {}", batchNum);
98-
establishConnection();
101+
dbConn = establishConnection();
99102
initMetric();
100103

101104
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
@@ -107,7 +110,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
107110
throw new SQLException("Table " + tableName + " doesn't exist");
108111
}
109112

110-
if (batchWaitInterval > 0) {
113+
if (batchWaitInterval > 0 && batchNum > 1) {
111114
LOG.info("open batch wait interval scheduled, interval is {} ms", batchWaitInterval);
112115

113116
timerService = new ScheduledThreadPoolExecutor(1);
@@ -127,13 +130,16 @@ public void open(int taskNumber, int numTasks) throws IOException {
127130
}
128131

129132

130-
private void establishConnection() throws SQLException, ClassNotFoundException {
133+
private Connection establishConnection() throws SQLException, ClassNotFoundException {
134+
Connection connection ;
131135
Class.forName(drivername);
132136
if (username == null) {
133-
dbConn = DriverManager.getConnection(dbURL);
137+
connection = DriverManager.getConnection(dbURL);
134138
} else {
135-
dbConn = DriverManager.getConnection(dbURL, username, password);
139+
connection = DriverManager.getConnection(dbURL, username, password);
136140
}
141+
connection.setAutoCommit(false);
142+
return connection;
137143
}
138144

139145
/**
@@ -150,35 +156,52 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
150156
* @see PreparedStatement
151157
*/
152158
@Override
153-
public void writeRecord(Tuple2 tuple2) throws IOException {
159+
public void writeRecord(Tuple2 tuple2) {
154160

155161
Tuple2<Boolean, Row> tupleTrans = tuple2;
156162
Boolean retract = tupleTrans.getField(0);
157163
Row row = tupleTrans.getField(1);
158164

159-
160165
if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
161166
LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
162167
}
168+
169+
if (retract) {
170+
insertWrite(row);
171+
outRecords.inc();
172+
} else {
173+
//do nothing
174+
}
175+
}
176+
177+
178+
private void insertWrite(Row row) {
179+
checkConnectionOpen(dbConn);
163180
try {
164-
if (retract) {
165-
insertWrite(row);
166-
outRecords.inc();
181+
if (batchNum == 1) {
182+
writeSingleRecord(row);
167183
} else {
168-
//do nothing
184+
updatePreparedStmt(row, upload);
185+
rows.add(row);
186+
upload.addBatch();
187+
if (rows.size() >= batchNum) {
188+
submitExecuteBatch();
189+
}
169190
}
170-
} catch (SQLException | IllegalArgumentException e) {
171-
throw new IllegalArgumentException("writeRecord() failed", e);
191+
} catch (SQLException e) {
192+
LOG.error("", e);
172193
}
173-
}
174194

195+
}
175196

176-
private void insertWrite(Row row) throws SQLException {
177-
updatePreparedStmt(row, upload);
178-
upload.addBatch();
179-
batchCount.incrementAndGet();
180-
if (batchCount.get() >= batchNum) {
181-
submitExecuteBatch();
197+
private void writeSingleRecord(Row row) {
198+
try {
199+
updatePreparedStmt(row, upload);
200+
upload.execute();
201+
} catch (SQLException e) {
202+
outDirtyRecords.inc();
203+
LOG.error("record insert failed ..", row.toString());
204+
LOG.error("", e);
182205
}
183206
}
184207

@@ -276,9 +299,31 @@ private synchronized void submitExecuteBatch() {
276299
try {
277300
LOG.info("submitExecuteBatch start......");
278301
this.upload.executeBatch();
279-
this.batchCount.set(0);
302+
dbConn.commit();
280303
} catch (SQLException e) {
281-
LOG.error("", e);
304+
try {
305+
dbConn.rollback();
306+
} catch (SQLException e1) {
307+
LOG.error("rollback data error !", e);
308+
}
309+
310+
rows.forEach(this::writeSingleRecord);
311+
} finally {
312+
rows.clear();
313+
}
314+
}
315+
316+
private void checkConnectionOpen(Connection dbConn) {
317+
try {
318+
if (dbConn.isClosed()) {
319+
LOG.info("db connection reconnect..");
320+
dbConn= establishConnection();
321+
upload = dbConn.prepareStatement(insertQuery);
322+
}
323+
} catch (SQLException e) {
324+
LOG.error("check connection open failed..", e);
325+
} catch (ClassNotFoundException e) {
326+
LOG.error("load jdbc class error when reconnect db..", e);
282327
}
283328
}
284329

@@ -302,7 +347,7 @@ public void close() throws IOException {
302347
LOG.info("Inputformat couldn't be closed - ", se);
303348
} finally {
304349
upload = null;
305-
batchCount.set(0);
350+
rows.clear();
306351
}
307352

308353
try {

0 commit comments

Comments
 (0)