Skip to content

Commit d10f1a4

Browse files
committed
Ensure expectedOffset advances immediately after a claim
1 parent d41336f commit d10f1a4

File tree

1 file changed

+1
-1
lines changed

1 file changed

+1
-1
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ public ProcessContinuation processElement(
504504
if (!tracker.tryClaim(rawRecord.offset())) {
505505
return ProcessContinuation.stop();
506506
}
507+
expectedOffset = rawRecord.offset() + 1;
507508
try {
508509
KafkaRecord<K, V> kafkaRecord =
509510
new KafkaRecord<>(
@@ -520,7 +521,6 @@ public ProcessContinuation processElement(
520521
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
521522
avgRecordSize.update(recordSize);
522523
rawSizes.update(recordSize);
523-
expectedOffset = rawRecord.offset() + 1;
524524
Instant outputTimestamp;
525525
// The outputTimestamp and watermark will be computed by timestampPolicy, where the
526526
// WatermarkEstimator should be a manual one.

0 commit comments

Comments
 (0)