@@ -447,10 +447,10 @@ public ProcessContinuation processElement(
447447 long skippedRecords = 0L ;
448448 final Stopwatch sw = Stopwatch .createStarted ();
449449
450+ KafkaMetrics kafkaMetrics = KafkaSinkMetrics .kafkaMetrics ();
450451 while (true ) {
451452 // Fetch the record size accumulator.
452453 final MovingAvg avgRecordSize = avgRecordSizeCache .getUnchecked (kafkaSourceDescriptor );
453- KafkaMetrics kafkaMetrics = KafkaSinkMetrics .kafkaMetrics ();
454454 try {
455455 rawRecords = poll (consumer , kafkaSourceDescriptor .getTopicPartition (), kafkaMetrics );
456456 // When there are no records available for the current TopicPartition, self-checkpoint
@@ -514,9 +514,7 @@ public ProcessContinuation processElement(
514514 int recordSize =
515515 (rawRecord .key () == null ? 0 : rawRecord .key ().length )
516516 + (rawRecord .value () == null ? 0 : rawRecord .value ().length );
517- avgRecordSizeCache
518- .getUnchecked (kafkaSourceDescriptor )
519- .update (recordSize , rawRecord .offset () - expectedOffset );
517+ avgRecordSize .update (recordSize );
520518 rawSizes .update (recordSize );
521519 expectedOffset = rawRecord .offset () + 1 ;
522520 Instant outputTimestamp ;
@@ -566,7 +564,7 @@ public ProcessContinuation processElement(
566564 .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
567565 .doubleValue ()
568566 * avgRecordSize .get ()));
569- } finally {
567+ } finally {
570568 kafkaMetrics .flushBufferedMetrics ();
571569 }
572570 }
@@ -586,9 +584,10 @@ private ConsumerRecords<byte[], byte[]> poll(
586584 final Stopwatch sw = Stopwatch .createStarted ();
587585 long previousPosition = -1 ;
588586 java .time .Duration timeout = java .time .Duration .ofSeconds (this .consumerPollingTimeout );
587+ java .time .Duration elapsed = java .time .Duration .ZERO ;
589588 while (true ) {
590- java .time .Duration elapsed = sw .elapsed ();
591589 final ConsumerRecords <byte [], byte []> rawRecords = consumer .poll (timeout .minus (elapsed ));
590+ elapsed = sw .elapsed ();
592591 kafkaMetrics .updateSuccessfulRpcMetrics (
593592 topicPartition .topic (), java .time .Duration .ofMillis (elapsed .toMillis ()));
594593 if (!rawRecords .isEmpty ()) {
0 commit comments