Skip to content

Commit 3f172ec

Browse files
committed
Remove obsolete offset within range check
1 parent d42bbec commit 3f172ec

File tree

3 files changed

+6
-54
lines changed

3 files changed

+6
-54
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ public boolean start() throws IOException {
151151
@Override
152152
public boolean advance() throws IOException {
153153
/* Read first record (if any). we need to loop here because :
154-
* - (a) some records initially need to be skipped if they are before consumedOffset
155154
* - (b) if curBatch is empty, we want to fetch next batch and then advance.
156155
* - (c) curBatch is an iterator of iterators. we interleave the records from each.
157156
* curBatch.next() might return an empty iterator.
@@ -175,18 +174,6 @@ public boolean advance() throws IOException {
175174
ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
176175
long expected = pState.nextOffset;
177176
long offset = rawRecord.offset();
178-
179-
if (offset < expected) { // -- (a)
180-
// this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
181-
// should we check if the offset is way off from consumedOffset (say > 1M)?
182-
LOG.warn(
183-
"{}: ignoring already consumed offset {} for {}",
184-
this,
185-
offset,
186-
pState.topicPartition);
187-
continue;
188-
}
189-
190177
long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled.
191178

192179
if (curRecord == null) {

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -445,12 +445,9 @@ public ProcessContinuation processElement(
445445
LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor);
446446
try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
447447
consumer.assign(ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
448-
long startOffset = tracker.currentRestriction().getFrom();
449-
long expectedOffset = startOffset;
450-
consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
448+
long expectedOffset = tracker.currentRestriction().getFrom();
449+
consumer.seek(kafkaSourceDescriptor.getTopicPartition(), expectedOffset);
451450
ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
452-
long skippedRecords = 0L;
453-
final Stopwatch sw = Stopwatch.createStarted();
454451

455452
while (true) {
456453
rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
@@ -468,36 +465,6 @@ public ProcessContinuation processElement(
468465
return ProcessContinuation.resume();
469466
}
470467
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
471-
// If the Kafka consumer returns a record with an offset that is already processed
472-
// the record can be safely skipped. This is needed because there is a possibility
473-
// that the seek() above fails to move the offset to the desired position. In which
474-
// case poll() would return records that are already consumed.
475-
if (rawRecord.offset() < startOffset) {
476-
// If the start offset is not reached even after skipping the records for 10 seconds
477-
// then the processing is stopped with a backoff to give the Kafka server some time to
478-
// catch up.
479-
if (sw.elapsed().getSeconds() > 10L) {
480-
LOG.error(
481-
"The expected offset ({}) was not reached even after"
482-
+ " skipping consumed records for 10 seconds. The offset we could"
483-
+ " reach was {}. The processing of this bundle will be attempted"
484-
+ " at a later time.",
485-
expectedOffset,
486-
rawRecord.offset());
487-
return ProcessContinuation.resume()
488-
.withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
489-
}
490-
skippedRecords++;
491-
continue;
492-
}
493-
if (skippedRecords > 0L) {
494-
LOG.warn(
495-
"{} records were skipped due to seek returning an"
496-
+ " earlier position than requested position of {}",
497-
skippedRecords,
498-
expectedOffset);
499-
skippedRecords = 0L;
500-
}
501468
if (!tracker.tryClaim(rawRecord.offset())) {
502469
return ProcessContinuation.stop();
503470
}

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertThrows;
2223
import static org.junit.Assert.assertTrue;
2324

2425
import java.nio.charset.StandardCharsets;
@@ -523,12 +524,9 @@ public void testProcessElementWithEarlierOffset() throws Exception {
523524
new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
524525
KafkaSourceDescriptor descriptor =
525526
KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
526-
ProcessContinuation result =
527-
dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver);
528-
assertEquals(ProcessContinuation.stop(), result);
529-
assertEquals(
530-
createExpectedRecords(descriptor, startOffset, 3, "key", "value"),
531-
receiver.getGoodRecords());
527+
assertThrows(
528+
IllegalArgumentException.class,
529+
() -> dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver));
532530
}
533531

534532
@Test

0 commit comments

Comments
 (0)