Skip to content

Commit ca19be9

Browse files
committed
Implement loadAndRun
1 parent 9d66864 commit ca19be9

File tree

2 files changed

+14
-22
lines changed

2 files changed

+14
-22
lines changed

src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,6 @@ trait TopicLoader {
5454
}
5555
.flatten
5656

57-
private def load[F[_] : Async : LoggerFactory, K, V](
58-
consumer: KafkaConsumer[F, K, V],
59-
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
60-
): Stream[F, ConsumerRecord[K, V]] = consumer.records.map(_.record).through(filterBelowHighestOffset(logOffsets))
61-
6257
def loadAndRun[F[_] : Async : LoggerFactory, K, V](
6358
topics: NonEmptyList[String],
6459
consumerSettings: ConsumerSettings[F, K, V]
@@ -74,15 +69,16 @@ trait TopicLoader {
7469
logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
7570
)
7671
preLoadStream <- OptionT.pure(load(consumer, logOffsets))
77-
_ <- OptionT.liftF(
78-
consumer.assign(logOffsets.keys) *>
79-
logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.highest) }
80-
)
8172
} yield preLoadStream.onFinalizeCase(onLoad) ++ consumer.records.map(_.record)
8273
}.getOrElse(Stream.empty)
8374
}
8475
.flatten
8576

77+
private def load[F[_] : Async : LoggerFactory, K, V](
78+
consumer: KafkaConsumer[F, K, V],
79+
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
80+
): Stream[F, ConsumerRecord[K, V]] = consumer.records.map(_.record).through(filterBelowHighestOffset(logOffsets))
81+
8682
private def filterBelowHighestOffset[F[_] : Monad : LoggerFactory, K, V](
8783
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
8884
): Pipe[F, ConsumerRecord[K, V], ConsumerRecord[K, V]] = {

src/test/scala/integration/TopicLoaderIntSpec.scala

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -188,31 +188,27 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {
188188
import ctx.*
189189

190190
val (preLoad, postLoad) = records(1 to 15).splitAt(10)
191-
println(s"preload: $preLoad")
192-
println(s"postLoad: $postLoad")
193191

194-
val ref: IO[Ref[IO, Boolean]] = Ref.of(false)
192+
val loadRef: IO[Ref[IO, Boolean]] = Ref.of(false)
195193
val topicRef: IO[Ref[IO, Seq[(String, String)]]] = Ref.empty
196194

197195
for {
198-
state <- ref
196+
loadState <- loadRef
199197
topicState <- topicRef
200198
_ <- createCustomTopics(NonEmptyList.one(testTopic1))
201199
_ <- publishStringMessages(testTopic1, preLoad)
202-
fibre <- loadAndRunLoader(NonEmptyList.one(testTopic1))(_ => state.set(true))
203-
.debug()
200+
fiber <- loadAndRunLoader(NonEmptyList.one(testTopic1))(_ => loadState.set(true))
204201
.map(recordToTuple)
205-
.evalTap(r => IO.println(s"Updating with $r") *> topicState.getAndUpdate(inter => inter :+ r))
202+
.evalTap(r => topicState.getAndUpdate(_ :+ r))
206203
.compile
207-
.toList
204+
.drain
208205
.start
209206
_ <- retry(topicState.get.asserting(_ should contain theSameElementsAs preLoad))
210-
_ <- state.get.asserting(_ shouldBe true)
211-
_ <- IO.println("\n\n\nREPUBLISHING\n\n\n")
207+
_ <- loadState.get.asserting(_ shouldBe true)
212208
_ <- publishStringMessages(testTopic1, postLoad)
213-
_ <- retry(topicState.get.asserting(_.sorted should contain theSameElementsAs (preLoad ++ postLoad).sorted))
214-
outcome <- fibre.joinWith(IO.raiseError(new IllegalStateException("Something happened")))
215-
} yield outcome should contain theSameElementsAs preLoad ++ postLoad
209+
assertion <- retry(topicState.get.asserting(_ should contain theSameElementsAs (preLoad ++ postLoad)))
210+
_ <- fiber.cancel
211+
} yield assertion
216212
}
217213

218214
}

0 commit comments

Comments
 (0)