1919
2020import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkState ;
2121
22- import java .io .Closeable ;
2322import java .math .BigDecimal ;
2423import java .math .MathContext ;
2524import java .time .Duration ;
2625import java .util .Collections ;
2726import java .util .List ;
2827import java .util .Map ;
2928import java .util .Optional ;
30- import java .util .function . Supplier ;
29+ import java .util .concurrent . atomic . AtomicLong ;
3130import org .apache .beam .sdk .coders .Coder ;
3231import org .apache .beam .sdk .io .kafka .KafkaIO .ReadSourceDescriptors ;
3332import org .apache .beam .sdk .io .kafka .KafkaIOUtils .MovingAvg ;
4948import org .apache .beam .sdk .transforms .splittabledofn .WatermarkEstimator ;
5049import org .apache .beam .sdk .transforms .splittabledofn .WatermarkEstimators .MonotonicallyIncreasing ;
5150import org .apache .beam .sdk .transforms .windowing .BoundedWindow ;
52- import org .apache .beam .sdk .util .ExpiringMemoizingSerializableSupplier ;
5351import org .apache .beam .sdk .util .MemoizingPerInstantiationSerializableSupplier ;
5452import org .apache .beam .sdk .util .Preconditions ;
5553import org .apache .beam .sdk .util .SerializableSupplier ;
7169import org .apache .kafka .clients .consumer .ConsumerConfig ;
7270import org .apache .kafka .clients .consumer .ConsumerRecord ;
7371import org .apache .kafka .clients .consumer .ConsumerRecords ;
72+ import org .apache .kafka .common .KafkaException ;
7473import org .apache .kafka .common .PartitionInfo ;
7574import org .apache .kafka .common .TopicPartition ;
7675import org .apache .kafka .common .config .ConfigDef ;
@@ -235,30 +234,12 @@ public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor)
235234 CacheBuilder .newBuilder ()
236235 .concurrencyLevel (Runtime .getRuntime ().availableProcessors ())
237236 .weakValues ()
238- .removalListener (
239- (RemovalNotification <KafkaSourceDescriptor , KafkaLatestOffsetEstimator >
240- notification ) -> {
241- final @ Nullable KafkaLatestOffsetEstimator value ;
242- if (notification .getCause () == RemovalCause .COLLECTED
243- && (value = notification .getValue ()) != null ) {
244- value .close ();
245- }
246- })
247237 .build (
248- new CacheLoader <KafkaSourceDescriptor , KafkaLatestOffsetEstimator >() {
238+ new CacheLoader <KafkaSourceDescriptor , AtomicLong >() {
249239 @ Override
250- public KafkaLatestOffsetEstimator load (
251- final KafkaSourceDescriptor sourceDescriptor ) {
252- LOG .info (
253- "Creating Kafka consumer for offset estimation for {}" ,
254- sourceDescriptor );
255- final Map <String , Object > config =
256- KafkaIOUtils .overrideBootstrapServersConfig (
257- consumerConfig , sourceDescriptor );
258- final Consumer <byte [], byte []> consumer =
259- consumerFactoryFn .apply (config );
260- return new KafkaLatestOffsetEstimator (
261- consumer , sourceDescriptor .getTopicPartition ());
240+ public AtomicLong load (final KafkaSourceDescriptor sourceDescriptor ) {
241+ LOG .info ("Creating end offset estimator for {}" , sourceDescriptor );
242+ return new AtomicLong (Long .MIN_VALUE );
262243 }
263244 }));
264245 this .pollConsumerCacheSupplier =
@@ -319,8 +300,7 @@ public Consumer<byte[], byte[]> load(
319300 private final SerializableSupplier <LoadingCache <KafkaSourceDescriptor , MovingAvg >>
320301 avgRecordSizeCacheSupplier ;
321302
322- private final SerializableSupplier <
323- LoadingCache <KafkaSourceDescriptor , KafkaLatestOffsetEstimator >>
303+ private final SerializableSupplier <LoadingCache <KafkaSourceDescriptor , AtomicLong >>
324304 latestOffsetEstimatorCacheSupplier ;
325305
326306 private final SerializableSupplier <LoadingCache <KafkaSourceDescriptor , Consumer <byte [], byte []>>>
@@ -329,8 +309,7 @@ public Consumer<byte[], byte[]> load(
329309 private transient @ MonotonicNonNull LoadingCache <KafkaSourceDescriptor , MovingAvg >
330310 avgRecordSizeCache ;
331311
332- private transient @ MonotonicNonNull LoadingCache <
333- KafkaSourceDescriptor , KafkaLatestOffsetEstimator >
312+ private transient @ MonotonicNonNull LoadingCache <KafkaSourceDescriptor , AtomicLong >
334313 latestOffsetEstimatorCache ;
335314
336315 private transient @ MonotonicNonNull LoadingCache <KafkaSourceDescriptor , Consumer <byte [], byte []>>
@@ -349,46 +328,6 @@ public Consumer<byte[], byte[]> load(
349328 @ VisibleForTesting
350329 static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader .RAW_SIZE_METRIC_PREFIX ;
351330
352- /**
353- * A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to
354- * fetch backlog.
355- */
356- private static class KafkaLatestOffsetEstimator
357- implements GrowableOffsetRangeTracker .RangeEndEstimator , Closeable {
358- private final Consumer <byte [], byte []> offsetConsumer ;
359- private final Supplier <Long > offsetSupplier ;
360-
361- KafkaLatestOffsetEstimator (
362- final Consumer <byte [], byte []> offsetConsumer , final TopicPartition topicPartition ) {
363- this .offsetConsumer = offsetConsumer ;
364- this .offsetSupplier =
365- new ExpiringMemoizingSerializableSupplier <>(
366- () -> {
367- try {
368- return offsetConsumer
369- .endOffsets (Collections .singleton (topicPartition ))
370- .getOrDefault (topicPartition , Long .MIN_VALUE );
371- } catch (Throwable t ) {
372- LOG .error ("Failed to get end offset for {}" , topicPartition , t );
373- return Long .MIN_VALUE ;
374- }
375- },
376- Duration .ofSeconds (1 ),
377- Long .MIN_VALUE ,
378- Duration .ZERO );
379- }
380-
381- @ Override
382- public long estimate () {
383- return offsetSupplier .get ();
384- }
385-
386- @ Override
387- public void close () {
388- offsetConsumer .close ();
389- }
390- }
391-
392331 @ GetInitialRestriction
393332 @ RequiresNonNull ({"pollConsumerCache" })
394333 public OffsetRange initialRestriction (@ Element KafkaSourceDescriptor kafkaSourceDescriptor ) {
@@ -500,8 +439,8 @@ public double getSize(
500439 @ RequiresNonNull ({"latestOffsetEstimatorCache" })
501440 public UnsplittableRestrictionTracker <OffsetRange , Long > restrictionTracker (
502441 @ Element KafkaSourceDescriptor kafkaSourceDescriptor , @ Restriction OffsetRange restriction ) {
503- final LoadingCache <KafkaSourceDescriptor , KafkaLatestOffsetEstimator >
504- latestOffsetEstimatorCache = this .latestOffsetEstimatorCache ;
442+ final LoadingCache <KafkaSourceDescriptor , AtomicLong > latestOffsetEstimatorCache =
443+ this .latestOffsetEstimatorCache ;
505444
506445 if (restriction .getTo () < Long .MAX_VALUE ) {
507446 return new UnsplittableRestrictionTracker <>(new OffsetRangeTracker (restriction ));
@@ -510,9 +449,10 @@ public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
510449 // End offset estimates are cached per source descriptor so that the value published while polling
511450 // in processElement is shared with every tracker for that descriptor, which avoids holding a separate
512451 // Kafka connection just for offset lookups; the cached value also serves as a memoized end offset.
452+ final AtomicLong latestOffsetEstimator =
453+ latestOffsetEstimatorCache .getUnchecked (kafkaSourceDescriptor );
513454 return new UnsplittableRestrictionTracker <>(
514- new GrowableOffsetRangeTracker (
515- restriction .getFrom (), latestOffsetEstimatorCache .getUnchecked (kafkaSourceDescriptor )));
455+ new GrowableOffsetRangeTracker (restriction .getFrom (), latestOffsetEstimator ::get ));
516456 }
517457
518458 @ ProcessElement
@@ -525,14 +465,13 @@ public ProcessContinuation processElement(
525465 throws Exception {
526466 final LoadingCache <KafkaSourceDescriptor , MovingAvg > avgRecordSizeCache =
527467 this .avgRecordSizeCache ;
528- final LoadingCache <KafkaSourceDescriptor , KafkaLatestOffsetEstimator >
529- latestOffsetEstimatorCache = this .latestOffsetEstimatorCache ;
468+ final LoadingCache <KafkaSourceDescriptor , AtomicLong > latestOffsetEstimatorCache =
469+ this .latestOffsetEstimatorCache ;
530470 final LoadingCache <KafkaSourceDescriptor , Consumer <byte [], byte []>> pollConsumerCache =
531471 this .pollConsumerCache ;
532472
533473 final MovingAvg avgRecordSize = avgRecordSizeCache .get (kafkaSourceDescriptor );
534- final KafkaLatestOffsetEstimator latestOffsetEstimator =
535- latestOffsetEstimatorCache .get (kafkaSourceDescriptor );
474+ final AtomicLong latestOffsetEstimator = latestOffsetEstimatorCache .get (kafkaSourceDescriptor );
536475 final Consumer <byte [], byte []> consumer = pollConsumerCache .get (kafkaSourceDescriptor );
537476 final Deserializer <K > keyDeserializerInstance =
538477 Preconditions .checkStateNotNull (this .keyDeserializerInstance );
@@ -580,6 +519,14 @@ public ProcessContinuation processElement(
580519 // Fetch the next records.
581520 final ConsumerRecords <byte [], byte []> rawRecords = consumer .poll (remainingTimeout );
582521 final Duration elapsed = pollTimer .elapsed ();
522+ try {
523+ final long position = consumer .position (topicPartition );
524+ consumer
525+ .currentLag (topicPartition )
526+ .ifPresent (lag -> latestOffsetEstimator .lazySet (position + lag ));
527+ } catch (KafkaException e ) {
528+ }
529+
583530 try {
584531 remainingTimeout = remainingTimeout .minus (elapsed );
585532 } catch (ArithmeticException e ) {
@@ -687,7 +634,7 @@ public ProcessContinuation processElement(
687634
688635 final long estimatedBacklogBytes =
689636 (long )
690- (BigDecimal .valueOf (latestOffsetEstimator .estimate ())
637+ (BigDecimal .valueOf (latestOffsetEstimator .get ())
691638 .subtract (BigDecimal .valueOf (expectedOffset ), MathContext .DECIMAL128 )
692639 .doubleValue ()
693640 * avgRecordSize .get ());
@@ -752,8 +699,8 @@ public void setup() throws Exception {
752699 public void teardown () throws Exception {
753700 final LoadingCache <KafkaSourceDescriptor , MovingAvg > avgRecordSizeCache =
754701 this .avgRecordSizeCache ;
755- final LoadingCache <KafkaSourceDescriptor , KafkaLatestOffsetEstimator >
756- latestOffsetEstimatorCache = this .latestOffsetEstimatorCache ;
702+ final LoadingCache <KafkaSourceDescriptor , AtomicLong > latestOffsetEstimatorCache =
703+ this .latestOffsetEstimatorCache ;
757704 final LoadingCache <KafkaSourceDescriptor , Consumer <byte [], byte []>> pollConsumerCache =
758705 this .pollConsumerCache ;
759706
0 commit comments