Skip to content

Commit 0548b64

Browse files
committed
format print log
1 parent 8f2336f commit 0548b64

File tree

1 file changed

+12
-4
lines changed

1 file changed

+12
-4
lines changed

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
5353

5454
private static final Logger LOG = LoggerFactory.getLogger(RetractJDBCOutputFormat.class);
5555

56+
private static int dirtyDataPrintFrequency = 1000;
57+
58+
private static int receiveDataPrintFrequency = 1000;
59+
5660
private String username;
5761
private String password;
5862
private String drivername;
@@ -167,8 +171,11 @@ public void writeRecord(Tuple2 tuple2) {
167171
}
168172

169173
if (retract) {
170-
insertWrite(row);
171174
outRecords.inc();
175+
if (outRecords.getCount() % receiveDataPrintFrequency == 0) {
176+
LOG.info("Receive data : {}", row);
177+
}
178+
insertWrite(row);
172179
} else {
173180
//do nothing
174181
}
@@ -200,8 +207,10 @@ private void writeSingleRecord(Row row) {
200207
upload.execute();
201208
} catch (SQLException e) {
202209
outDirtyRecords.inc();
203-
LOG.error("record insert failed ..", row.toString());
204-
LOG.error("", e);
210+
if (outDirtyRecords.getCount() % dirtyDataPrintFrequency == 0) {
211+
LOG.error("record insert failed ..", row.toString());
212+
LOG.error("", e);
213+
}
205214
}
206215
}
207216

@@ -297,7 +306,6 @@ private void updatePreparedStmt(Row row, PreparedStatement pstmt) throws SQLExce
297306

298307
private synchronized void submitExecuteBatch() {
299308
try {
300-
LOG.info("submitExecuteBatch start......");
301309
this.upload.executeBatch();
302310
dbConn.commit();
303311
} catch (SQLException e) {

0 commit comments

Comments
 (0)