Skip to content

Commit 2a90b45

Browse files
committed
data output frequency
1 parent e9a39a8 commit 2a90b45

File tree

3 files changed

+16
-4
lines changed

3 files changed

+16
-4
lines changed

core/src/main/java/com/dtstack/flink/sql/metric/MetricConstant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class MetricConstant {
4545

4646
public static final String DT_NUM_RECORDS_OUT = "dtNumRecordsOut";
4747

48+
public static final String DT_NUM_DIRTY_RECORDS_OUT = "dtNumDirtyRecordsOut";
49+
4850
public static final String DT_NUM_RECORDS_OUT_RATE = "dtNumRecordsOutRate";
4951

5052
public static final String DT_EVENT_DELAY_GAUGE = "dtEventDelay";

core/src/main/java/com/dtstack/flink/sql/sink/MetricOutputFormat.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2>{
3434

3535
protected transient Meter outRecordsRate;
3636

37+
protected transient Counter outDirtyRecords;
38+
3739
public void initMetric() {
3840
outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
41+
outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
3942
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
4043
}
4144

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public class RetractJDBCOutputFormat extends MetricOutputFormat {
4949

5050
private static final Logger LOG = LoggerFactory.getLogger(RetractJDBCOutputFormat.class);
5151

52+
private static int dirtyDataPrintFrequency = 1000;
53+
private static int receiveDataPrintFrequency = 1000;
54+
5255
private String username;
5356
private String password;
5457
private String drivername;
@@ -164,8 +167,11 @@ public void writeRecord(Tuple2 tuple2) {
164167
}
165168

166169
if (retract) {
167-
insertWrite(row);
168170
outRecords.inc();
171+
if (outRecords.getCount() % receiveDataPrintFrequency == 0) {
172+
LOG.info("Receive data : {}", row);
173+
}
174+
insertWrite(row);
169175
} else {
170176
//do nothing
171177
}
@@ -197,14 +203,15 @@ private void writeSingleRecord(Row row) {
197203
upload.execute();
198204
dbConn.commit();
199205
} catch (SQLException e) {
200-
LOG.error("record insert failed ..", row.toString());
201-
LOG.error("", e);
206+
if (outDirtyRecords.getCount() % dirtyDataPrintFrequency == 0) {
207+
LOG.error("record insert failed ..", row.toString());
208+
LOG.error("", e);
209+
}
202210
}
203211
}
204212

205213
private synchronized void submitExecuteBatch() {
206214
try {
207-
LOG.info("submitExecuteBatch start......");
208215
this.upload.executeBatch();
209216
dbConn.commit();
210217
} catch (SQLException e) {

0 commit comments

Comments
 (0)