@@ -40,7 +40,6 @@ import org.apache.kafka.connect.data.Schema
4040import java .io .File
4141import scala .collection .immutable
4242import scala .collection .mutable
43- import scala .util .Try
4443
4544case class MapKey (topicPartition : TopicPartition , partitionValues : immutable.Map [PartitionField , String ])
4645
@@ -192,32 +191,36 @@ class WriterManager[SM <: FileMetadata](
192191 currentOffsets : immutable.Map [TopicPartition , OffsetAndMetadata ],
193192 ): immutable.Map [TopicPartition , OffsetAndMetadata ] =
194193 currentOffsets
195- .map {
196- case (tp, offAndMeta) => (tp, getOffsetAndMeta(tp, offAndMeta))
197- }
198- .collect {
199- case (k, v) if v.nonEmpty => (k, v.get)
194+ .flatMap { case (tp, offAndMeta) =>
195+ getOffsetAndMeta(tp, offAndMeta).map(tp -> _)
200196 }
201197
202198 private def writerForTopicPartitionWithMaxOffset (topicPartition : TopicPartition ): Option [Writer [SM ]] =
203- Try (
204- writers.collect {
199+ // Collect writers for the topic-partition that have a committed offset, convert to Seq and
200+ // pick the writer with the highest committed offset value using maxByOption (Scala 2.13)
201+ writers
202+ .collect {
205203 case (key, writer) if key.topicPartition == topicPartition && writer.getCommittedOffset.nonEmpty => writer
206204 }
207- .maxBy(_.getCommittedOffset),
208- ).toOption
209-
210- private def getOffsetAndMeta (topicPartition : TopicPartition , offsetAndMetadata : OffsetAndMetadata ) =
205+ .toSeq
206+ .maxByOption(_.getCommittedOffset.get.value)
207+
208+ private def getOffsetAndMeta (
209+ topicPartition : TopicPartition ,
210+ offsetAndMetadata : OffsetAndMetadata ,
211+ ): Option [OffsetAndMetadata ] =
212+ // Compose Options functionally: find the writer with max offset, then build the new OffsetAndMetadata
211213 for {
212- writer <- writerForTopicPartitionWithMaxOffset(topicPartition)
213- offsetAndMeta <- Try {
214- new OffsetAndMetadata (
215- writer.getCommittedOffset.get.value,
216- offsetAndMetadata.leaderEpoch(),
217- offsetAndMetadata.metadata(),
218- )
219- }.toOption
220- } yield offsetAndMeta
214+ writer <- writerForTopicPartitionWithMaxOffset(topicPartition)
215+ committed <- writer.getCommittedOffset
216+ } yield new OffsetAndMetadata (
217+ // kafka last offset for a partition is the last committed + 1
218+ // Therefore the connector should report last committed + 1 or it will lead to a constant lag og 1
219+ // which might confuse the users.
220+ committed.value + 1 ,
221+ offsetAndMetadata.leaderEpoch(),
222+ offsetAndMetadata.metadata(),
223+ )
221224
222225 def cleanUp (topicPartition : TopicPartition ): Unit =
223226 writers
0 commit comments