66import java .util .HashSet ;
77import java .util .Map ;
88import java .util .Set ;
9+ import java .util .stream .Collectors ;
910import lombok .Getter ;
1011import lombok .extern .slf4j .Slf4j ;
1112import org .apache .commons .lang3 .mutable .MutableLong ;
1213import org .apache .kafka .clients .consumer .Consumer ;
1314import org .apache .kafka .common .TopicPartition ;
15+ import org .apache .kafka .common .errors .UnsupportedVersionException ;
1416
1517@ Slf4j
1618@ Getter
@@ -34,7 +36,7 @@ class OffsetsInfo {
3436
3537 OffsetsInfo (Consumer <?, ?> consumer , Collection <TopicPartition > targetPartitions ) {
3638 this .consumer = consumer ;
37- this .beginOffsets = consumer . beginningOffsets ( targetPartitions );
39+ this .beginOffsets = firstOffsetsForPolling ( consumer , targetPartitions );
3840 this .endOffsets = consumer .endOffsets (targetPartitions );
3941 endOffsets .forEach ((tp , endOffset ) -> {
4042 var beginningOffset = beginOffsets .get (tp );
@@ -46,6 +48,28 @@ class OffsetsInfo {
4648 });
4749 }
4850
51+
52+ private Map <TopicPartition , Long > firstOffsetsForPolling (Consumer <?, ?> consumer ,
53+ Collection <TopicPartition > partitions ) {
54+ try {
55+ // we try to use offsetsForTimes() to find earliest offsets, since for
56+ // some topics (like compacted) beginningOffsets() ruturning 0 offsets
57+ // even when effectively first offset can be very high
58+ var offsets = consumer .offsetsForTimes (
59+ partitions .stream ().collect (Collectors .toMap (p -> p , p -> 0L ))
60+ );
61+ // result of offsetsForTimes() can be null, if message version < 0.10.0
62+ if (offsets .entrySet ().stream ().noneMatch (e -> e .getValue () == null )) {
63+ return offsets .entrySet ().stream ()
64+ .collect (Collectors .toMap (Map .Entry ::getKey , e -> e .getValue ().offset ()));
65+ }
66+ } catch (UnsupportedOperationException | UnsupportedVersionException e ) {
67+ // offsetsForTimes() not supported
68+ }
69+ //falling back to beginningOffsets() if offsetsForTimes() not supported
70+ return consumer .beginningOffsets (partitions );
71+ }
72+
4973 boolean assignedPartitionsFullyPolled () {
5074 for (var tp : consumer .assignment ()) {
5175 Preconditions .checkArgument (endOffsets .containsKey (tp ));
0 commit comments