Skip to content

Commit 8068376

Browse files
committed
[FLINK-25920] Improve logging in committable handling of the sink
(cherry picked from commit 2cdd3f0)
1 parent 7d0bb78 commit 8068376

File tree

3 files changed

+39
-2
lines changed

3 files changed

+39
-2
lines changed

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ private void emit(
274274
int numberOfParallelSubtasks,
275275
long checkpointId,
276276
Collection<CommT> committables) {
277-
output.collect(
277+
emit(
278278
new StreamRecord<>(
279279
new CommittableSummary<>(
280280
indexOfThisSubtask,
@@ -284,13 +284,18 @@ private void emit(
284284
committables.size(),
285285
0)));
286286
for (CommT committable : committables) {
287-
output.collect(
287+
emit(
288288
new StreamRecord<>(
289289
new CommittableWithLineage<>(
290290
committable, checkpointId, indexOfThisSubtask)));
291291
}
292292
}
293293

294+
private void emit(StreamRecord<CommittableMessage<CommT>> message) {
295+
LOG.debug("Sending message to committer: {}", message);
296+
output.collect(message);
297+
}
298+
294299
private WriterInitContext createInitContext(OptionalLong restoredCheckpointId) {
295300
return new InitContextImpl(
296301
getRuntimeContext(),

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
2525
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
2626

27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
2730
import java.io.IOException;
2831
import java.util.ArrayList;
2932
import java.util.Collection;
@@ -43,6 +46,9 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
4346
private final int numberOfSubtasks;
4447
private final SinkCommitterMetricGroup metricGroup;
4548

49+
private static final Logger LOG =
50+
LoggerFactory.getLogger(CheckpointCommittableManagerImpl.class);
51+
4652
CheckpointCommittableManagerImpl(
4753
int subtaskId,
4854
int numberOfSubtasks,
@@ -82,6 +88,7 @@ void addSummary(CommittableSummary<CommT> summary) {
8288
SubtaskCommittableManager<CommT> merged =
8389
subtasksCommittableManagers.merge(
8490
summary.getSubtaskId(), manager, SubtaskCommittableManager::merge);
91+
LOG.debug("Adding EOI summary (new={}}, merged={}}).", manager, merged);
8592
} else {
8693
SubtaskCommittableManager<CommT> existing =
8794
subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
@@ -90,6 +97,11 @@ void addSummary(CommittableSummary<CommT> summary) {
9097
String.format(
9198
"Received duplicate committable summary for checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of FLINK-25920",
9299
checkpointId, summary.getSubtaskId(), manager, existing));
100+
} else {
101+
LOG.debug(
102+
"Setting the summary for checkpointId {} with {}",
103+
this.checkpointId,
104+
manager);
93105
}
94106
}
95107
}

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,4 +204,24 @@ SubtaskCommittableManager<CommT> copy() {
204204
checkpointId,
205205
metricGroup);
206206
}
207+
208+
@Override
209+
public String toString() {
210+
return "SubtaskCommittableManager{"
211+
+ "requests="
212+
+ requests
213+
+ ", numExpectedCommittables="
214+
+ numExpectedCommittables
215+
+ ", checkpointId="
216+
+ checkpointId
217+
+ ", subtaskId="
218+
+ subtaskId
219+
+ ", numDrained="
220+
+ numDrained
221+
+ ", numFailed="
222+
+ numFailed
223+
+ ", metricGroup="
224+
+ metricGroup
225+
+ '}';
226+
}
207227
}

0 commit comments

Comments
 (0)