Skip to content

Commit 0f4f98d

Browse files
authored
[KafkaIO] Remove duplicate offset in range check (#34201)
* Remove obsolete offset within range check which is only present in ancient kafka client versions
1 parent 00539f7 commit 0f4f98d

File tree

3 files changed

+13
-62
lines changed

3 files changed

+13
-62
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,8 @@ public boolean start() throws IOException {
151151
@Override
152152
public boolean advance() throws IOException {
153153
/* Read first record (if any). We need to loop here because:
154-
* - (a) some records initially need to be skipped if they are before consumedOffset
155-
* - (b) if curBatch is empty, we want to fetch next batch and then advance.
156-
* - (c) curBatch is an iterator of iterators. we interleave the records from each.
154+
* - (a) if curBatch is empty, we want to fetch next batch and then advance.
155+
* - (b) curBatch is an iterator of iterators. we interleave the records from each.
157156
* curBatch.next() might return an empty iterator.
158157
*/
159158
while (true) {
@@ -163,7 +162,7 @@ public boolean advance() throws IOException {
163162

164163
PartitionState<K, V> pState = curBatch.next();
165164

166-
if (!pState.recordIter.hasNext()) { // -- (c)
165+
if (!pState.recordIter.hasNext()) { // -- (b)
167166
pState.recordIter = Collections.emptyIterator(); // drop ref
168167
curBatch.remove();
169168
continue;
@@ -173,19 +172,6 @@ public boolean advance() throws IOException {
173172
elementsReadBySplit.inc();
174173

175174
ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
176-
long expected = pState.nextOffset;
177-
long offset = rawRecord.offset();
178-
179-
if (offset < expected) { // -- (a)
180-
// this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
181-
// should we check if the offset is way off from consumedOffset (say > 1M)?
182-
LOG.warn(
183-
"{}: ignoring already consumed offset {} for {}",
184-
this,
185-
offset,
186-
pState.topicPartition);
187-
continue;
188-
}
189175

190176
// Apply user deserializers. User deserializers might throw, which will be propagated up
191177
// and 'curRecord' remains unchanged. The runner should close this reader.
@@ -212,7 +198,7 @@ public boolean advance() throws IOException {
212198
int recordSize =
213199
(rawRecord.key() == null ? 0 : rawRecord.key().length)
214200
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
215-
pState.recordConsumed(offset, recordSize);
201+
pState.recordConsumed(rawRecord.offset(), recordSize);
216202
bytesRead.inc(recordSize);
217203
bytesReadBySplit.inc(recordSize);
218204

@@ -223,7 +209,7 @@ public boolean advance() throws IOException {
223209

224210
kafkaResults.flushBufferedMetrics();
225211
return true;
226-
} else { // -- (b)
212+
} else { // -- (a)
227213
kafkaResults = KafkaSinkMetrics.kafkaMetrics();
228214
nextBatch();
229215
kafkaResults.flushBufferedMetrics();

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 4 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -588,12 +588,10 @@ public ProcessContinuation processElement(
588588
topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
589589
}
590590

591-
long startOffset = tracker.currentRestriction().getFrom();
592-
long expectedOffset = startOffset;
591+
long expectedOffset = tracker.currentRestriction().getFrom();
593592
consumer.resume(Collections.singleton(topicPartition));
594-
consumer.seek(topicPartition, startOffset);
595-
long skippedRecords = 0L;
596-
final Stopwatch sw = Stopwatch.createStarted();
593+
consumer.seek(topicPartition, expectedOffset);
594+
final Stopwatch pollTimer = Stopwatch.createUnstarted();
597595

598596
final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
599597
try {
@@ -602,7 +600,7 @@ public ProcessContinuation processElement(
602600
// A consumer will often have prefetches waiting to be returned immediately in which case
603601
// this timer may contribute more latency than it measures.
604602
// See https://shipilev.net/blog/2014/nanotrusting-nanotime/ for more information.
605-
final Stopwatch pollTimer = Stopwatch.createStarted();
603+
pollTimer.reset().start();
606604
// Fetch the next records.
607605
final ConsumerRecords<byte[], byte[]> rawRecords =
608606
consumer.poll(this.consumerPollingTimeout);
@@ -627,37 +625,6 @@ public ProcessContinuation processElement(
627625
// Visible progress within the consumer polling timeout.
628626
// Partially or fully claim and process records in this batch.
629627
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
630-
// If the Kafka consumer returns a record with an offset that is already processed
631-
// the record can be safely skipped. This is needed because there is a possibility
632-
// that the seek() above fails to move the offset to the desired position. In which
633-
// case poll() would return records that are already consumed.
634-
if (rawRecord.offset() < startOffset) {
635-
// If the start offset is not reached even after skipping the records for 10 seconds
636-
// then the processing is stopped with a backoff to give the Kafka server some time to
637-
// catch up.
638-
if (sw.elapsed().getSeconds() > 10L) {
639-
LOG.error(
640-
"The expected offset ({}) was not reached even after"
641-
+ " skipping consumed records for 10 seconds. The offset we could"
642-
+ " reach was {}. The processing of this bundle will be attempted"
643-
+ " at a later time.",
644-
expectedOffset,
645-
rawRecord.offset());
646-
consumer.pause(Collections.singleton(topicPartition));
647-
return ProcessContinuation.resume()
648-
.withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
649-
}
650-
skippedRecords++;
651-
continue;
652-
}
653-
if (skippedRecords > 0L) {
654-
LOG.warn(
655-
"{} records were skipped due to seek returning an"
656-
+ " earlier position than requested position of {}",
657-
skippedRecords,
658-
expectedOffset);
659-
skippedRecords = 0L;
660-
}
661628
if (!tracker.tryClaim(rawRecord.offset())) {
662629
consumer.seek(topicPartition, rawRecord.offset());
663630
consumer.pause(Collections.singleton(topicPartition));

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertThrows;
2223
import static org.junit.Assert.assertTrue;
2324

2425
import java.nio.charset.StandardCharsets;
@@ -525,12 +526,9 @@ public void testProcessElementWithEarlierOffset() throws Exception {
525526
new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
526527
KafkaSourceDescriptor descriptor =
527528
KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
528-
ProcessContinuation result =
529-
dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver);
530-
assertEquals(ProcessContinuation.stop(), result);
531-
assertEquals(
532-
createExpectedRecords(descriptor, startOffset, 3, "key", "value"),
533-
receiver.getGoodRecords());
529+
assertThrows(
530+
IllegalArgumentException.class,
531+
() -> dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver));
534532
}
535533

536534
@Test

0 commit comments

Comments
 (0)