Commit 0c4ddc7

Merge branch '1.8_merge_1.5' into 'v1.8.0_dev'

1.8 merge 1.5
See merge request !59

2 parents: a63735b + b2b0877

6 files changed: +77, -31 lines

core/src/main/java/com/dtstack/flink/sql/metric/MetricConstant.java
Lines changed: 2 additions & 0 deletions

@@ -45,6 +45,8 @@ public class MetricConstant {
 
     public static final String DT_NUM_RECORDS_OUT = "dtNumRecordsOut";
 
+    public static final String DT_NUM_DIRTY_RECORDS_OUT = "dtNumDirtyRecordsOut";
+
     public static final String DT_NUM_RECORDS_OUT_RATE = "dtNumRecordsOutRate";
 
     public static final String DT_EVENT_DELAY_GAUGE = "dtEventDelay";

core/src/main/java/com/dtstack/flink/sql/sink/MetricOutputFormat.java
Lines changed: 3 additions & 0 deletions

@@ -32,10 +32,13 @@ public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2>{
 
     protected transient Counter outRecords;
 
+    protected transient Counter outDirtyRecords;
+
     protected transient Meter outRecordsRate;
 
     public void initMetric() {
         outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
+        outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
         outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
     }

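For context, any sink whose OutputFormat extends MetricOutputFormat now gets a dirty-record counter alongside the existing throughput metrics. A minimal sketch of how a subclass would report into it; the subclass and its write logic are invented for illustration, and only initMetric(), outRecords, and outDirtyRecords come from the class above:

// Hypothetical subclass for illustration; only initMetric(), outRecords and
// outDirtyRecords are real members of the MetricOutputFormat shown above.
public class LoggingSink extends MetricOutputFormat {

    @Override
    public void open(int taskNumber, int numTasks) {
        // registers dtNumRecordsOut, dtNumDirtyRecordsOut and dtNumRecordsOutRate
        initMetric();
    }

    @Override
    public void writeRecord(Tuple2 record) {
        try {
            doWrite(record);       // sink-specific write, invented here
            outRecords.inc();      // successful write
        } catch (RuntimeException e) {
            outDirtyRecords.inc(); // failed write is counted instead of rethrown
        }
    }

    // configure(), close() and doWrite() omitted for brevity
}
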
kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
Lines changed: 0 additions & 1 deletion

@@ -109,7 +109,6 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                 serializationSchema
         );
 
-
         DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
             return record.f1;
         }).returns(getOutputType().getTypeAt(1));

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
Lines changed: 0 additions & 1 deletion

@@ -112,7 +112,6 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                 serializationSchema
         );
 
-
         DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
             return record.f1;
         }).returns(getOutputType().getTypeAt(1));

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
Lines changed: 2 additions & 5 deletions

@@ -92,7 +92,6 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
             schemaBuilder.field(fieldNames[i], fieldTypes[i]);
         }
         this.schema = schemaBuilder.build();
-
         this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
@@ -111,8 +110,6 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                 partitioner,
                 serializationSchema
         );
-
-
         DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
             return record.f1;
         }).returns(getOutputType().getTypeAt(1));
@@ -122,7 +119,7 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
 
     @Override
     public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
-        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
+        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
     }
 
     @Override
@@ -136,7 +133,7 @@ public TypeInformation<?>[] getFieldTypes() {
     }
 
     @Override
-    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        return this;

(The getOutputType and configure hunks are indentation-only changes; the removed and added lines differ only in whitespace.)

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java
Lines changed: 70 additions & 24 deletions

@@ -32,6 +32,7 @@
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -66,9 +67,11 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
     private String insertQuery;
     public int[] typesArray;
 
+    /** rows buffered for batch writing */
+    protected List<Row> rows = new ArrayList();
+
     private Connection dbConn;
     private PreparedStatement upload;
-    private AtomicInteger batchCount = new AtomicInteger(0);
     private transient ScheduledThreadPoolExecutor timerService;
 
 
@@ -95,7 +98,7 @@ public void configure(Configuration parameters) {
     public void open(int taskNumber, int numTasks) throws IOException {
         try {
             LOG.info("PreparedStatement execute batch num is {}", batchNum);
-            establishConnection();
+            dbConn = establishConnection();
             initMetric();
 
             if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
@@ -107,7 +110,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
                 throw new SQLException("Table " + tableName + " doesn't exist");
             }
 
-            if (batchWaitInterval > 0) {
+            if (batchWaitInterval > 0 && batchNum > 1) {
                 LOG.info("open batch wait interval scheduled, interval is {} ms", batchWaitInterval);
 
                 timerService = new ScheduledThreadPoolExecutor(1);
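
The added batchNum > 1 condition means the flush timer only starts when records are actually buffered; with batchNum == 1 every record is written and committed immediately, so there is nothing to flush. The scheduled task itself is outside this hunk; a sketch of what it presumably does, using only fields visible in this diff (requires java.util.concurrent.TimeUnit):

// Hypothetical timer body; the real scheduled task is not part of this hunk.
timerService = new ScheduledThreadPoolExecutor(1);
timerService.scheduleAtFixedRate(() -> {
    if (!rows.isEmpty()) {
        submitExecuteBatch(); // flush a partially filled batch after batchWaitInterval ms
    }
}, batchWaitInterval, batchWaitInterval, TimeUnit.MILLISECONDS);
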
@@ -127,13 +130,16 @@ public void open(int taskNumber, int numTasks) throws IOException {
     }
 
 
-    private void establishConnection() throws SQLException, ClassNotFoundException {
+    private Connection establishConnection() throws SQLException, ClassNotFoundException {
+        Connection connection;
         Class.forName(drivername);
         if (username == null) {
-            dbConn = DriverManager.getConnection(dbURL);
+            connection = DriverManager.getConnection(dbURL);
         } else {
-            dbConn = DriverManager.getConnection(dbURL, username, password);
+            connection = DriverManager.getConnection(dbURL, username, password);
         }
+        connection.setAutoCommit(false);
+        return connection;
     }
 
     /**
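
With auto-commit disabled, nothing reaches the database until the format calls commit() itself, which is what makes the rollback-and-replay logic further down possible. A self-contained sketch of that standard JDBC pattern, with a made-up URL and table:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BatchCommitDemo {
    public static void main(String[] args) throws SQLException {
        // URL and credentials are placeholders
        Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/demo", "user", "pass");
        conn.setAutoCommit(false);                // batches become explicit transactions
        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO t(id) VALUES (?)")) {
            for (int i = 0; i < 100; i++) {
                ps.setInt(1, i);
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();                        // all-or-nothing on success
        } catch (SQLException e) {
            conn.rollback();                      // discard the partial batch on failure
        } finally {
            conn.close();
        }
    }
}
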
@@ -150,35 +156,53 @@ private void establishConnection() throws SQLException, ClassNotFoundException {
      * @see PreparedStatement
      */
     @Override
-    public void writeRecord(Tuple2 tuple2) throws IOException {
+    public void writeRecord(Tuple2 tuple2) {
 
         Tuple2<Boolean, Row> tupleTrans = tuple2;
         Boolean retract = tupleTrans.getField(0);
         Row row = tupleTrans.getField(1);
 
-
         if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
             LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
         }
+
+        if (retract) {
+            insertWrite(row);
+            outRecords.inc();
+        } else {
+            //do nothing
+        }
+    }
+
+
+    private void insertWrite(Row row) {
+        checkConnectionOpen(dbConn);
         try {
-            if (retract) {
-                insertWrite(row);
-                outRecords.inc();
+            if (batchNum == 1) {
+                writeSingleRecord(row);
             } else {
-                //do nothing
+                updatePreparedStmt(row, upload);
+                rows.add(row);
+                upload.addBatch();
+                if (rows.size() >= batchNum) {
+                    submitExecuteBatch();
+                }
             }
-        } catch (SQLException | IllegalArgumentException e) {
-            throw new IllegalArgumentException("writeRecord() failed", e);
+        } catch (SQLException e) {
+            LOG.error("", e);
         }
-    }
 
+    }
 
-    private void insertWrite(Row row) throws SQLException {
-        updatePreparedStmt(row, upload);
-        upload.addBatch();
-        batchCount.incrementAndGet();
-        if (batchCount.get() >= batchNum) {
-            submitExecuteBatch();
+    private void writeSingleRecord(Row row) {
+        try {
+            updatePreparedStmt(row, upload);
+            upload.execute();
+            dbConn.commit();
+        } catch (SQLException e) {
+            outDirtyRecords.inc();
+            LOG.error("record insert failed ..", row.toString());
+            LOG.error("", e);
         }
     }
 

@@ -276,9 +300,31 @@ private synchronized void submitExecuteBatch() {
         try {
             LOG.info("submitExecuteBatch start......");
             this.upload.executeBatch();
-            this.batchCount.set(0);
+            dbConn.commit();
         } catch (SQLException e) {
-            LOG.error("", e);
+            try {
+                dbConn.rollback();
+            } catch (SQLException e1) {
+                LOG.error("rollback data error !", e);
+            }
+
+            rows.forEach(this::writeSingleRecord);
+        } finally {
+            rows.clear();
+        }
+    }
+
+    private void checkConnectionOpen(Connection dbConn) {
+        try {
+            if (dbConn.isClosed()) {
+                LOG.info("db connection reconnect..");
+                dbConn = establishConnection();
+                upload = dbConn.prepareStatement(insertQuery);
+            }
+        } catch (SQLException e) {
+            LOG.error("check connection open failed..", e);
+        } catch (ClassNotFoundException e) {
+            LOG.error("load jdbc class error when reconnect db..", e);
         }
     }
 

@@ -302,7 +348,7 @@ public void close() throws IOException {
             LOG.info("Inputformat couldn't be closed - ", se);
         } finally {
             upload = null;
-            batchCount.set(0);
+            rows.clear();
         }
 
         try {
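
Taken together, the RetractJDBCOutputFormat changes implement one pattern: buffer each row next to the JDBC batch, try the batch as a single transaction, and on failure replay the buffer row by row so only genuinely bad rows end up in dtNumDirtyRecordsOut. A condensed restatement of that flow, not code from the commit (names simplified; stmt, conn, rows, bind, and writeSingle are assumed fields and helpers):

// Condensed restatement of the pattern above, not code from the commit.
void write(Row row, int batchNum) throws SQLException {
    if (batchNum == 1) {
        writeSingle(row);                    // immediate write, one commit per row
        return;
    }
    bind(stmt, row);                         // stand-in for updatePreparedStmt(row, upload)
    stmt.addBatch();
    rows.add(row);                           // keep a copy for a possible replay
    if (rows.size() >= batchNum) {
        try {
            stmt.executeBatch();
            conn.commit();                   // the whole batch commits at once
        } catch (SQLException e) {
            conn.rollback();                 // drop the partial batch
            rows.forEach(this::writeSingle); // replay; bad rows become dirty records
        } finally {
            rows.clear();
        }
    }
}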
