2727import java .util .Optional ;
2828import java .util .Set ;
2929import java .util .concurrent .TimeUnit ;
30+ import javax .annotation .concurrent .GuardedBy ;
31+ import javax .annotation .concurrent .ThreadSafe ;
3032import org .apache .beam .sdk .coders .Coder ;
3133import org .apache .beam .sdk .io .kafka .KafkaIO .ReadSourceDescriptors ;
3234import org .apache .beam .sdk .io .kafka .KafkaIOUtils .MovingAvg ;
@@ -338,13 +340,18 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
338340 public double getSize (
339341 @ Element KafkaSourceDescriptor kafkaSourceDescriptor , @ Restriction OffsetRange offsetRange )
340342 throws Exception {
343+ // If present, estimates the record size to offset gap ratio. Compacted topics may hold less
344+ // records than the estimated offset range due to record deletion within a partition.
341345 final LoadingCache <TopicPartition , AverageRecordSize > avgRecordSize =
342346 Preconditions .checkStateNotNull (this .avgRecordSize );
343- double numRecords =
347+ // The tracker estimates the offset range by subtracting the last claimed position from the
348+ // currently observed end offset for the partition belonging to this split.
349+ double estimatedOffsetRange =
344350 restrictionTracker (kafkaSourceDescriptor , offsetRange ).getProgress ().getWorkRemaining ();
345351 // Before processing elements, we don't have a good estimated size of records and offset gap.
352+ // Return the estimated offset range without scaling by a size to gap ratio.
346353 if (!avgRecordSize .asMap ().containsKey (kafkaSourceDescriptor .getTopicPartition ())) {
347- return numRecords ;
354+ return estimatedOffsetRange ;
348355 }
349356 if (offsetEstimatorCache != null ) {
350357 for (Map .Entry <TopicPartition , KafkaLatestOffsetEstimator > tp :
@@ -353,7 +360,12 @@ public double getSize(
353360 }
354361 }
355362
356- return avgRecordSize .get (kafkaSourceDescriptor .getTopicPartition ()).getTotalSize (numRecords );
363+ // When processing elements, a moving average estimates the size of records and offset gap.
364+ // Return the estimated offset range scaled by the estimated size to gap ratio.
365+ return estimatedOffsetRange
366+ * avgRecordSize
367+ .get (kafkaSourceDescriptor .getTopicPartition ())
368+ .estimateRecordByteSizeToOffsetCountRatio ();
357369 }
358370
359371 @ NewTracker
@@ -665,22 +677,42 @@ private Map<String, Object> overrideBootstrapServersConfig(
665677 return config ;
666678 }
667679
680+ // TODO: Collapse the two moving average trackers into a single accumulator using a single Guava
681+ // AtomicDouble. Note that this requires that a single thread will call update and that while get
682+ // may be called by multiple threads the method must only load the accumulator itself.
683+ @ ThreadSafe
668684 private static class AverageRecordSize {
685+ @ GuardedBy ("this" )
669686 private MovingAvg avgRecordSize ;
687+
688+ @ GuardedBy ("this" )
670689 private MovingAvg avgRecordGap ;
671690
672691 public AverageRecordSize () {
673692 this .avgRecordSize = new MovingAvg ();
674693 this .avgRecordGap = new MovingAvg ();
675694 }
676695
677- public void update (int recordSize , long gap ) {
696+ public synchronized void update (int recordSize , long gap ) {
678697 avgRecordSize .update (recordSize );
679698 avgRecordGap .update (gap );
680699 }
681700
682- public double getTotalSize (double numRecords ) {
683- return avgRecordSize .get () * numRecords / (1 + avgRecordGap .get ());
701+ public double estimateRecordByteSizeToOffsetCountRatio () {
702+ double avgRecordSize ;
703+ double avgRecordGap ;
704+
705+ synchronized (this ) {
706+ avgRecordSize = this .avgRecordSize .get ();
707+ avgRecordGap = this .avgRecordGap .get ();
708+ }
709+
710+ // The offset increases between records in a batch fetched from a compacted topic may be
711+ // greater than 1. Compacted topics only store records with the greatest offset per key per
712+ // partition, the records in between are deleted and will not be observed by a consumer.
713+ // The observed gap between offsets is used to estimate the number of records that are likely
714+ // to be observed for the provided number of records.
715+ return avgRecordSize / (1 + avgRecordGap );
684716 }
685717 }
686718
0 commit comments