Skip to content

Commit b930926

Browse files
committed
优化 output format 的第一条数据输入和第一条脏数据输出
1 parent 83daea8 commit b930926

File tree

3 files changed

+12
-4
lines changed

3 files changed

+12
-4
lines changed

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ private void insertWrite(Row row) {
216216
}
217217
} catch (Exception e) {
218218
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
219-
LOG.error("record insert failed ..", row.toString().substring(0, 100));
219+
LOG.error("record insert failed, total dirty num:{}, current record:{}", outDirtyRecords.getCount(), row.toString());
220220
LOG.error("", e);
221221
}
222222

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void writeRecord(Tuple2 record) throws IOException {
108108

109109
if (row.getArity() != fieldNames.length) {
110110
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0) {
111-
LOG.error("record insert failed ..", row.toString());
111+
LOG.error("record insert failed:{}", row.toString());
112112
LOG.error("cause by row.getArity() != fieldNames.length");
113113
}
114114

@@ -131,7 +131,7 @@ public void writeRecord(Tuple2 record) throws IOException {
131131
} catch (KuduException e) {
132132

133133
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
134-
LOG.error("record insert failed ..", row.toString().substring(0, 100));
134+
LOG.error("record insert failed, total dirty record:{} current row:{}", outDirtyRecords.getCount(), row.toString());
135135
LOG.error("", e);
136136
}
137137

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,15 @@ private void insertWrite(Row row) {
194194
}
195195
}
196196
} catch (SQLException e) {
197-
LOG.error("", e);
197+
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
198+
outDirtyRecords.inc(batchNum == 1 ? batchNum : rows.size());
199+
LOG.error("record insert failed,dirty record num:{}, current row:{}", outDirtyRecords.getCount(), row.toString());
200+
LOG.error("", e);
201+
} else {
202+
outDirtyRecords.inc(batchNum == 1 ? batchNum : rows.size());
203+
}
204+
205+
198206
}
199207

200208
}

0 commit comments

Comments
 (0)