Skip to content

Commit 55e77c4

Browse files
committed
Fiber resource
1 parent 67de72a commit 55e77c4

File tree

2 files changed

+33
-16
lines changed

2 files changed

+33
-16
lines changed

src/test/scala/integration/TopicLoaderIntSpec.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -189,25 +189,25 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {
189189

190190
val (preLoad, postLoad) = records(1 to 15).splitAt(10)
191191

192-
val loadRef: IO[Ref[IO, Boolean]] = Ref.of(false)
193-
val topicRef: IO[Ref[IO, Seq[(String, String)]]] = Ref.empty
194-
195192
for {
196-
loadState <- loadRef
197-
topicState <- topicRef
193+
loadState <- Ref.of[IO, Boolean](false)
194+
topicState <- Ref.empty[IO, Seq[(String, String)]]
198195
_ <- createCustomTopics(NonEmptyList.one(testTopic1))
199196
_ <- publishStringMessages(testTopic1, preLoad)
200-
fiber <- loadAndRunLoader(NonEmptyList.one(testTopic1))(_ => loadState.set(true))
201-
.map(recordToTuple)
202-
.evalTap(r => topicState.getAndUpdate(_ :+ r))
203-
.compile
204-
.drain
205-
.start
206-
_ <- eventually(topicState.get.asserting(_ should contain theSameElementsAs preLoad))
207-
_ <- loadState.get.asserting(_ shouldBe true)
208-
_ <- publishStringMessages(testTopic1, postLoad)
209-
assertion <- eventually(topicState.get.asserting(_ should contain theSameElementsAs (preLoad ++ postLoad)))
210-
_ <- fiber.cancel
197+
assertion <- loadAndRunR(NonEmptyList.one(testTopic1))(
198+
_ => loadState.set(true),
199+
r => topicState.getAndUpdate(_ :+ r).void
200+
).use { _ =>
201+
for {
202+
_ <- eventually(topicState.get.asserting(_ should contain theSameElementsAs preLoad))
203+
_ <- loadState.get.asserting(_ shouldBe true)
204+
_ <- publishStringMessages(testTopic1, postLoad)
205+
assertion <-
206+
eventually(
207+
topicState.get.asserting(_ should contain theSameElementsAs (preLoad ++ postLoad))
208+
)
209+
} yield assertion
210+
}
211211
} yield assertion
212212
}
213213

src/test/scala/utils/KafkaHelpers.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import java.util.UUID
44

55
import base.AsyncIntSpec
66
import cats.data.{NonEmptyList, NonEmptySet}
7+
import cats.effect.implicits.*
8+
import cats.effect.kernel.Fiber
79
import cats.effect.{Async, Resource}
810
import cats.syntax.all.*
911
import fs2.Stream
@@ -66,6 +68,21 @@ trait KafkaHelpers[F[_]] {
6668
TopicLoader.load(topics, strategy, consumerSettings).compile.toList.map(_.map(recordToTuple))
6769
}
6870

71+
def loadAndRunR(topics: NonEmptyList[String])(
72+
onLoad: Resource.ExitCase => F[Unit],
73+
onRecord: ((String, String)) => F[Unit]
74+
)(implicit
75+
consumerSettings: ConsumerSettings[F, String, String],
76+
F: Async[F]
77+
): Resource[F, Fiber[F, Throwable, Unit]] = Resource.make {
78+
loadAndRunLoader(topics)(onLoad)
79+
.map(recordToTuple)
80+
.evalTap(onRecord)
81+
.compile
82+
.drain
83+
.start
84+
}(_.cancel.void)
85+
6986
def loadAndRunLoader(topics: NonEmptyList[String])(onLoad: Resource.ExitCase => F[Unit])(implicit
7087
consumerSettings: ConsumerSettings[F, String, String],
7188
F: Async[F]

0 commit comments

Comments
 (0)