Merged
Changes from 7 commits
65 changes: 45 additions & 20 deletions src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
@@ -2,6 +2,7 @@ package uk.sky.fs2.kafka.topicloader

import cats.data.{NonEmptyList, NonEmptyMap, OptionT}
import cats.effect.Async
import cats.effect.kernel.Resource
import cats.syntax.all.*
import cats.{Monad, Order}
import fs2.kafka.{ConsumerRecord, ConsumerSettings, KafkaConsumer}
@@ -43,20 +44,40 @@ trait TopicLoader {
.evalMap { consumer =>
{
for {
allLogOffsets <- OptionT.liftF(logOffsetsForTopics(topics, strategy, consumer))
logOffsets <- OptionT.fromOption(NonEmptyMap.fromMap(SortedMap.from(allLogOffsets)))
logOffsets <- OptionT(logOffsetsForTopics(topics, strategy, consumer))
_ <- OptionT.liftF(
consumer.assign(logOffsets.keys) *>
logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
)
} yield load(consumer, logOffsets)
}.getOrElse(Stream.empty)
}
.flatten

def loadAndRun[F[_] : Async : LoggerFactory, K, V](
topics: NonEmptyList[String],
consumerSettings: ConsumerSettings[F, K, V]
)(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] =
KafkaConsumer
.stream(consumerSettings)
.evalMap { consumer =>
{
for {
logOffsets <- OptionT(logOffsetsForTopics(topics, LoadAll, consumer))
_ <- OptionT.liftF(
consumer.assign(logOffsets.keys) *>
logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
)
} yield consumer.records
.map(_.record)
.through(filterBelowHighestOffset(logOffsets))
preLoadStream <- OptionT.pure(load(consumer, logOffsets))
} yield preLoadStream.onFinalizeCase(onLoad) ++ consumer.records.map(_.record)
}.getOrElse(Stream.empty)
}
.flatten

def loadAndRun(): Unit = ()
private def load[F[_] : Async : LoggerFactory, K, V](
consumer: KafkaConsumer[F, K, V],
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
): Stream[F, ConsumerRecord[K, V]] = consumer.records.map(_.record).through(filterBelowHighestOffset(logOffsets))

private def filterBelowHighestOffset[F[_] : Monad : LoggerFactory, K, V](
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
@@ -73,20 +94,24 @@ trait TopicLoader {
topics: NonEmptyList[String],
strategy: LoadTopicStrategy,
consumer: KafkaConsumer[F, K, V]
): F[Map[TopicPartition, LogOffsets]] = for {
_ <- consumer.subscribe(topics)
partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
endOffsets <- strategy match {
case LoadAll => consumer.endOffsets(topicPartitions)
case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
}
logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
partition -> LogOffsets(offset, endOffsets(partition))
}
_ <- consumer.unsubscribe
} yield logOffsets.filter { case (_, o) => o.highest > o.lowest }
): F[Option[NonEmptyMap[TopicPartition, LogOffsets]]] =
for {
_ <- consumer.subscribe(topics)
partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
endOffsets <- strategy match {
case LoadAll => consumer.endOffsets(topicPartitions)
case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
}
logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
partition -> LogOffsets(offset, endOffsets(partition))
}
_ <- consumer.unsubscribe
} yield {
val offsets = logOffsets.filter { case (_, o) => o.highest > o.lowest }
NonEmptyMap.fromMap(SortedMap.from(offsets))
}

private def earliestOffsets[F[_] : Monad, K, V](
consumer: KafkaConsumer[F, K, V],
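For orientation, a minimal sketch of how the new `loadAndRun` signature above might be consumed, assuming `TopicLoader` exposes it as shown in this diff; the topic name, bootstrap servers, and group id below are placeholders, not values from this PR.

```scala
import cats.data.NonEmptyList
import cats.effect.{IO, Resource}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import uk.sky.fs2.kafka.topicloader.TopicLoader

object LoadAndRunExample {
  implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

  // Placeholder settings: broker address, group id and topic are illustrative.
  val consumerSettings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)

  // Loads every record currently on the topic, runs the onLoad callback once
  // the pre-load stream finalises, then keeps streaming newly published records.
  val stream: fs2.Stream[IO, Unit] =
    TopicLoader
      .loadAndRun(NonEmptyList.one("example-topic"), consumerSettings) {
        case Resource.ExitCase.Succeeded => IO.println("initial load complete")
        case other                       => IO.println(s"initial load ended: $other")
      }
      .evalMap(record => IO.println(s"${record.key} -> ${record.value}"))
}
```

Per the diff, the `onLoad` callback runs when the pre-load stream finalises, after which live records from `consumer.records` are emitted.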
24 changes: 23 additions & 1 deletion src/test/scala/base/AsyncIntSpec.scala
@@ -1,8 +1,30 @@
package base

import cats.effect.Async
import cats.effect.std.Dispatcher
import cats.effect.testing.scalatest.AsyncIOSpec
import cats.syntax.all.*
import org.scalactic.source.Position
import org.scalatest.OptionValues
import org.scalatest.concurrent.Eventually
import org.scalatest.enablers.Retrying
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Span
import org.scalatest.wordspec.AsyncWordSpec

trait AsyncIntSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues
import scala.concurrent.duration.*

trait AsyncIntSpec[F[_]] extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues with Eventually {
override implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 500.millis)

implicit def fRetrying[T](implicit F: Async[F]): Retrying[F[T]] = new Retrying[F[T]] {
override def retry(timeout: Span, interval: Span, pos: Position)(fun: => F[T]): F[T] =
Dispatcher.sequential[F].use { dispatcher =>
F.fromFuture(
F.executionContext.map(
Retrying.retryingNatureOfFutureT[T](_).retry(timeout, interval, pos)(dispatcher.unsafeToFuture(fun))
)
)
}
}
}
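As a usage note, here is a minimal, illustrative spec showing what the `Retrying[F[T]]` instance above enables: `eventually` can retry an effectful assertion directly, as the integration test further down also relies on. The spec name and `Ref` bookkeeping are hypothetical.

```scala
import cats.effect.{IO, Ref}

class ExampleRetrySpec extends AsyncIntSpec[IO] {
  "eventually" should {
    "retry an effectful assertion until it passes" in {
      for {
        counter   <- Ref.of[IO, Int](0)
        // Each retry re-runs the block, bumping the counter until the matcher passes.
        assertion <- eventually(counter.updateAndGet(_ + 1).asserting(_ should be >= 3))
      } yield assertion
    }
  }
}
```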
2 changes: 1 addition & 1 deletion src/test/scala/base/KafkaSpecBase.scala
@@ -2,4 +2,4 @@ package base

import utils.{EmbeddedKafka, KafkaHelpers}

abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec with KafkaHelpers[F] with EmbeddedKafka[F]
abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec[F] with KafkaHelpers[F] with EmbeddedKafka[F]
5 changes: 0 additions & 5 deletions src/test/scala/base/UnitSpecBase.scala

This file was deleted.

8 changes: 0 additions & 8 deletions src/test/scala/base/WordSpecBase.scala

This file was deleted.

29 changes: 26 additions & 3 deletions src/test/scala/integration/TopicLoaderIntSpec.scala
@@ -2,7 +2,7 @@ package integration

import base.KafkaSpecBase
import cats.data.NonEmptyList
import cats.effect.IO
import cats.effect.{IO, Ref}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import io.github.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.common.errors.TimeoutException as KafkaTimeoutException
@@ -184,8 +184,31 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {

"loadAndRun" should {

"execute callback when finished loading and keep streaming" in {
pending
"execute callback when finished loading and keep streaming" in withKafkaContext { ctx =>
import ctx.*

val (preLoad, postLoad) = records(1 to 15).splitAt(10)

for {
loadState <- Ref.of[IO, Boolean](false)
topicState <- Ref.empty[IO, Seq[(String, String)]]
_ <- createCustomTopics(NonEmptyList.one(testTopic1))
_ <- publishStringMessages(testTopic1, preLoad)
assertion <- loadAndRunR(NonEmptyList.one(testTopic1))(
_ => loadState.set(true),
r => topicState.getAndUpdate(_ :+ r).void
).use { _ =>
for {
_ <- eventually(topicState.get.asserting(_ should contain theSameElementsAs preLoad))
_ <- loadState.get.asserting(_ shouldBe true)
_ <- publishStringMessages(testTopic1, postLoad)
assertion <-
eventually(
topicState.get.asserting(_ should contain theSameElementsAs (preLoad ++ postLoad))
)
} yield assertion
}
} yield assertion
}

}
81 changes: 45 additions & 36 deletions src/test/scala/utils/KafkaHelpers.scala
@@ -2,22 +2,25 @@ package utils

import java.util.UUID

import base.AsyncIntSpec
import cats.data.{NonEmptyList, NonEmptySet}
import cats.effect.implicits.*
import cats.effect.kernel.Fiber
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import fs2.Stream
import fs2.kafka.{AutoOffsetReset, ConsumerRecord, ConsumerSettings, KafkaConsumer}
import io.github.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.common.TopicPartition
import org.scalatest.exceptions.TestFailedException
import org.scalatest.Assertion
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import uk.sky.fs2.kafka.topicloader.{LoadTopicStrategy, TopicLoader}

import scala.concurrent.duration.*

trait KafkaHelpers[F[_]] {
self: EmbeddedKafka[F] =>
self: AsyncIntSpec[F] & EmbeddedKafka[F] =>

val groupId = "test-consumer-group"
val testTopic1 = "load-state-topic-1"
@@ -65,6 +68,29 @@ trait KafkaHelpers[F[_]] {
TopicLoader.load(topics, strategy, consumerSettings).compile.toList.map(_.map(recordToTuple))
}

def loadAndRunR(topics: NonEmptyList[String])(
onLoad: Resource.ExitCase => F[Unit],
onRecord: ((String, String)) => F[Unit]
)(implicit
consumerSettings: ConsumerSettings[F, String, String],
F: Async[F]
): Resource[F, Fiber[F, Throwable, Unit]] = Resource.make {
loadAndRunLoader(topics)(onLoad)
.map(recordToTuple)
.evalTap(onRecord)
.compile
.drain
.start
}(_.cancel.void)

def loadAndRunLoader(topics: NonEmptyList[String])(onLoad: Resource.ExitCase => F[Unit])(implicit
consumerSettings: ConsumerSettings[F, String, String],
F: Async[F]
): Stream[F, ConsumerRecord[String, String]] = {
implicit val loggerFactory: LoggerFactory[F] = Slf4jFactory.create[F]
TopicLoader.loadAndRun(topics, consumerSettings)(onLoad)
}

def moveOffsetToEnd(
partitions: NonEmptySet[TopicPartition]
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): Stream[F, KafkaConsumer[F, String, String]] =
@@ -92,43 +118,33 @@ trait KafkaHelpers[F[_]] {

def waitForCompaction(
partitions: NonEmptySet[TopicPartition]
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
consumeEventually(partitions) { r =>
for {
records <- r
messageKeys = records.map { case (k, _) => k }
result <-
if (messageKeys.sorted == messageKeys.toSet.toList.sorted) F.unit
else F.raiseError(new TestFailedException("Topic has not compacted within timeout", 1))
} yield result
} yield {
messageKeys should not be empty
messageKeys should contain theSameElementsAs messageKeys.toSet
}
}

def consumeEventually(
partitions: NonEmptySet[TopicPartition],
groupId: String = UUID.randomUUID().toString
)(
f: F[List[(String, String)]] => F[Unit]
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
retry(
fa = {
val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
autoCommit = false,
offsetReset = AutoOffsetReset.Earliest,
partitions,
groupId.some
)(
_.records
.map(_.record)
.interruptAfter(5.second)
.compile
.toList
)

f(records.map(_.map(r => r.key -> r.value)))
},
delay = 1.second,
max = 5
)
f: F[List[(String, String)]] => F[Assertion]
)(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
eventually {
val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
autoCommit = false,
offsetReset = AutoOffsetReset.Earliest,
partitions,
groupId.some
)(_.records.map(_.record).interruptAfter(5.second).compile.toList)

f(records.map(_.map(r => r.key -> r.value)))
}

def withAssignedConsumer[T](
autoCommit: Boolean,
@@ -164,11 +180,4 @@ trait KafkaHelpers[F[_]] {
val settings = groupId.fold(baseSettings)(baseSettings.withGroupId)
KafkaConsumer[F].resource(settings)
}

def retry[A](fa: F[A], delay: FiniteDuration, max: Int)(implicit F: Async[F]): F[A] =
if (max <= 1) fa
else
fa handleErrorWith { _ =>
F.sleep(delay) *> retry(fa, delay, max - 1)
}
}