2121
2222import java .math .BigDecimal ;
2323import java .math .MathContext ;
24+ import java .time .Duration ;
2425import java .util .Collections ;
2526import java .util .HashMap ;
2627import java .util .List ;
140141 * {@link ReadFromKafkaDoFn} will stop reading from any removed {@link TopicPartition} automatically
141142 * by querying Kafka {@link Consumer} APIs. Please note that stopping reading may not happen as soon
142143 * as the {@link TopicPartition} is removed. For example, the removal could happen at the same time
143- * when {@link ReadFromKafkaDoFn} performs a {@link Consumer#poll(java.time. Duration)}. In that
144- * case, the {@link ReadFromKafkaDoFn} will still output the fetched records.
144+ * when {@link ReadFromKafkaDoFn} performs a {@link Consumer#poll(Duration)}. In that case, the
145+ * {@link ReadFromKafkaDoFn} will still output the fetched records.
145146 *
146147 * <h4>Stop Reading from Stopped {@link TopicPartition}</h4>
147148 *
@@ -199,11 +200,11 @@ private ReadFromKafkaDoFn(
199200 this .checkStopReadingFn = transform .getCheckStopReadingFn ();
200201 this .badRecordRouter = transform .getBadRecordRouter ();
201202 this .recordTag = recordTag ;
202- if ( transform . getConsumerPollingTimeout () > 0 ) {
203- this . consumerPollingTimeout = transform . getConsumerPollingTimeout ();
204- } else {
205- this . consumerPollingTimeout = DEFAULT_KAFKA_POLL_TIMEOUT ;
206- }
203+ this . consumerPollingTimeout =
204+ Duration . ofSeconds (
205+ transform . getConsumerPollingTimeout () > 0
206+ ? transform . getConsumerPollingTimeout ()
207+ : DEFAULT_KAFKA_POLL_TIMEOUT );
207208 }
208209
209210 private static final Logger LOG = LoggerFactory .getLogger (ReadFromKafkaDoFn .class );
@@ -248,7 +249,7 @@ private static final class SharedStateHolder {
248249
249250 private transient @ Nullable LoadingCache <KafkaSourceDescriptor , MovingAvg > avgRecordSizeCache ;
250251 private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L ;
251- @ VisibleForTesting final long consumerPollingTimeout ;
252+ @ VisibleForTesting final Duration consumerPollingTimeout ;
252253 @ VisibleForTesting final DeserializerProvider <K > keyDeserializerProvider ;
253254 @ VisibleForTesting final DeserializerProvider <V > valueDeserializerProvider ;
254255 @ VisibleForTesting final Map <String , Object > consumerConfig ;
@@ -443,19 +444,27 @@ public ProcessContinuation processElement(
443444 long startOffset = tracker .currentRestriction ().getFrom ();
444445 long expectedOffset = startOffset ;
445446 consumer .seek (kafkaSourceDescriptor .getTopicPartition (), startOffset );
446- ConsumerRecords <byte [], byte []> rawRecords = ConsumerRecords .empty ();
447447 long skippedRecords = 0L ;
448448 final Stopwatch sw = Stopwatch .createStarted ();
449449
450- KafkaMetrics kafkaMetrics = KafkaSinkMetrics .kafkaMetrics ();
450+ final KafkaMetrics kafkaMetrics = KafkaSinkMetrics .kafkaMetrics ();
451451 try {
452452 while (true ) {
453453 // Fetch the record size accumulator.
454454 final MovingAvg avgRecordSize = avgRecordSizeCache .getUnchecked (kafkaSourceDescriptor );
455- rawRecords = poll (consumer , kafkaSourceDescriptor .getTopicPartition (), kafkaMetrics );
456- // When there are no records available for the current TopicPartition, self-checkpoint
457- // and move to process the next element.
458- if (rawRecords .isEmpty ()) {
455+ // TODO: Remove this timer and use the existing fetch-latency-avg metric.
456+ // A consumer will often have prefetches waiting to be returned immediately in which case
457+ // this timer may contribute more latency than it measures.
458+ // See https://shipilev.net/blog/2014/nanotrusting-nanotime/ for more information.
459+ final Stopwatch pollTimer = Stopwatch .createStarted ();
460+ // Fetch the next records.
461+ final ConsumerRecords <byte [], byte []> rawRecords =
462+ consumer .poll (this .consumerPollingTimeout );
463+ kafkaMetrics .updateSuccessfulRpcMetrics (topicPartition .topic (), pollTimer .elapsed ());
464+
465+ // No progress when the polling timeout expired.
466+ // Self-checkpoint and move to process the next element.
467+ if (rawRecords == ConsumerRecords .<byte [], byte []>empty ()) {
459468 if (!topicPartitionExists (
460469 kafkaSourceDescriptor .getTopicPartition (),
461470 consumer .partitionsFor (kafkaSourceDescriptor .getTopic ()))) {
@@ -466,6 +475,9 @@ public ProcessContinuation processElement(
466475 }
467476 return ProcessContinuation .resume ();
468477 }
478+
479+ // Visible progress within the consumer polling timeout.
480+ // Partially or fully claim and process records in this batch.
469481 for (ConsumerRecord <byte [], byte []> rawRecord : rawRecords ) {
470482 // If the Kafka consumer returns a record with an offset that is already processed
471483 // the record can be safely skipped. This is needed because there is a possibility
@@ -500,6 +512,7 @@ public ProcessContinuation processElement(
500512 if (!tracker .tryClaim (rawRecord .offset ())) {
501513 return ProcessContinuation .stop ();
502514 }
515+ expectedOffset = rawRecord .offset () + 1 ;
503516 try {
504517 KafkaRecord <K , V > kafkaRecord =
505518 new KafkaRecord <>(
@@ -516,7 +529,6 @@ public ProcessContinuation processElement(
516529 + (rawRecord .value () == null ? 0 : rawRecord .value ().length );
517530 avgRecordSize .update (recordSize );
518531 rawSizes .update (recordSize );
519- expectedOffset = rawRecord .offset () + 1 ;
520532 Instant outputTimestamp ;
521533 // The outputTimestamp and watermark will be computed by timestampPolicy, where the
522534 // WatermarkEstimator should be a manual one.
@@ -546,6 +558,17 @@ public ProcessContinuation processElement(
546558 }
547559 }
548560
561+ // Non-visible progress within the consumer polling timeout.
562+ // Claim up to the current position.
563+ if (expectedOffset < (expectedOffset = consumer .position (topicPartition ))) {
564+ if (!tracker .tryClaim (expectedOffset - 1 )) {
565+ return ProcessContinuation .stop ();
566+ }
567+ if (timestampPolicy != null ) {
568+ updateWatermarkManually (timestampPolicy , watermarkEstimator , tracker );
569+ }
570+ }
571+
549572 backlogBytes .set (
550573 (long )
551574 (BigDecimal .valueOf (
@@ -578,36 +601,6 @@ private boolean topicPartitionExists(
578601 .anyMatch (partitionInfo -> partitionInfo .partition () == (topicPartition .partition ()));
579602 }
580603
581- // see https://github.com/apache/beam/issues/25962
582- private ConsumerRecords <byte [], byte []> poll (
583- Consumer <byte [], byte []> consumer , TopicPartition topicPartition , KafkaMetrics kafkaMetrics ) {
584- final Stopwatch sw = Stopwatch .createStarted ();
585- long previousPosition = -1 ;
586- java .time .Duration timeout = java .time .Duration .ofSeconds (this .consumerPollingTimeout );
587- java .time .Duration elapsed = java .time .Duration .ZERO ;
588- while (true ) {
589- final ConsumerRecords <byte [], byte []> rawRecords = consumer .poll (timeout .minus (elapsed ));
590- elapsed = sw .elapsed ();
591- kafkaMetrics .updateSuccessfulRpcMetrics (
592- topicPartition .topic (), java .time .Duration .ofMillis (elapsed .toMillis ()));
593- if (!rawRecords .isEmpty ()) {
594- // return as we have found some entries
595- return rawRecords ;
596- }
597- if (previousPosition == (previousPosition = consumer .position (topicPartition ))) {
598- // there was no progress on the offset/position, which indicates end of stream
599- return rawRecords ;
600- }
601- if (elapsed .toMillis () >= timeout .toMillis ()) {
602- // timeout is over
603- LOG .warn (
604- "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method." ,
605- consumerPollingTimeout );
606- return rawRecords ;
607- }
608- }
609- }
610-
611604 private TimestampPolicyContext updateWatermarkManually (
612605 TimestampPolicy <K , V > timestampPolicy ,
613606 WatermarkEstimator <Instant > watermarkEstimator ,
0 commit comments