 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
-import java.io.Closeable;
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Supplier;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExpiringMemoizingSerializableSupplier;
 import org.apache.beam.sdk.util.MemoizingPerInstantiationSerializableSupplier;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.SerializableSupplier;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
@@ -225,30 +224,12 @@ public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor)
             CacheBuilder.newBuilder()
                 .concurrencyLevel(Runtime.getRuntime().availableProcessors())
                 .weakValues()
-                .removalListener(
-                    (RemovalNotification<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
-                            notification) -> {
-                      final @Nullable KafkaLatestOffsetEstimator value;
-                      if (notification.getCause() == RemovalCause.COLLECTED
-                          && (value = notification.getValue()) != null) {
-                        value.close();
-                      }
-                    })
                 .build(
-                    new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() {
+                    new CacheLoader<KafkaSourceDescriptor, AtomicLong>() {
                       @Override
-                      public KafkaLatestOffsetEstimator load(
-                          final KafkaSourceDescriptor sourceDescriptor) {
-                        LOG.info(
-                            "Creating Kafka consumer for offset estimation for {}",
-                            sourceDescriptor);
-                        final Map<String, Object> config =
-                            KafkaIOUtils.overrideBootstrapServersConfig(
-                                consumerConfig, sourceDescriptor);
-                        final Consumer<byte[], byte[]> consumer =
-                            consumerFactoryFn.apply(config);
-                        return new KafkaLatestOffsetEstimator(
-                            consumer, sourceDescriptor.getTopicPartition());
+                      public AtomicLong load(final KafkaSourceDescriptor sourceDescriptor) {
+                        LOG.info("Creating end offset estimator for {}", sourceDescriptor);
+                        return new AtomicLong(Long.MIN_VALUE);
                       }
                     }));
     this.pollConsumerCacheSupplier =
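
Since the cached value is now a plain AtomicLong rather than an object owning a Kafka consumer, the removalListener that closed evicted estimators can go away entirely: weakValues() alone lets an entry be garbage-collected once no in-flight restriction tracker holds its AtomicLong. A minimal runnable sketch of these cache semantics (a hypothetical String key stands in for KafkaSourceDescriptor; the vendored Guava classes are the ones imported above):

import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;

public class EndOffsetCacheSketch {
  public static void main(String[] args) {
    LoadingCache<String, AtomicLong> estimators =
        CacheBuilder.newBuilder()
            .weakValues() // entry is reclaimable once no tracker references the AtomicLong
            .build(
                new CacheLoader<String, AtomicLong>() {
                  @Override
                  public AtomicLong load(String key) {
                    // Long.MIN_VALUE is the "no estimate published yet" sentinel, matching
                    // the default the removed estimator returned before its first fetch.
                    return new AtomicLong(Long.MIN_VALUE);
                  }
                });
    AtomicLong estimator = estimators.getUnchecked("topic-0");
    System.out.println(estimator.get()); // prints the sentinel until a poll publishes a value
  }
}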
@@ -309,8 +290,7 @@ public Consumer<byte[], byte[]> load(
   private final SerializableSupplier<LoadingCache<KafkaSourceDescriptor, MovingAvg>>
       avgRecordSizeCacheSupplier;
 
-  private final SerializableSupplier<
-          LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>
+  private final SerializableSupplier<LoadingCache<KafkaSourceDescriptor, AtomicLong>>
       latestOffsetEstimatorCacheSupplier;
 
   private final SerializableSupplier<LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>>>
@@ -319,8 +299,7 @@ public Consumer<byte[], byte[]> load(
   private transient @MonotonicNonNull LoadingCache<KafkaSourceDescriptor, MovingAvg>
       avgRecordSizeCache;
 
-  private transient @MonotonicNonNull LoadingCache<
-          KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
+  private transient @MonotonicNonNull LoadingCache<KafkaSourceDescriptor, AtomicLong>
       latestOffsetEstimatorCache;
 
   private transient @MonotonicNonNull LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>>
@@ -339,46 +318,6 @@ public Consumer<byte[], byte[]> load(
   @VisibleForTesting
   static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;
 
-  /**
-   * A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to
-   * fetch backlog.
-   */
-  private static class KafkaLatestOffsetEstimator
-      implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable {
-    private final Consumer<byte[], byte[]> offsetConsumer;
-    private final Supplier<Long> offsetSupplier;
-
-    KafkaLatestOffsetEstimator(
-        final Consumer<byte[], byte[]> offsetConsumer, final TopicPartition topicPartition) {
-      this.offsetConsumer = offsetConsumer;
-      this.offsetSupplier =
-          new ExpiringMemoizingSerializableSupplier<>(
-              () -> {
-                try {
-                  return offsetConsumer
-                      .endOffsets(Collections.singleton(topicPartition))
-                      .getOrDefault(topicPartition, Long.MIN_VALUE);
-                } catch (Throwable t) {
-                  LOG.error("Failed to get end offset for {}", topicPartition, t);
-                  return Long.MIN_VALUE;
-                }
-              },
-              Duration.ofSeconds(1),
-              Long.MIN_VALUE,
-              Duration.ZERO);
-    }
-
-    @Override
-    public long estimate() {
-      return offsetSupplier.get();
-    }
-
-    @Override
-    public void close() {
-      offsetConsumer.close();
-    }
-  }
-
   @GetInitialRestriction
   @RequiresNonNull({"pollConsumerCache"})
   public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) {
@@ -490,8 +429,8 @@ public double getSize(
   @RequiresNonNull({"latestOffsetEstimatorCache"})
   public OffsetRangeTracker restrictionTracker(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
-    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
-        latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
+    final LoadingCache<KafkaSourceDescriptor, AtomicLong> latestOffsetEstimatorCache =
+        this.latestOffsetEstimatorCache;
 
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new OffsetRangeTracker(restriction);
@@ -500,8 +439,9 @@ public OffsetRangeTracker restrictionTracker(
     // OffsetEstimators are cached per topic-partition because they hold a stateful connection,
     // so we want to minimize the number of connections that we start and track with Kafka.
     // Another point is that the memoized backlog makes estimations more reusable.
-    return new GrowableOffsetRangeTracker(
-        restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor));
+    final AtomicLong latestOffsetEstimator =
+        latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor);
+    return new GrowableOffsetRangeTracker(restriction.getFrom(), latestOffsetEstimator::get);
   }
 
   @ProcessElement
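
The method reference above works because GrowableOffsetRangeTracker.RangeEndEstimator exposes a single estimate() method, so the cached AtomicLong's getter satisfies it directly. A sketch of the wiring, with hypothetical offsets:

import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;

// The tracker's range end grows as the shared AtomicLong is updated elsewhere;
// here the update a poll loop would perform is simulated with a literal value.
AtomicLong latestOffsetEstimator = new AtomicLong(Long.MIN_VALUE);
GrowableOffsetRangeTracker tracker =
    new GrowableOffsetRangeTracker(0L, latestOffsetEstimator::get);
latestOffsetEstimator.lazySet(1_500L); // later estimates now see an end offset of 1_500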
@@ -514,14 +454,13 @@ public ProcessContinuation processElement(
       throws Exception {
     final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
         this.avgRecordSizeCache;
-    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
-        latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
+    final LoadingCache<KafkaSourceDescriptor, AtomicLong> latestOffsetEstimatorCache =
+        this.latestOffsetEstimatorCache;
     final LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache =
         this.pollConsumerCache;
 
     final MovingAvg avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
-    final KafkaLatestOffsetEstimator latestOffsetEstimator =
-        latestOffsetEstimatorCache.get(kafkaSourceDescriptor);
+    final AtomicLong latestOffsetEstimator = latestOffsetEstimatorCache.get(kafkaSourceDescriptor);
     final Consumer<byte[], byte[]> consumer = pollConsumerCache.get(kafkaSourceDescriptor);
     final Deserializer<K> keyDeserializerInstance =
         Preconditions.checkStateNotNull(this.keyDeserializerInstance);
@@ -569,6 +508,14 @@ public ProcessContinuation processElement(
     // Fetch the next records.
     final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(remainingTimeout);
     final Duration elapsed = pollTimer.elapsed();
+    try {
+      final long position = consumer.position(topicPartition);
+      consumer
+          .currentLag(topicPartition)
+          .ifPresent(lag -> latestOffsetEstimator.lazySet(position + lag));
+    } catch (KafkaException e) {
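+      // Best effort: on failure, keep the previously published end offset estimate.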
+    }
+
     try {
       remainingTimeout = remainingTimeout.minus(elapsed);
     } catch (ArithmeticException e) {
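
For context on the block added above: the Kafka client reports a consumer's current lag on an assigned partition as the log-end offset minus the consumer's position (derived from the most recent fetch metadata), so position + lag recovers the partition's end offset without the separate endOffsets() request the removed KafkaLatestOffsetEstimator had to issue. lazySet publishes the value without a full memory fence, which is acceptable for an advisory estimate that readers may observe slightly stale. A worked illustration with hypothetical numbers:

// Hypothetical values observed right after a poll:
long position = 1_200L; // consumer.position(topicPartition): next offset to fetch
long lag = 300L;        // consumer.currentLag(topicPartition): logEndOffset - position
long endOffsetEstimate = position + lag; // 1_500, published via lazySet above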
@@ -676,7 +623,7 @@ public ProcessContinuation processElement(
 
     final long estimatedBacklogBytes =
         (long)
-            (BigDecimal.valueOf(latestOffsetEstimator.estimate())
+            (BigDecimal.valueOf(latestOffsetEstimator.get())
                     .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
                     .doubleValue()
                 * avgRecordSize.get());
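
With the hypothetical numbers from the previous sketch, the backlog computation above reduces to simple arithmetic; BigDecimal with MathContext.DECIMAL128 also keeps the subtraction well-defined while the estimator still holds the Long.MIN_VALUE sentinel, where plain long arithmetic could overflow:

// (1_500 estimated end offset - 1_400 next offset to emit) = 100 records;
// 100 records * 512.0 bytes (moving-average record size) = 51_200 backlog bytes.
long estimatedBacklogBytes =
    (long)
        (BigDecimal.valueOf(1_500L)
                .subtract(BigDecimal.valueOf(1_400L), MathContext.DECIMAL128)
                .doubleValue()
            * 512.0);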
@@ -741,8 +688,8 @@ public void setup() throws Exception {
   public void teardown() throws Exception {
     final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
         this.avgRecordSizeCache;
-    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
-        latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
+    final LoadingCache<KafkaSourceDescriptor, AtomicLong> latestOffsetEstimatorCache =
+        this.latestOffsetEstimatorCache;
     final LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache =
         this.pollConsumerCache;
 