@@ -451,121 +451,124 @@ public ProcessContinuation processElement(
451451 // Fetch the record size accumulator.
452452 final MovingAvg avgRecordSize = avgRecordSizeCache .getUnchecked (kafkaSourceDescriptor );
453453 KafkaMetrics kafkaMetrics = KafkaSinkMetrics .kafkaMetrics ();
454- rawRecords = poll (consumer , kafkaSourceDescriptor .getTopicPartition ());
455- kafkaResults .flushBufferedMetrics ();
456- // When there are no records available for the current TopicPartition, self-checkpoint
457- // and move to process the next element.
458- if (rawRecords .isEmpty ()) {
459- if (!topicPartitionExists (
460- kafkaSourceDescriptor .getTopicPartition (),
461- consumer .partitionsFor (kafkaSourceDescriptor .getTopic ()))) {
462- return ProcessContinuation .stop ();
463- }
464- if (timestampPolicy != null ) {
465- updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
466- }
467- return ProcessContinuation .resume ();
468- }
469- for (ConsumerRecord <byte [], byte []> rawRecord : rawRecords ) {
470- // If the Kafka consumer returns a record with an offset that is already processed
471- // the record can be safely skipped. This is needed because there is a possibility
472- // that the seek() above fails to move the offset to the desired position. In which
473-            // case poll() would return records that are already consumed.
474- if (rawRecord .offset () < startOffset ) {
475- // If the start offset is not reached even after skipping the records for 10 seconds
476-            // then the processing is stopped with a backoff to give the Kafka server some time to
477-            // catch up.
478- if (sw .elapsed ().getSeconds () > 10L ) {
479- LOG .error (
480- "The expected offset ({}) was not reached even after"
481- + " skipping consumed records for 10 seconds. The offset we could"
482- + " reach was {}. The processing of this bundle will be attempted"
483- + " at a later time." ,
484- expectedOffset ,
485- rawRecord .offset ());
486- return ProcessContinuation .resume ()
487- .withResumeDelay (org .joda .time .Duration .standardSeconds (10L ));
488- }
489- skippedRecords ++;
490- continue ;
491- }
492- if (skippedRecords > 0L ) {
493- LOG .warn (
494- "{} records were skipped due to seek returning an"
495- + " earlier position than requested position of {}" ,
496- skippedRecords ,
497- expectedOffset );
498- skippedRecords = 0L ;
499- }
500- if (!tracker .tryClaim (rawRecord .offset ())) {
501- return ProcessContinuation .stop ();
502- }
503- try {
504- KafkaRecord <K , V > kafkaRecord =
505- new KafkaRecord <>(
506- rawRecord .topic (),
507- rawRecord .partition (),
508- rawRecord .offset (),
509- ConsumerSpEL .getRecordTimestamp (rawRecord ),
510- ConsumerSpEL .getRecordTimestampType (rawRecord ),
511- ConsumerSpEL .hasHeaders () ? rawRecord .headers () : null ,
512- ConsumerSpEL .deserializeKey (keyDeserializerInstance , rawRecord ),
513- ConsumerSpEL .deserializeValue (valueDeserializerInstance , rawRecord ));
514- int recordSize =
515- (rawRecord .key () == null ? 0 : rawRecord .key ().length )
516- + (rawRecord .value () == null ? 0 : rawRecord .value ().length );
517- avgRecordSize .update (recordSize );
518- rawSizes .update (recordSize );
519- expectedOffset = rawRecord .offset () + 1 ;
520- Instant outputTimestamp ;
521- // The outputTimestamp and watermark will be computed by timestampPolicy, where the
522- // WatermarkEstimator should be a manual one.
523- if (timestampPolicy != null ) {
524- TimestampPolicyContext context =
525- updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
526- outputTimestamp = timestampPolicy .getTimestampForRecord (context , kafkaRecord );
527- } else {
528- Preconditions .checkStateNotNull (this .extractOutputTimestampFn );
529- outputTimestamp = extractOutputTimestampFn .apply (kafkaRecord );
454+ try {
455+ rawRecords = poll (consumer , kafkaSourceDescriptor .getTopicPartition (), kafkaMetrics );
456+ // When there are no records available for the current TopicPartition, self-checkpoint
457+ // and move to process the next element.
458+ if (rawRecords .isEmpty ()) {
459+ if (!topicPartitionExists (
460+ kafkaSourceDescriptor .getTopicPartition (),
461+ consumer .partitionsFor (kafkaSourceDescriptor .getTopic ()))) {
462+ return ProcessContinuation .stop ();
530463 }
531- receiver
532- .get (recordTag )
533- .outputWithTimestamp (KV .of (kafkaSourceDescriptor , kafkaRecord ), outputTimestamp );
534- } catch (SerializationException e ) {
535- // This exception should only occur during the key and value deserialization when
536- // creating the Kafka Record
537- badRecordRouter .route (
538- receiver ,
539- rawRecord ,
540- null ,
541- e ,
542- "Failure deserializing Key or Value of Kakfa record reading from Kafka" );
543464 if (timestampPolicy != null ) {
544465 updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
545466 }
467+ return ProcessContinuation .resume ();
468+ }
469+ for (ConsumerRecord <byte [], byte []> rawRecord : rawRecords ) {
470+ // If the Kafka consumer returns a record with an offset that is already processed
471+ // the record can be safely skipped. This is needed because there is a possibility
472+ // that the seek() above fails to move the offset to the desired position. In which
473+              // case poll() would return records that are already consumed.
474+ if (rawRecord .offset () < startOffset ) {
475+ // If the start offset is not reached even after skipping the records for 10 seconds
476+              // then the processing is stopped with a backoff to give the Kafka server some time to
477+              // catch up.
478+ if (sw .elapsed ().getSeconds () > 10L ) {
479+ LOG .error (
480+ "The expected offset ({}) was not reached even after"
481+ + " skipping consumed records for 10 seconds. The offset we could"
482+ + " reach was {}. The processing of this bundle will be attempted"
483+ + " at a later time." ,
484+ expectedOffset ,
485+ rawRecord .offset ());
486+ return ProcessContinuation .resume ()
487+ .withResumeDelay (org .joda .time .Duration .standardSeconds (10L ));
488+ }
489+ skippedRecords ++;
490+ continue ;
491+ }
492+ if (skippedRecords > 0L ) {
493+ LOG .warn (
494+ "{} records were skipped due to seek returning an"
495+ + " earlier position than requested position of {}" ,
496+ skippedRecords ,
497+ expectedOffset );
498+ skippedRecords = 0L ;
499+ }
500+ if (!tracker .tryClaim (rawRecord .offset ())) {
501+ return ProcessContinuation .stop ();
502+ }
503+ try {
504+ KafkaRecord <K , V > kafkaRecord =
505+ new KafkaRecord <>(
506+ rawRecord .topic (),
507+ rawRecord .partition (),
508+ rawRecord .offset (),
509+ ConsumerSpEL .getRecordTimestamp (rawRecord ),
510+ ConsumerSpEL .getRecordTimestampType (rawRecord ),
511+ ConsumerSpEL .hasHeaders () ? rawRecord .headers () : null ,
512+ ConsumerSpEL .deserializeKey (keyDeserializerInstance , rawRecord ),
513+ ConsumerSpEL .deserializeValue (valueDeserializerInstance , rawRecord ));
514+ int recordSize =
515+ (rawRecord .key () == null ? 0 : rawRecord .key ().length )
516+ + (rawRecord .value () == null ? 0 : rawRecord .value ().length );
517+ avgRecordSizeCache
518+ .getUnchecked (kafkaSourceDescriptor )
519+ .update (recordSize , rawRecord .offset () - expectedOffset );
520+ rawSizes .update (recordSize );
521+ expectedOffset = rawRecord .offset () + 1 ;
522+ Instant outputTimestamp ;
523+ // The outputTimestamp and watermark will be computed by timestampPolicy, where the
524+ // WatermarkEstimator should be a manual one.
525+ if (timestampPolicy != null ) {
526+ TimestampPolicyContext context =
527+ updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
528+ outputTimestamp = timestampPolicy .getTimestampForRecord (context , kafkaRecord );
529+ } else {
530+ Preconditions .checkStateNotNull (this .extractOutputTimestampFn );
531+ outputTimestamp = extractOutputTimestampFn .apply (kafkaRecord );
532+ }
533+ receiver
534+ .get (recordTag )
535+ .outputWithTimestamp (KV .of (kafkaSourceDescriptor , kafkaRecord ), outputTimestamp );
536+ } catch (SerializationException e ) {
537+ // This exception should only occur during the key and value deserialization when
538+ // creating the Kafka Record
539+ badRecordRouter .route (
540+ receiver ,
541+ rawRecord ,
542+ null ,
543+ e ,
544+ "Failure deserializing Key or Value of Kakfa record reading from Kafka" );
545+ if (timestampPolicy != null ) {
546+ updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
547+ }
548+ }
546549 }
547- }
548550
549- backlogBytes .set (
550- (long )
551- (BigDecimal .valueOf (
552- Preconditions .checkStateNotNull (
553- offsetEstimatorCache .get (kafkaSourceDescriptor ).estimate ()))
554- .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
555- .doubleValue ()
556- * avgRecordSize .get ()));
557- kafkaMetrics = KafkaSinkMetrics .kafkaMetrics ();
558- kafkaMetrics .updateBacklogBytes (
559- kafkaSourceDescriptor .getTopic (),
560- kafkaSourceDescriptor .getPartition (),
561- (long )
562- (BigDecimal .valueOf (
563- Preconditions .checkStateNotNull (
564- offsetEstimatorCache .get (kafkaSourceDescriptor ).estimate ()))
565- .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
566- .doubleValue ()
567- * avgRecordSize .get ()));
568- kafkaMetrics .flushBufferedMetrics ();
551+ backlogBytes .set (
552+ (long )
553+ (BigDecimal .valueOf (
554+ Preconditions .checkStateNotNull (
555+ offsetEstimatorCache .get (kafkaSourceDescriptor ).estimate ()))
556+ .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
557+ .doubleValue ()
558+ * avgRecordSize .get ()));
559+ kafkaMetrics .updateBacklogBytes (
560+ kafkaSourceDescriptor .getTopic (),
561+ kafkaSourceDescriptor .getPartition (),
562+ (long )
563+ (BigDecimal .valueOf (
564+ Preconditions .checkStateNotNull (
565+ offsetEstimatorCache .get (kafkaSourceDescriptor ).estimate ()))
566+ .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
567+ .doubleValue ()
568+ * avgRecordSize .get ()));
569+ } finally {
570+ kafkaMetrics .flushBufferedMetrics ();
571+ }
569572 }
570573 }
571574 }
@@ -582,12 +585,12 @@ private ConsumerRecords<byte[], byte[]> poll(
582585 Consumer <byte [], byte []> consumer , TopicPartition topicPartition , KafkaMetrics kafkaMetrics ) {
583586 final Stopwatch sw = Stopwatch .createStarted ();
584587 long previousPosition = -1 ;
585- java .time .Duration elapsed = java .time .Duration .ZERO ;
586588 java .time .Duration timeout = java .time .Duration .ofSeconds (this .consumerPollingTimeout );
587589 while (true ) {
590+ java .time .Duration elapsed = sw .elapsed ();
588591 final ConsumerRecords <byte [], byte []> rawRecords = consumer .poll (timeout .minus (elapsed ));
589- elapsed = sw . elapsed ();
590- kafkaMetrics . updateSuccessfulRpcMetrics ( topicPartition .topic (), elapsed );
592+ kafkaMetrics . updateSuccessfulRpcMetrics (
593+ topicPartition .topic (), java . time . Duration . ofMillis ( elapsed . toMillis ()) );
591594 if (!rawRecords .isEmpty ()) {
592595 // return as we have found some entries
593596 return rawRecords ;
@@ -596,7 +599,6 @@ private ConsumerRecords<byte[], byte[]> poll(
596599 // there was no progress on the offset/position, which indicates end of stream
597600 return rawRecords ;
598601 }
599- elapsed = sw .elapsed ();
600602 if (elapsed .toMillis () >= timeout .toMillis ()) {
601603 // timeout is over
602604 LOG .warn (
0 commit comments