@@ -447,123 +447,126 @@ public ProcessContinuation processElement(
447447 long skippedRecords = 0L ;
448448 final Stopwatch sw = Stopwatch .createStarted ();
449449
450- while (true ) {
451- // Fetch the record size accumulator.
452- final MovingAvg avgRecordSize = avgRecordSizeCache .getUnchecked (kafkaSourceDescriptor );
453- rawRecords = poll (consumer , kafkaSourceDescriptor .getTopicPartition ());
454- // When there are no records available for the current TopicPartition, self-checkpoint
455- // and move to process the next element.
456- if (rawRecords .isEmpty ()) {
457- if (!topicPartitionExists (
458- kafkaSourceDescriptor .getTopicPartition (),
459- consumer .partitionsFor (kafkaSourceDescriptor .getTopic ()))) {
460- return ProcessContinuation .stop ();
461- }
462- if (timestampPolicy != null ) {
463- updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
464- }
465- return ProcessContinuation .resume ();
466- }
467- for (ConsumerRecord <byte [], byte []> rawRecord : rawRecords ) {
468- // If the Kafka consumer returns a record with an offset that is already processed
469- // the record can be safely skipped. This is needed because there is a possibility
470- // that the seek() above fails to move the offset to the desired position. In which
471- // case poll() would return records that are already cnsumed.
472- if (rawRecord .offset () < startOffset ) {
473- // If the start offset is not reached even after skipping the records for 10 seconds
474- // then the processing is stopped with a backoff to give the Kakfa server some time
475- // catch up.
476- if (sw .elapsed ().getSeconds () > 10L ) {
477- LOG .error (
478- "The expected offset ({}) was not reached even after"
479- + " skipping consumed records for 10 seconds. The offset we could"
480- + " reach was {}. The processing of this bundle will be attempted"
481- + " at a later time." ,
482- expectedOffset ,
483- rawRecord .offset ());
484- return ProcessContinuation .resume ()
485- .withResumeDelay (org .joda .time .Duration .standardSeconds (10L ));
486- }
487- skippedRecords ++;
488- continue ;
489- }
490- if (skippedRecords > 0L ) {
491- LOG .warn (
492- "{} records were skipped due to seek returning an"
493- + " earlier position than requested position of {}" ,
494- skippedRecords ,
495- expectedOffset );
496- skippedRecords = 0L ;
497- }
498- if (!tracker .tryClaim (rawRecord .offset ())) {
499- return ProcessContinuation .stop ();
500- }
501- try {
502- KafkaRecord <K , V > kafkaRecord =
503- new KafkaRecord <>(
504- rawRecord .topic (),
505- rawRecord .partition (),
506- rawRecord .offset (),
507- ConsumerSpEL .getRecordTimestamp (rawRecord ),
508- ConsumerSpEL .getRecordTimestampType (rawRecord ),
509- ConsumerSpEL .hasHeaders () ? rawRecord .headers () : null ,
510- ConsumerSpEL .deserializeKey (keyDeserializerInstance , rawRecord ),
511- ConsumerSpEL .deserializeValue (valueDeserializerInstance , rawRecord ));
512- int recordSize =
513- (rawRecord .key () == null ? 0 : rawRecord .key ().length )
514- + (rawRecord .value () == null ? 0 : rawRecord .value ().length );
515- avgRecordSize .update (recordSize );
516- rawSizes .update (recordSize );
517- expectedOffset = rawRecord .offset () + 1 ;
518- Instant outputTimestamp ;
519- // The outputTimestamp and watermark will be computed by timestampPolicy, where the
520- // WatermarkEstimator should be a manual one.
521- if (timestampPolicy != null ) {
522- TimestampPolicyContext context =
523- updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
524- outputTimestamp = timestampPolicy .getTimestampForRecord (context , kafkaRecord );
525- } else {
526- Preconditions .checkStateNotNull (this .extractOutputTimestampFn );
527- outputTimestamp = extractOutputTimestampFn .apply (kafkaRecord );
450+ KafkaMetrics kafkaMetrics = KafkaSinkMetrics .kafkaMetrics ();
451+ try {
452+ while (true ) {
453+ // Fetch the record size accumulator.
454+ final MovingAvg avgRecordSize = avgRecordSizeCache .getUnchecked (kafkaSourceDescriptor );
455+ rawRecords = poll (consumer , kafkaSourceDescriptor .getTopicPartition (), kafkaMetrics );
456+ // When there are no records available for the current TopicPartition, self-checkpoint
457+ // and move to process the next element.
458+ if (rawRecords .isEmpty ()) {
459+ if (!topicPartitionExists (
460+ kafkaSourceDescriptor .getTopicPartition (),
461+ consumer .partitionsFor (kafkaSourceDescriptor .getTopic ()))) {
462+ return ProcessContinuation .stop ();
528463 }
529- receiver
530- .get (recordTag )
531- .outputWithTimestamp (KV .of (kafkaSourceDescriptor , kafkaRecord ), outputTimestamp );
532- } catch (SerializationException e ) {
533- // This exception should only occur during the key and value deserialization when
534- // creating the Kafka Record
535- badRecordRouter .route (
536- receiver ,
537- rawRecord ,
538- null ,
539- e ,
540- "Failure deserializing Key or Value of Kakfa record reading from Kafka" );
541464 if (timestampPolicy != null ) {
542465 updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
543466 }
467+ return ProcessContinuation .resume ();
468+ }
469+ for (ConsumerRecord <byte [], byte []> rawRecord : rawRecords ) {
470+ // If the Kafka consumer returns a record with an offset that is already processed,
471+ // the record can be safely skipped. This is needed because there is a possibility
472+ // that the seek() above fails to move the offset to the desired position, in which
473+ // case poll() would return records that are already consumed.
474+ if (rawRecord .offset () < startOffset ) {
475+ // If the start offset is not reached even after skipping the records for 10 seconds,
476+ // then the processing is stopped with a backoff to give the Kafka server some time
477+ // to catch up.
478+ if (sw .elapsed ().getSeconds () > 10L ) {
479+ LOG .error (
480+ "The expected offset ({}) was not reached even after"
481+ + " skipping consumed records for 10 seconds. The offset we could"
482+ + " reach was {}. The processing of this bundle will be attempted"
483+ + " at a later time." ,
484+ expectedOffset ,
485+ rawRecord .offset ());
486+ return ProcessContinuation .resume ()
487+ .withResumeDelay (org .joda .time .Duration .standardSeconds (10L ));
488+ }
489+ skippedRecords ++;
490+ continue ;
491+ }
492+ if (skippedRecords > 0L ) {
493+ LOG .warn (
494+ "{} records were skipped due to seek returning an"
495+ + " earlier position than requested position of {}" ,
496+ skippedRecords ,
497+ expectedOffset );
498+ skippedRecords = 0L ;
499+ }
500+ if (!tracker .tryClaim (rawRecord .offset ())) {
501+ return ProcessContinuation .stop ();
502+ }
503+ try {
504+ KafkaRecord <K , V > kafkaRecord =
505+ new KafkaRecord <>(
506+ rawRecord .topic (),
507+ rawRecord .partition (),
508+ rawRecord .offset (),
509+ ConsumerSpEL .getRecordTimestamp (rawRecord ),
510+ ConsumerSpEL .getRecordTimestampType (rawRecord ),
511+ ConsumerSpEL .hasHeaders () ? rawRecord .headers () : null ,
512+ ConsumerSpEL .deserializeKey (keyDeserializerInstance , rawRecord ),
513+ ConsumerSpEL .deserializeValue (valueDeserializerInstance , rawRecord ));
514+ int recordSize =
515+ (rawRecord .key () == null ? 0 : rawRecord .key ().length )
516+ + (rawRecord .value () == null ? 0 : rawRecord .value ().length );
517+ avgRecordSize .update (recordSize );
518+ rawSizes .update (recordSize );
519+ expectedOffset = rawRecord .offset () + 1 ;
520+ Instant outputTimestamp ;
521+ // The outputTimestamp and watermark will be computed by timestampPolicy, where the
522+ // WatermarkEstimator should be a manual one.
523+ if (timestampPolicy != null ) {
524+ TimestampPolicyContext context =
525+ updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
526+ outputTimestamp = timestampPolicy .getTimestampForRecord (context , kafkaRecord );
527+ } else {
528+ Preconditions .checkStateNotNull (this .extractOutputTimestampFn );
529+ outputTimestamp = extractOutputTimestampFn .apply (kafkaRecord );
530+ }
531+ receiver
532+ .get (recordTag )
533+ .outputWithTimestamp (KV .of (kafkaSourceDescriptor , kafkaRecord ), outputTimestamp );
534+ } catch (SerializationException e ) {
535+ // This exception should only occur during key and value deserialization when
536+ // creating the KafkaRecord.
537+ badRecordRouter .route (
538+ receiver ,
539+ rawRecord ,
540+ null ,
541+ e ,
542+ "Failure deserializing Key or Value of Kafka record while reading from Kafka" );
543+ if (timestampPolicy != null ) {
544+ updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
545+ }
546+ }
544547 }
545- }
546548
547- backlogBytes .set (
548- (long )
549- (BigDecimal .valueOf (
550- Preconditions .checkStateNotNull (
551- offsetEstimatorCache .get (kafkaSourceDescriptor ).estimate ()))
552- .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
553- .doubleValue ()
554- * avgRecordSize .get ()));
555- KafkaMetrics kafkaResults = KafkaSinkMetrics .kafkaMetrics ();
556- kafkaResults .updateBacklogBytes (
557- kafkaSourceDescriptor .getTopic (),
558- kafkaSourceDescriptor .getPartition (),
559- (long )
560- (BigDecimal .valueOf (
561- Preconditions .checkStateNotNull (
562- offsetEstimatorCache .get (kafkaSourceDescriptor ).estimate ()))
563- .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
564- .doubleValue ()
565- * avgRecordSize .get ()));
566- kafkaResults .flushBufferedMetrics ();
549+ backlogBytes .set (
550+ (long )
551+ (BigDecimal .valueOf (
552+ Preconditions .checkStateNotNull (
553+ offsetEstimatorCache .get (kafkaSourceDescriptor ).estimate ()))
554+ .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
555+ .doubleValue ()
556+ * avgRecordSize .get ()));
557+ kafkaMetrics .updateBacklogBytes (
558+ kafkaSourceDescriptor .getTopic (),
559+ kafkaSourceDescriptor .getPartition (),
560+ (long )
561+ (BigDecimal .valueOf (
562+ Preconditions .checkStateNotNull (
563+ offsetEstimatorCache .get (kafkaSourceDescriptor ).estimate ()))
564+ .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
565+ .doubleValue ()
566+ * avgRecordSize .get ()));
567+ }
568+ } finally {
569+ kafkaMetrics .flushBufferedMetrics ();
567570 }
568571 }
569572 }
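Net effect of the hunk above: the per-bundle KafkaMetrics accumulator is now created once at the top of processElement via KafkaSinkMetrics.kafkaMetrics(), threaded into poll(...), reused for the backlog-bytes update (replacing the old per-iteration kafkaResults local), and the processing loop is wrapped in a try block whose finally clause calls flushBufferedMetrics(), so buffered metrics are reported on every exit path (normal return, early ProcessContinuation.stop()/resume(), or an exception). The backlog estimate itself is unchanged: (estimated end offset - expectedOffset) * average record size. Below is a minimal standalone sketch of that buffer-and-flush-in-finally pattern; BufferedMetrics and its methods are hypothetical stand-ins for illustration, not Beam or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferAndFlushSketch {

  // Hypothetical stand-in for the KafkaMetrics accumulator used in the diff above.
  static class BufferedMetrics {
    private final List<String> buffer = new ArrayList<>();

    void updateBacklogBytes(String topic, int partition, long bytes) {
      buffer.add(topic + "-" + partition + " backlogBytes=" + bytes);
    }

    void flushBufferedMetrics() {
      buffer.forEach(System.out::println);
      buffer.clear();
    }
  }

  // Mirrors the shape of the change: create the accumulator once per bundle,
  // update it inside the loop, and flush it in finally so no exit path skips reporting.
  static void processBundle(long estimatedEndOffset, long expectedOffset, double avgRecordSize) {
    BufferedMetrics metrics = new BufferedMetrics();
    try {
      // ... poll, claim offsets, emit records; the loop may return early or throw ...
      // The real code does this subtraction with BigDecimal and MathContext.DECIMAL128;
      // plain primitives are used here only for brevity.
      long backlogBytes = (long) ((estimatedEndOffset - expectedOffset) * avgRecordSize);
      metrics.updateBacklogBytes("my-topic", 0, backlogBytes);
    } finally {
      metrics.flushBufferedMetrics();
    }
  }

  public static void main(String[] args) {
    processBundle(1_000L, 400L, 512.0); // prints "my-topic-0 backlogBytes=307200"
  }
}
```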
@@ -577,13 +580,16 @@ private boolean topicPartitionExists(
577580
578581 // see https://github.com/apache/beam/issues/25962
579582 private ConsumerRecords <byte [], byte []> poll (
580- Consumer <byte [], byte []> consumer , TopicPartition topicPartition ) {
583+ Consumer <byte [], byte []> consumer , TopicPartition topicPartition , KafkaMetrics kafkaMetrics ) {
581584 final Stopwatch sw = Stopwatch .createStarted ();
582585 long previousPosition = -1 ;
583- java .time .Duration elapsed = java .time .Duration .ZERO ;
584586 java .time .Duration timeout = java .time .Duration .ofSeconds (this .consumerPollingTimeout );
587+ java .time .Duration elapsed = java .time .Duration .ZERO ;
585588 while (true ) {
586589 final ConsumerRecords <byte [], byte []> rawRecords = consumer .poll (timeout .minus (elapsed ));
590+ elapsed = sw .elapsed ();
591+ kafkaMetrics .updateSuccessfulRpcMetrics (
592+ topicPartition .topic (), java .time .Duration .ofMillis (elapsed .toMillis ()));
587593 if (!rawRecords .isEmpty ()) {
588594 // return as we have found some entries
589595 return rawRecords ;
@@ -592,7 +598,6 @@ private ConsumerRecords<byte[], byte[]> poll(
592598 // there was no progress on the offset/position, which indicates end of stream
593599 return rawRecords ;
594600 }
595- elapsed = sw .elapsed ();
596601 if (elapsed .toMillis () >= timeout .toMillis ()) {
597602 // timeout is over
598603 LOG .warn (
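The second set of hunks has poll(...) accept the same per-bundle KafkaMetrics and sample the stopwatch once per iteration, immediately after consumer.poll(...) returns; that elapsed value is reported through updateSuccessfulRpcMetrics(topic, duration) and then reused by the timeout check, so the old late elapsed = sw.elapsed() assignment is removed. A small self-contained sketch of that measure-after-each-poll loop follows; pollOnce and RpcLatencySink are hypothetical placeholders, not Kafka consumer or Beam metric APIs.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

public class PollLatencySketch {

  // Hypothetical sink standing in for KafkaMetrics.updateSuccessfulRpcMetrics in the diff.
  interface RpcLatencySink {
    void updateSuccessfulRpcMetrics(String topic, Duration elapsed);
  }

  static List<String> pollUntilDataOrTimeout(
      Supplier<List<String>> pollOnce, // hypothetical stand-in for consumer.poll(...)
      String topic,
      Duration timeout,
      RpcLatencySink sink) {
    final long startNanos = System.nanoTime();
    while (true) {
      List<String> records = pollOnce.get();
      // Sample the clock right after the poll returns (as the diff does with sw.elapsed())
      // and report it; note this is time elapsed since the loop started, not per-call latency.
      Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);
      sink.updateSuccessfulRpcMetrics(topic, elapsed);
      if (!records.isEmpty()) {
        return records; // found some entries
      }
      if (elapsed.compareTo(timeout) >= 0) {
        return records; // timeout is over; hand back the empty batch
      }
    }
  }

  public static void main(String[] args) {
    List<String> records =
        pollUntilDataOrTimeout(
            () -> List.of("record-0"), // data arrives on the first poll in this toy run
            "my-topic",
            Duration.ofSeconds(2),
            (topic, d) -> System.out.println(topic + " poll elapsed=" + d.toMillis() + "ms"));
    System.out.println("polled " + records.size() + " record(s)");
  }
}
```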