Merged
Changes from 2 commits
65 changes: 45 additions & 20 deletions src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
@@ -2,6 +2,7 @@ package uk.sky.fs2.kafka.topicloader

import cats.data.{NonEmptyList, NonEmptyMap, OptionT}
import cats.effect.Async
import cats.effect.kernel.Resource
import cats.syntax.all.*
import cats.{Monad, Order}
import fs2.kafka.{ConsumerRecord, ConsumerSettings, KafkaConsumer}
@@ -43,20 +44,40 @@ trait TopicLoader {
.evalMap { consumer =>
{
for {
allLogOffsets <- OptionT.liftF(logOffsetsForTopics(topics, strategy, consumer))
logOffsets <- OptionT.fromOption(NonEmptyMap.fromMap(SortedMap.from(allLogOffsets)))
logOffsets <- OptionT(logOffsetsForTopics(topics, strategy, consumer))
_ <- OptionT.liftF(
consumer.assign(logOffsets.keys) *>
logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
)
} yield load(consumer, logOffsets)
}.getOrElse(Stream.empty)
}
.flatten

def loadAndRun[F[_] : Async : LoggerFactory, K, V](
topics: NonEmptyList[String],
consumerSettings: ConsumerSettings[F, K, V]
)(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] =
KafkaConsumer
.stream(consumerSettings)
.evalMap { consumer =>
{
for {
logOffsets <- OptionT(logOffsetsForTopics(topics, LoadAll, consumer))
_ <- OptionT.liftF(
consumer.assign(logOffsets.keys) *>
logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
)
} yield consumer.records
.map(_.record)
.through(filterBelowHighestOffset(logOffsets))
preLoadStream <- OptionT.pure(load(consumer, logOffsets))
} yield preLoadStream.onFinalizeCase(onLoad) ++ consumer.records.map(_.record)
}.getOrElse(Stream.empty)
}
.flatten

def loadAndRun(): Unit = ()
private def load[F[_] : Async : LoggerFactory, K, V](
consumer: KafkaConsumer[F, K, V],
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
): Stream[F, ConsumerRecord[K, V]] = consumer.records.map(_.record).through(filterBelowHighestOffset(logOffsets))

private def filterBelowHighestOffset[F[_] : Monad : LoggerFactory, K, V](
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
@@ -73,20 +94,24 @@
topics: NonEmptyList[String],
strategy: LoadTopicStrategy,
consumer: KafkaConsumer[F, K, V]
): F[Map[TopicPartition, LogOffsets]] = for {
_ <- consumer.subscribe(topics)
partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
endOffsets <- strategy match {
case LoadAll => consumer.endOffsets(topicPartitions)
case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
}
logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
partition -> LogOffsets(offset, endOffsets(partition))
}
_ <- consumer.unsubscribe
} yield logOffsets.filter { case (_, o) => o.highest > o.lowest }
): F[Option[NonEmptyMap[TopicPartition, LogOffsets]]] =
for {
_ <- consumer.subscribe(topics)
partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
endOffsets <- strategy match {
case LoadAll => consumer.endOffsets(topicPartitions)
case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
}
logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
partition -> LogOffsets(offset, endOffsets(partition))
}
_ <- consumer.unsubscribe
} yield {
val offsets = logOffsets.filter { case (_, o) => o.highest > o.lowest }
NonEmptyMap.fromMap(SortedMap.from(offsets))
}

private def earliestOffsets[F[_] : Monad, K, V](
consumer: KafkaConsumer[F, K, V],
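For context, a minimal sketch (not part of this diff) of how a caller might drive the new `loadAndRun` entry point: it streams the pre-load records, runs the `onLoad` callback when the initial load finalizes, then continues with live records. The bootstrap servers, group id, topic name, and `LoadAndRunExample` object below are placeholders assumed for illustration.

```scala
// Illustrative usage sketch; broker address, group id and topic are assumptions.
import cats.data.NonEmptyList
import cats.effect.{IO, IOApp, Resource}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import uk.sky.fs2.kafka.topicloader.TopicLoader

object LoadAndRunExample extends IOApp.Simple {

  implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

  private val consumerSettings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withBootstrapServers("localhost:9092")
      .withGroupId("topic-loader-example")

  override def run: IO[Unit] =
    TopicLoader
      // onLoad receives the ExitCase of the initial load stream
      .loadAndRun(NonEmptyList.one("example-topic"), consumerSettings) {
        case Resource.ExitCase.Succeeded => IO.println("initial load complete, switching to live records")
        case other                       => IO.println(s"initial load ended early: $other")
      }
      .evalMap(record => IO.println(s"${record.key} -> ${record.value}"))
      .compile
      .drain
}
```

Because `onLoad` is keyed off `Resource.ExitCase`, the callback can distinguish a clean load from cancellation or error, which makes it suitable for readiness signalling.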
29 changes: 26 additions & 3 deletions src/test/scala/integration/TopicLoaderIntSpec.scala
@@ -2,7 +2,7 @@ package integration

import base.KafkaSpecBase
import cats.data.NonEmptyList
import cats.effect.IO
import cats.effect.{IO, Ref}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import io.github.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.common.errors.TimeoutException as KafkaTimeoutException
@@ -184,8 +184,31 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {

"loadAndRun" should {

"execute callback when finished loading and keep streaming" in {
pending
"execute callback when finished loading and keep streaming" in withKafkaContext { ctx =>
import ctx.*

val (preLoad, postLoad) = records(1 to 15).splitAt(10)

val loadRef: IO[Ref[IO, Boolean]] = Ref.of(false)
val topicRef: IO[Ref[IO, Seq[(String, String)]]] = Ref.empty

for {
loadState <- loadRef
topicState <- topicRef
_ <- createCustomTopics(NonEmptyList.one(testTopic1))
_ <- publishStringMessages(testTopic1, preLoad)
fiber <- loadAndRunLoader(NonEmptyList.one(testTopic1))(_ => loadState.set(true))
.map(recordToTuple)
.evalTap(r => topicState.getAndUpdate(_ :+ r))
.compile
.drain
.start
_ <- retry(topicState.get.asserting(_ should contain theSameElementsAs preLoad))
_ <- loadState.get.asserting(_ shouldBe true)
_ <- publishStringMessages(testTopic1, postLoad)
assertion <- retry(topicState.get.asserting(_ should contain theSameElementsAs (preLoad ++ postLoad)))
_ <- fiber.cancel
} yield assertion
}

}
10 changes: 9 additions & 1 deletion src/test/scala/utils/KafkaHelpers.scala
@@ -65,6 +65,14 @@ trait KafkaHelpers[F[_]] {
TopicLoader.load(topics, strategy, consumerSettings).compile.toList.map(_.map(recordToTuple))
}

def loadAndRunLoader(topics: NonEmptyList[String])(onLoad: Resource.ExitCase => F[Unit])(implicit
consumerSettings: ConsumerSettings[F, String, String],
F: Async[F]
): Stream[F, ConsumerRecord[String, String]] = {
implicit val loggerFactory: LoggerFactory[F] = Slf4jFactory.create[F]
TopicLoader.loadAndRun(topics, consumerSettings)(onLoad)
}

def moveOffsetToEnd(
partitions: NonEmptySet[TopicPartition]
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): Stream[F, KafkaConsumer[F, String, String]] =
@@ -165,7 +173,7 @@ trait KafkaHelpers[F[_]] {
KafkaConsumer[F].resource(settings)
}

def retry[A](fa: F[A], delay: FiniteDuration, max: Int)(implicit F: Async[F]): F[A] =
def retry[A](fa: F[A], delay: FiniteDuration = 1.second, max: Int = 10)(implicit F: Async[F]): F[A] =
if (max <= 1) fa
else
fa handleErrorWith { _ =>