diff --git a/it/src/test/scala/integration/LoadExampleIntSpec.scala b/it/src/test/scala/integration/LoadExampleIntSpec.scala
index 538b7e2..56532ee 100644
--- a/it/src/test/scala/integration/LoadExampleIntSpec.scala
+++ b/it/src/test/scala/integration/LoadExampleIntSpec.scala
@@ -83,6 +83,6 @@ class LoadExampleIntSpec extends KafkaSpecBase[IO] {
   private def withKafkaContext(test: TestContext[IO] => IO[Assertion]): IO[Assertion] = {
     object testContext extends TestContext[IO]
     import testContext.*
-    embeddedKafka.use(_ => test(testContext))
+    embeddedKafka.surround(test(testContext))
   }
 }
diff --git a/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala b/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
index c7d8519..ba7ff0c 100644
--- a/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
+++ b/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
@@ -2,6 +2,7 @@ package uk.sky.fs2.kafka.topicloader
 
 import cats.data.{NonEmptyList, NonEmptyMap, OptionT}
 import cats.effect.Async
+import cats.effect.kernel.Resource
 import cats.syntax.all.*
 import cats.{Monad, Order}
 import fs2.kafka.{ConsumerRecord, ConsumerSettings, KafkaConsumer}
@@ -43,20 +44,40 @@ trait TopicLoader {
       .evalMap { consumer =>
         {
           for {
-            allLogOffsets <- OptionT.liftF(logOffsetsForTopics(topics, strategy, consumer))
-            logOffsets    <- OptionT.fromOption(NonEmptyMap.fromMap(SortedMap.from(allLogOffsets)))
+            logOffsets <- OptionT(logOffsetsForTopics(topics, strategy, consumer))
+            _          <- OptionT.liftF(
+                            consumer.assign(logOffsets.keys) *>
+                              logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
+                          )
+          } yield load(consumer, logOffsets)
+        }.getOrElse(Stream.empty)
+      }
+      .flatten
+
+  def loadAndRun[F[_] : Async : LoggerFactory, K, V](
+      topics: NonEmptyList[String],
+      consumerSettings: ConsumerSettings[F, K, V]
+  )(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] =
+    KafkaConsumer
+      .stream(consumerSettings)
+      .evalMap { consumer =>
+        {
+          for {
+            logOffsets    <- OptionT(logOffsetsForTopics(topics, LoadAll, consumer))
             _             <- OptionT.liftF(
                                consumer.assign(logOffsets.keys) *>
                                  logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
                              )
-          } yield consumer.records
-            .map(_.record)
-            .through(filterBelowHighestOffset(logOffsets))
+            preLoadStream <- OptionT.pure(load(consumer, logOffsets))
+          } yield preLoadStream.onFinalizeCase(onLoad) ++ consumer.records.map(_.record)
         }.getOrElse(Stream.empty)
       }
       .flatten
 
-  def loadAndRun(): Unit = ()
+  private def load[F[_] : Async : LoggerFactory, K, V](
+      consumer: KafkaConsumer[F, K, V],
+      logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
+  ): Stream[F, ConsumerRecord[K, V]] = consumer.records.map(_.record).through(filterBelowHighestOffset(logOffsets))
 
   private def filterBelowHighestOffset[F[_] : Monad : LoggerFactory, K, V](
       logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
@@ -73,20 +94,24 @@ trait TopicLoader {
       topics: NonEmptyList[String],
       strategy: LoadTopicStrategy,
       consumer: KafkaConsumer[F, K, V]
-  ): F[Map[TopicPartition, LogOffsets]] = for {
-    _ <- consumer.subscribe(topics)
-    partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
-    topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
-    beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
-    endOffsets <- strategy match {
-                    case LoadAll       => consumer.endOffsets(topicPartitions)
-                    case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
-                  }
-    logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
-      partition -> LogOffsets(offset, endOffsets(partition))
-    }
-    _ <- consumer.unsubscribe
-  } yield logOffsets.filter { case (_, o) => o.highest > o.lowest }
+  ): F[Option[NonEmptyMap[TopicPartition, LogOffsets]]] =
+    for {
+      _ <- consumer.subscribe(topics)
+      partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
+      topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
+      beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
+      endOffsets <- strategy match {
+                      case LoadAll       => consumer.endOffsets(topicPartitions)
+                      case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
+                    }
+      logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
+        partition -> LogOffsets(offset, endOffsets(partition))
+      }
+      _ <- consumer.unsubscribe
+    } yield {
+      val offsets = logOffsets.filter { case (_, o) => o.highest > o.lowest }
+      NonEmptyMap.fromMap(SortedMap.from(offsets))
+    }
 
   private def earliestOffsets[F[_] : Monad, K, V](
       consumer: KafkaConsumer[F, K, V],
diff --git a/src/test/scala/base/AsyncIntSpec.scala b/src/test/scala/base/AsyncIntSpec.scala
index bffc50a..d2b018c 100644
--- a/src/test/scala/base/AsyncIntSpec.scala
+++ b/src/test/scala/base/AsyncIntSpec.scala
@@ -1,8 +1,30 @@
 package base
 
+import cats.effect.Async
+import cats.effect.std.Dispatcher
 import cats.effect.testing.scalatest.AsyncIOSpec
+import cats.syntax.all.*
+import org.scalactic.source.Position
 import org.scalatest.OptionValues
+import org.scalatest.concurrent.Eventually
+import org.scalatest.enablers.Retrying
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.Span
 import org.scalatest.wordspec.AsyncWordSpec
 
-trait AsyncIntSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues
+import scala.concurrent.duration.*
+
+trait AsyncIntSpec[F[_]] extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues with Eventually {
+  override implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 500.millis)
+
+  implicit def fRetrying[T](implicit F: Async[F]): Retrying[F[T]] = new Retrying[F[T]] {
+    override def retry(timeout: Span, interval: Span, pos: Position)(fun: => F[T]): F[T] =
+      Dispatcher.sequential[F].use { dispatcher =>
+        F.fromFuture(
+          F.executionContext.map(
+            Retrying.retryingNatureOfFutureT[T](_).retry(timeout, interval, pos)(dispatcher.unsafeToFuture(fun))
+          )
+        )
+      }
+  }
+}
diff --git a/src/test/scala/base/KafkaSpecBase.scala b/src/test/scala/base/KafkaSpecBase.scala
index a60b2bb..84621c1 100644
--- a/src/test/scala/base/KafkaSpecBase.scala
+++ b/src/test/scala/base/KafkaSpecBase.scala
@@ -2,4 +2,4 @@ package base
 
 import utils.{EmbeddedKafka, KafkaHelpers}
 
-abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec with KafkaHelpers[F] with EmbeddedKafka[F]
+abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec[F] with KafkaHelpers[F] with EmbeddedKafka[F]
diff --git a/src/test/scala/base/UnitSpecBase.scala b/src/test/scala/base/UnitSpecBase.scala
deleted file mode 100644
index e97ff1c..0000000
--- a/src/test/scala/base/UnitSpecBase.scala
+++ /dev/null
@@ -1,5 +0,0 @@
-package base
-
-import org.scalatest.concurrent.Eventually
-
-trait UnitSpecBase extends WordSpecBase with Eventually
diff --git a/src/test/scala/base/WordSpecBase.scala b/src/test/scala/base/WordSpecBase.scala
deleted file mode 100644
index 67b0ffd..0000000
--- a/src/test/scala/base/WordSpecBase.scala
+++ /dev/null
@@ -1,8 +0,0 @@
-package base
-
-import org.scalatest.OptionValues
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.wordspec.AnyWordSpec
-
-abstract class WordSpecBase extends AnyWordSpec with Matchers with ScalaFutures with OptionValues
diff --git a/src/test/scala/integration/TopicLoaderIntSpec.scala b/src/test/scala/integration/TopicLoaderIntSpec.scala
index d41e5ef..316e17c 100644
--- a/src/test/scala/integration/TopicLoaderIntSpec.scala
+++ b/src/test/scala/integration/TopicLoaderIntSpec.scala
@@ -2,7 +2,7 @@ package integration
 
 import base.KafkaSpecBase
 import cats.data.NonEmptyList
-import cats.effect.IO
+import cats.effect.{IO, Ref}
 import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
 import io.github.embeddedkafka.EmbeddedKafkaConfig
 import org.apache.kafka.common.errors.TimeoutException as KafkaTimeoutException
@@ -184,8 +184,31 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {
 
   "loadAndRun" should {
 
-    "execute callback when finished loading and keep streaming" in {
-      pending
+    "execute callback when finished loading and keep streaming" in withKafkaContext { ctx =>
+      import ctx.*
+
+      val (preLoad, postLoad) = records(1 to 15).splitAt(10)
+
+      for {
+        loadState  <- Ref.of[IO, Boolean](false)
+        topicState <- Ref.empty[IO, Seq[(String, String)]]
+        _          <- createCustomTopics(NonEmptyList.one(testTopic1))
+        _          <- publishStringMessages(testTopic1, preLoad)
+        assertion  <- loadAndRunR(NonEmptyList.one(testTopic1))(
+                        _ => loadState.set(true),
+                        r => topicState.getAndUpdate(_ :+ r).void
+                      ).surround {
+                        for {
+                          _         <- eventually(topicState.get.asserting(_ should contain theSameElementsAs preLoad))
+                          _         <- loadState.get.asserting(_ shouldBe true)
+                          _         <- publishStringMessages(testTopic1, postLoad)
+                          assertion <-
+                            eventually(
+                              topicState.get.asserting(_ should contain theSameElementsAs (preLoad ++ postLoad))
+                            )
+                        } yield assertion
+                      }
+      } yield assertion
     }
   }
 
@@ -198,6 +221,6 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {
   private def withKafkaContext(test: TestContext[IO] => IO[Assertion]): IO[Assertion] = {
     object testContext extends TestContext[IO]
     import testContext.*
-    embeddedKafka.use(_ => test(testContext))
+    embeddedKafka.surround(test(testContext))
   }
 }
diff --git a/src/test/scala/utils/KafkaHelpers.scala b/src/test/scala/utils/KafkaHelpers.scala
index 6a4c739..b9ea05d 100644
--- a/src/test/scala/utils/KafkaHelpers.scala
+++ b/src/test/scala/utils/KafkaHelpers.scala
@@ -2,14 +2,17 @@ package utils
 
 import java.util.UUID
 
+import base.AsyncIntSpec
 import cats.data.{NonEmptyList, NonEmptySet}
+import cats.effect.implicits.*
+import cats.effect.kernel.Fiber
 import cats.effect.{Async, Resource}
 import cats.syntax.all.*
 import fs2.Stream
 import fs2.kafka.{AutoOffsetReset, ConsumerRecord, ConsumerSettings, KafkaConsumer}
 import io.github.embeddedkafka.EmbeddedKafkaConfig
 import org.apache.kafka.common.TopicPartition
-import org.scalatest.exceptions.TestFailedException
+import org.scalatest.Assertion
 import org.typelevel.log4cats.LoggerFactory
 import org.typelevel.log4cats.slf4j.Slf4jFactory
 import uk.sky.fs2.kafka.topicloader.{LoadTopicStrategy, TopicLoader}
@@ -17,7 +20,7 @@ import uk.sky.fs2.kafka.topicloader.{LoadTopicStrategy, TopicLoader}
 import scala.concurrent.duration.*
 
 trait KafkaHelpers[F[_]] {
-  self: EmbeddedKafka[F] =>
+  self: AsyncIntSpec[F] & EmbeddedKafka[F] =>
 
   val groupId    = "test-consumer-group"
   val testTopic1 = "load-state-topic-1"
@@ -65,6 +68,29 @@ trait KafkaHelpers[F[_]] {
     TopicLoader.load(topics, strategy, consumerSettings).compile.toList.map(_.map(recordToTuple))
   }
 
+  def loadAndRunR(topics: NonEmptyList[String])(
+      onLoad: Resource.ExitCase => F[Unit],
+      onRecord: ((String, String)) => F[Unit]
+  )(implicit
+      consumerSettings: ConsumerSettings[F, String, String],
+      F: Async[F]
+  ): Resource[F, Fiber[F, Throwable, Unit]] = Resource.make {
+    loadAndRunLoader(topics)(onLoad)
+      .map(recordToTuple)
+      .evalTap(onRecord)
+      .compile
+      .drain
+      .start
+  }(_.cancel.void)
+
+  def loadAndRunLoader(topics: NonEmptyList[String])(onLoad: Resource.ExitCase => F[Unit])(implicit
+      consumerSettings: ConsumerSettings[F, String, String],
+      F: Async[F]
+  ): Stream[F, ConsumerRecord[String, String]] = {
+    implicit val loggerFactory: LoggerFactory[F] = Slf4jFactory.create[F]
+    TopicLoader.loadAndRun(topics, consumerSettings)(onLoad)
+  }
+
   def moveOffsetToEnd(
       partitions: NonEmptySet[TopicPartition]
   )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): Stream[F, KafkaConsumer[F, String, String]] =
@@ -92,43 +118,33 @@ trait KafkaHelpers[F[_]]{
 
   def waitForCompaction(
       partitions: NonEmptySet[TopicPartition]
-  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
+  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
     consumeEventually(partitions) { r =>
       for {
        records <- r
        messageKeys = records.map { case (k, _) => k }
-        result <-
-          if (messageKeys.sorted == messageKeys.toSet.toList.sorted) F.unit
-          else F.raiseError(new TestFailedException("Topic has not compacted within timeout", 1))
-      } yield result
+      } yield {
+        messageKeys should not be empty
+        messageKeys should contain theSameElementsAs messageKeys.toSet
+      }
     }
 
   def consumeEventually(
       partitions: NonEmptySet[TopicPartition],
      groupId: String = UUID.randomUUID().toString
   )(
-      f: F[List[(String, String)]] => F[Unit]
-  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
-    retry(
-      fa = {
-        val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
-          autoCommit = false,
-          offsetReset = AutoOffsetReset.Earliest,
-          partitions,
-          groupId.some
-        )(
-          _.records
-            .map(_.record)
-            .interruptAfter(5.second)
-            .compile
-            .toList
-        )
-
-        f(records.map(_.map(r => r.key -> r.value)))
-      },
-      delay = 1.second,
-      max = 5
-    )
+      f: F[List[(String, String)]] => F[Assertion]
+  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
+    eventually {
+      val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
+        autoCommit = false,
+        offsetReset = AutoOffsetReset.Earliest,
+        partitions,
+        groupId.some
+      )(_.records.map(_.record).interruptAfter(5.second).compile.toList)
+
+      f(records.map(_.map(r => r.key -> r.value)))
+    }
 
   def withAssignedConsumer[T](
       autoCommit: Boolean,
@@ -164,11 +180,4 @@ trait KafkaHelpers[F[_]] {
     val settings = groupId.fold(baseSettings)(baseSettings.withGroupId)
     KafkaConsumer[F].resource(settings)
   }
-
-  def retry[A](fa: F[A], delay: FiniteDuration, max: Int)(implicit F: Async[F]): F[A] =
-    if (max <= 1) fa
-    else
-      fa handleErrorWith { _ =>
-        F.sleep(delay) *> retry(fa, delay, max - 1)
-      }
 }
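
Usage sketch for the new loadAndRun API (illustrative only, not part of the patch): loadAndRun streams everything currently on the topics, runs the onLoad callback once that pre-load stream finalises, and then keeps streaming live records. The topic name, bootstrap servers and group id below are placeholder assumptions.

    import cats.data.NonEmptyList
    import cats.effect.{IO, IOApp}
    import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
    import org.typelevel.log4cats.LoggerFactory
    import org.typelevel.log4cats.slf4j.Slf4jFactory
    import uk.sky.fs2.kafka.topicloader.TopicLoader

    object LoadAndRunExample extends IOApp.Simple {
      // Satisfies the LoggerFactory context bound on TopicLoader.loadAndRun.
      implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

      // Placeholder settings: point these at a real cluster before running.
      private val settings: ConsumerSettings[IO, String, String] =
        ConsumerSettings[IO, String, String]
          .withBootstrapServers("localhost:9092")
          .withGroupId("example-group")
          .withAutoOffsetReset(AutoOffsetReset.Earliest)

      override def run: IO[Unit] =
        TopicLoader
          .loadAndRun(NonEmptyList.one("example-topic"), settings)(exitCase => IO.println(s"initial load finished: $exitCase"))
          .evalMap(record => IO.println(s"${record.key} -> ${record.value}"))
          .compile
          .drain
    }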