Skip to content

Commit e72dc1e

Browse files
author
Naireen
committed
address comments
1 parent 552bf7f commit e72dc1e

File tree

1 file changed

+10
-11
lines changed

1 file changed

+10
-11
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 10 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -447,11 +447,11 @@ public ProcessContinuation processElement(
447447
long skippedRecords = 0L;
448448
final Stopwatch sw = Stopwatch.createStarted();
449449

450-
while (true) {
451-
// Fetch the record size accumulator.
452-
final MovingAvg avgRecordSize = avgRecordSizeCache.getUnchecked(kafkaSourceDescriptor);
453-
KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
454-
try {
450+
KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
451+
try {
452+
while (true) {
453+
// Fetch the record size accumulator.
454+
final MovingAvg avgRecordSize = avgRecordSizeCache.getUnchecked(kafkaSourceDescriptor);
455455
rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaMetrics);
456456
// When there are no records available for the current TopicPartition, self-checkpoint
457457
// and move to process the next element.
@@ -514,9 +514,7 @@ public ProcessContinuation processElement(
514514
int recordSize =
515515
(rawRecord.key() == null ? 0 : rawRecord.key().length)
516516
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
517-
avgRecordSizeCache
518-
.getUnchecked(kafkaSourceDescriptor)
519-
.update(recordSize, rawRecord.offset() - expectedOffset);
517+
avgRecordSize.update(recordSize);
520518
rawSizes.update(recordSize);
521519
expectedOffset = rawRecord.offset() + 1;
522520
Instant outputTimestamp;
@@ -566,9 +564,9 @@ public ProcessContinuation processElement(
566564
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
567565
.doubleValue()
568566
* avgRecordSize.get()));
569-
} finally {
570-
kafkaMetrics.flushBufferedMetrics();
571567
}
568+
} finally {
569+
kafkaMetrics.flushBufferedMetrics();
572570
}
573571
}
574572
}
@@ -586,9 +584,10 @@ private ConsumerRecords<byte[], byte[]> poll(
586584
final Stopwatch sw = Stopwatch.createStarted();
587585
long previousPosition = -1;
588586
java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
587+
java.time.Duration elapsed = java.time.Duration.ZERO;
589588
while (true) {
590-
java.time.Duration elapsed = sw.elapsed();
591589
final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(timeout.minus(elapsed));
590+
elapsed = sw.elapsed();
592591
kafkaMetrics.updateSuccessfulRpcMetrics(
593592
topicPartition.topic(), java.time.Duration.ofMillis(elapsed.toMillis()));
594593
if (!rawRecords.isEmpty()) {

0 commit comments

Comments (0)