5454import org .apache .druid .indexing .seekablestream .common .RecordSupplier ;
5555import org .apache .druid .indexing .seekablestream .common .StreamException ;
5656import org .apache .druid .indexing .seekablestream .common .StreamPartition ;
57+ import org .apache .druid .indexing .seekablestream .supervisor .OffsetSnapshot ;
5758import org .apache .druid .indexing .seekablestream .supervisor .SeekableStreamSupervisor ;
5859import org .apache .druid .indexing .seekablestream .supervisor .SeekableStreamSupervisorIOConfig ;
5960import org .apache .druid .indexing .seekablestream .supervisor .SeekableStreamSupervisorReportPayload ;
7374import java .util .Optional ;
7475import java .util .Set ;
7576import java .util .TreeMap ;
77+ import java .util .concurrent .atomic .AtomicReference ;
78+ import java .util .function .Function ;
7679import java .util .regex .Pattern ;
7780import java .util .stream .Collectors ;
7881
@@ -94,11 +97,13 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
9497 private static final Long END_OF_PARTITION = Long .MAX_VALUE ;
9598
9699 private final Pattern pattern ;
97- private volatile Map <KafkaTopicPartition , Long > latestSequenceFromStream ;
98100 private volatile Map <KafkaTopicPartition , Long > partitionToTimeLag ;
99101
100102 private final KafkaSupervisorSpec spec ;
101103
104+ private final AtomicReference <OffsetSnapshot <KafkaTopicPartition , Long >> offsetSnapshotRef = new AtomicReference <>(
105+ OffsetSnapshot .of (Map .of (), Map .of ()));
106+
102107 public KafkaSupervisor (
103108 final TaskStorage taskStorage ,
104109 final TaskMaster taskMaster ,
@@ -125,7 +130,6 @@ public KafkaSupervisor(
125130 this .pattern = getIoConfig ().isMultiTopic () ? Pattern .compile (getIoConfig ().getStream ()) : null ;
126131 }
127132
128-
129133 @ Override
130134 protected RecordSupplier <KafkaTopicPartition , Long , KafkaRecordEntity > setupRecordSupplier ()
131135 {
@@ -172,15 +176,15 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
172176 )
173177 {
174178 KafkaSupervisorIOConfig ioConfig = spec .getIoConfig ();
175- Map <KafkaTopicPartition , Long > partitionLag = getRecordLagPerPartitionInLatestSequences (getHighestCurrentOffsets () );
179+ Map <KafkaTopicPartition , Long > partitionLag = getRecordLagPerPartitionInLatestSequences ();
176180 return new KafkaSupervisorReportPayload (
177181 spec .getId (),
178182 spec .getDataSchema ().getDataSource (),
179183 ioConfig .getStream (),
180184 numPartitions ,
181185 ioConfig .getReplicas (),
182186 ioConfig .getTaskDuration ().getMillis () / 1000 ,
183- includeOffsets ? latestSequenceFromStream : null ,
187+ includeOffsets ? getLatestSequencesFromStream () : null ,
184188 includeOffsets ? partitionLag : null ,
185189 includeOffsets ? getPartitionTimeLag () : null ,
186190 includeOffsets ? aggregatePartitionLags (partitionLag ).getTotalLag () : null ,
@@ -261,14 +265,15 @@ protected List<SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEnt
261265 @ Override
262266 protected Map <KafkaTopicPartition , Long > getPartitionRecordLag ()
263267 {
264- Map <KafkaTopicPartition , Long > highestCurrentOffsets = getHighestCurrentOffsets ();
268+ Map <KafkaTopicPartition , Long > latestSequencesFromStream = getLatestSequencesFromStream ();
269+ Map <KafkaTopicPartition , Long > highestIngestedOffsets = getHighestIngestedOffsets ();
265270
266- if (latestSequenceFromStream == null ) {
271+ if (latestSequencesFromStream . isEmpty () ) {
267272 return null ;
268273 }
269274
270- Set <KafkaTopicPartition > kafkaPartitions = latestSequenceFromStream .keySet ();
271- Set <KafkaTopicPartition > taskPartitions = highestCurrentOffsets .keySet ();
275+ Set <KafkaTopicPartition > kafkaPartitions = latestSequencesFromStream .keySet ();
276+ Set <KafkaTopicPartition > taskPartitions = highestIngestedOffsets .keySet ();
272277 if (!kafkaPartitions .equals (taskPartitions )) {
273278 try {
274279 log .warn ("Mismatched kafka and task partitions: Missing Task Partitions %s, Missing Kafka Partitions %s" ,
@@ -281,7 +286,7 @@ protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
281286 }
282287 }
283288
284- return getRecordLagPerPartitionInLatestSequences (highestCurrentOffsets );
289+ return getRecordLagPerPartitionInLatestSequences ();
285290 }
286291
287292 @ Nullable
@@ -294,44 +299,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
294299 // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
295300 @ SuppressWarnings ("SSBasedInspection" )
296301 // Used while calculating cummulative lag for entire stream
297- private Map <KafkaTopicPartition , Long > getRecordLagPerPartitionInLatestSequences (Map < KafkaTopicPartition , Long > currentOffsets )
302+ private Map <KafkaTopicPartition , Long > getRecordLagPerPartitionInLatestSequences ()
298303 {
299- if (latestSequenceFromStream == null ) {
304+ Map <KafkaTopicPartition , Long > highestIngestedOffsets = getHighestIngestedOffsets ();
305+ Map <KafkaTopicPartition , Long > latestSequencesFromStream = getLatestSequencesFromStream ();
306+
307+ if (latestSequencesFromStream .isEmpty ()) {
300308 return Collections .emptyMap ();
301309 }
302310
303- return latestSequenceFromStream
304- .entrySet ()
305- .stream ()
306- .collect (
307- Collectors .toMap (
308- Entry ::getKey ,
309- e -> e .getValue () != null
310- ? e .getValue () - Optional .ofNullable (currentOffsets .get (e .getKey ())).orElse (0L )
311- : 0
312- )
313- );
311+ return latestSequencesFromStream .entrySet ()
312+ .stream ()
313+ .collect (
314+ Collectors .toMap (
315+ Entry ::getKey ,
316+ e ->
317+ e .getValue () - highestIngestedOffsets .getOrDefault (e .getKey (), 0L )
318+ )
319+ );
314320 }
315321
322+ // This function is defined and called by the parent class to compute the lag for specific partitions.
323+ // The `currentOffsets` parameter is provided by the parent class and indicates the partitions to query.
324+ // Note: This function differs from `getRecordLagPerPartitionInLatestSequences()`:
325+ // `getRecordLagPerPartitionInLatestSequences()` queries lag for all partitions,
326+ // whereas `getRecordLagPerPartition()` queries lag only for the specified partitions.
316327 @ Override
317328 protected Map <KafkaTopicPartition , Long > getRecordLagPerPartition (Map <KafkaTopicPartition , Long > currentOffsets )
318329 {
319- if (latestSequenceFromStream == null || currentOffsets == null ) {
330+ Map <KafkaTopicPartition , Long > latestSequencesFromStream = getLatestSequencesFromStream ();
331+ Map <KafkaTopicPartition , Long > highestIngestedOffsets = getHighestIngestedOffsets ();
332+
333+ if (latestSequencesFromStream .isEmpty () || highestIngestedOffsets .isEmpty () || currentOffsets == null ) {
320334 return Collections .emptyMap ();
321335 }
322336
323- return currentOffsets
324- .entrySet ()
325- .stream ()
326- .filter (e -> latestSequenceFromStream .get (e .getKey ()) != null )
327- .collect (
328- Collectors .toMap (
329- Entry ::getKey ,
330- e -> e .getValue () != null
331- ? latestSequenceFromStream .get (e .getKey ()) - e .getValue ()
332- : 0
333- )
334- );
337+ return currentOffsets .keySet ().stream ()
338+ .filter (latestSequencesFromStream ::containsKey )
339+ .collect (Collectors .toMap (
340+ Function .identity (),
341+ // compute the lag using offsets from the snapshot.
342+ p -> latestSequencesFromStream .get (p ) - highestIngestedOffsets .getOrDefault (p , 0L )
343+ ));
335344 }
336345
337346 @ Override
@@ -436,7 +445,7 @@ private void updatePartitionTimeAndRecordLagFromStream()
436445 yetToReadPartitions .forEach (p -> lastIngestedTimestamps .put (p , 0L ));
437446
438447 recordSupplier .seekToLatest (partitions );
439- latestSequenceFromStream = recordSupplier .getLatestSequenceNumbers (partitions );
448+ Map < KafkaTopicPartition , Long > latestSequenceFromStream = recordSupplier .getLatestSequenceNumbers (partitions );
440449
441450 for (Map .Entry <KafkaTopicPartition , Long > entry : latestSequenceFromStream .entrySet ()) {
442451 // if there are no messages .getEndOffset would return 0, but if there are n msgs it would return n+1
@@ -454,6 +463,8 @@ private void updatePartitionTimeAndRecordLagFromStream()
454463 e -> e .getValue () - lastIngestedTimestamps .get (e .getKey ())
455464 )
456465 );
466+
467+ updateOffsetSnapshot (highestCurrentOffsets , latestSequenceFromStream );
457468 }
458469 catch (InterruptedException e ) {
459470 throw new StreamException (e );
@@ -506,6 +517,8 @@ protected void updatePartitionLagFromStream()
506517 return ;
507518 }
508519
520+ Map <KafkaTopicPartition , Long > highestCurrentOffsets = getHighestCurrentOffsets ();
521+
509522 getRecordSupplierLock ().lock ();
510523 try {
511524 Set <KafkaTopicPartition > partitionIds ;
@@ -524,8 +537,10 @@ protected void updatePartitionLagFromStream()
524537
525538 recordSupplier .seekToLatest (partitions );
526539
527- latestSequenceFromStream =
540+ Map < KafkaTopicPartition , Long > latestSequenceFromStream =
528541 partitions .stream ().collect (Collectors .toMap (StreamPartition ::getPartitionId , recordSupplier ::getPosition ));
542+
543+ updateOffsetSnapshot (highestCurrentOffsets , latestSequenceFromStream );
529544 }
530545 catch (InterruptedException e ) {
531546 throw new StreamException (e );
@@ -535,10 +550,24 @@ protected void updatePartitionLagFromStream()
535550 }
536551 }
537552
553+ private void updateOffsetSnapshot (
554+ Map <KafkaTopicPartition , Long > highestIngestedOffsets ,
555+ Map <KafkaTopicPartition , Long > latestOffsetsFromStream
556+ )
557+ {
558+ offsetSnapshotRef .set (
559+ OffsetSnapshot .of (highestIngestedOffsets , latestOffsetsFromStream ));
560+ }
561+
538562 @ Override
539563 protected Map <KafkaTopicPartition , Long > getLatestSequencesFromStream ()
540564 {
541- return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap <>();
565+ return offsetSnapshotRef .get ().getLatestOffsetsFromStream ();
566+ }
567+
568+ private Map <KafkaTopicPartition , Long > getHighestIngestedOffsets ()
569+ {
570+ return offsetSnapshotRef .get ().getHighestIngestedOffsets ();
542571 }
543572
544573 @ Override
0 commit comments