Skip to content

Commit f81486e

Browse files
committed
Ensure expectedOffset advances immediately after a claim
1 parent 15d2ff0 commit f81486e

File tree

1 file changed

+1
-1
lines changed

1 file changed

+1
-1
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ public ProcessContinuation processElement(
512512
if (!tracker.tryClaim(rawRecord.offset())) {
513513
return ProcessContinuation.stop();
514514
}
515+
expectedOffset = rawRecord.offset() + 1;
515516
try {
516517
KafkaRecord<K, V> kafkaRecord =
517518
new KafkaRecord<>(
@@ -528,7 +529,6 @@ public ProcessContinuation processElement(
528529
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
529530
avgRecordSize.update(recordSize);
530531
rawSizes.update(recordSize);
531-
expectedOffset = rawRecord.offset() + 1;
532532
Instant outputTimestamp;
533533
// The outputTimestamp and watermark will be computed by timestampPolicy, where the
534534
// WatermarkEstimator should be a manual one.

0 commit comments

Comments
 (0)