Skip to content

Commit 40e2b0d

Browse files
authored
Log Kafka commit failures at warning level at most once every 10 minutes, and only when commits have been failing for more than 10 minutes. (#36685)
Otherwise, log the failure at info level, noting that the failed commit will be retried.
1 parent 093084a commit 40e2b0d

File tree

1 file changed: 19 additions (+19) and 3 deletions (-3).

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public boolean advance() throws IOException {
157157
*/
158158
while (true) {
159159
if (curBatch.hasNext()) {
160-
// Initalize metrics container.
160+
// Initialize metrics container.
161161
kafkaResults = KafkaSinkMetrics.kafkaMetrics();
162162

163163
PartitionState<K, V> pState = curBatch.next();
@@ -374,6 +374,7 @@ public boolean offsetBasedDeduplicationSupported() {
374374
private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MIN = Duration.millis(1);
375375
private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MAX = Duration.millis(20);
376376
private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);
377+
private static final Duration MIN_COMMIT_FAIL_LOG_INTERVAL = Duration.standardMinutes(10);
377378

378379
// Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
379380
// network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
@@ -392,6 +393,7 @@ public boolean offsetBasedDeduplicationSupported() {
392393
private AtomicReference<@Nullable KafkaCheckpointMark> finalizedCheckpointMark =
393394
new AtomicReference<>();
394395
private AtomicBoolean closed = new AtomicBoolean(false);
396+
private Instant nextAllowedCommitFailLogTime = Instant.ofEpochMilli(0);
395397

396398
// Backlog support :
397399
// Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
@@ -612,6 +614,7 @@ private void commitCheckpointMark() {
612614
if (checkpointMark != null) {
613615
LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
614616
Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
617+
Instant now = Instant.now();
615618

616619
try {
617620
consumer.commitSync(
@@ -621,11 +624,24 @@ private void commitCheckpointMark() {
621624
Collectors.toMap(
622625
p -> new TopicPartition(p.getTopic(), p.getPartition()),
623626
p -> new OffsetAndMetadata(p.getNextOffset()))));
627+
nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
624628
} catch (Exception e) {
625629
// Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
626630
// KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
627-
LOG.warn(
628-
String.format("%s: Could not commit finalized checkpoint %s", this, checkpointMark), e);
631+
if (now.isAfter(nextAllowedCommitFailLogTime)) {
632+
LOG.warn(
633+
String.format(
634+
"%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
635+
this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
636+
e);
637+
nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
638+
} else {
639+
LOG.info(
640+
String.format(
641+
"%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
642+
this, checkpointMark),
643+
e);
644+
}
629645
}
630646
}
631647
}

0 commit comments

Comments
 (0)