
Commit 9d66864

Working with load split
1 parent b89c421 commit 9d66864

File tree

3 files changed: +89, -25 lines

src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala

Lines changed: 50 additions & 21 deletions
@@ -2,6 +2,7 @@ package uk.sky.fs2.kafka.topicloader
 
 import cats.data.{NonEmptyList, NonEmptyMap, OptionT}
 import cats.effect.Async
+import cats.effect.kernel.Resource
 import cats.syntax.all.*
 import cats.{Monad, Order}
 import fs2.kafka.{ConsumerRecord, ConsumerSettings, KafkaConsumer}
@@ -43,21 +44,45 @@ trait TopicLoader {
       .evalMap { consumer =>
         {
           for {
-            allLogOffsets <- OptionT.liftF(logOffsetsForTopics(topics, strategy, consumer))
-            logOffsets <- OptionT.fromOption(NonEmptyMap.fromMap(SortedMap.from(allLogOffsets)))
+            logOffsets <- OptionT(logOffsetsForTopics(topics, strategy, consumer))
+            _ <- OptionT.liftF(
+              consumer.assign(logOffsets.keys) *>
+                logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
+            )
+          } yield load(consumer, logOffsets)
+        }.getOrElse(Stream.empty)
+      }
+      .flatten
+
+  private def load[F[_] : Async : LoggerFactory, K, V](
+      consumer: KafkaConsumer[F, K, V],
+      logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
+  ): Stream[F, ConsumerRecord[K, V]] = consumer.records.map(_.record).through(filterBelowHighestOffset(logOffsets))
+
+  def loadAndRun[F[_] : Async : LoggerFactory, K, V](
+      topics: NonEmptyList[String],
+      consumerSettings: ConsumerSettings[F, K, V]
+  )(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] =
+    KafkaConsumer
+      .stream(consumerSettings)
+      .evalMap { consumer =>
+        {
+          for {
+            logOffsets <- OptionT(logOffsetsForTopics(topics, LoadAll, consumer))
             _ <- OptionT.liftF(
               consumer.assign(logOffsets.keys) *>
                 logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
             )
-          } yield consumer.records
-            .map(_.record)
-            .through(filterBelowHighestOffset(logOffsets))
+            preLoadStream <- OptionT.pure(load(consumer, logOffsets))
+            _ <- OptionT.liftF(
+              consumer.assign(logOffsets.keys) *>
+                logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.highest) }
+            )
+          } yield preLoadStream.onFinalizeCase(onLoad) ++ consumer.records.map(_.record)
         }.getOrElse(Stream.empty)
       }
       .flatten
 
-  def loadAndRun(): Unit = ()
-
   private def filterBelowHighestOffset[F[_] : Monad : LoggerFactory, K, V](
       logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
   ): Pipe[F, ConsumerRecord[K, V], ConsumerRecord[K, V]] = {
@@ -73,20 +98,24 @@ trait TopicLoader {
       topics: NonEmptyList[String],
      strategy: LoadTopicStrategy,
       consumer: KafkaConsumer[F, K, V]
-  ): F[Map[TopicPartition, LogOffsets]] = for {
-    _ <- consumer.subscribe(topics)
-    partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
-    topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
-    beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
-    endOffsets <- strategy match {
-      case LoadAll => consumer.endOffsets(topicPartitions)
-      case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
-    }
-    logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
-      partition -> LogOffsets(offset, endOffsets(partition))
-    }
-    _ <- consumer.unsubscribe
-  } yield logOffsets.filter { case (_, o) => o.highest > o.lowest }
+  ): F[Option[NonEmptyMap[TopicPartition, LogOffsets]]] =
+    for {
+      _ <- consumer.subscribe(topics)
+      partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
+      topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
+      beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
+      endOffsets <- strategy match {
+        case LoadAll => consumer.endOffsets(topicPartitions)
+        case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
+      }
+      logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
+        partition -> LogOffsets(offset, endOffsets(partition))
+      }
+      _ <- consumer.unsubscribe
+    } yield {
+      val offsets = logOffsets.filter { case (_, o) => o.highest > o.lowest }
+      NonEmptyMap.fromMap(SortedMap.from(offsets))
+    }
 
   private def earliestOffsets[F[_] : Monad, K, V](
       consumer: KafkaConsumer[F, K, V],
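
Note: the split above factors the bounded "load" phase into the private load helper and introduces the public loadAndRun, which replays each partition from its lowest offset, runs onLoad once the pre-load stream finalizes, and then keeps streaming live records. A minimal usage sketch follows; the bootstrap servers, group id, topic name and callback are illustrative assumptions, not part of this commit.

import cats.data.NonEmptyList
import cats.effect.{IO, Resource}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import uk.sky.fs2.kafka.topicloader.TopicLoader

object LoadAndRunExample {
  implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

  // Placeholder connection details; adjust for a real cluster.
  val settings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("topic-loader-example")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)

  // Invoked once the pre-load portion of the stream finalizes.
  def onLoad(exitCase: Resource.ExitCase): IO[Unit] =
    IO.println(s"finished loading: $exitCase")

  // Replays "my-topic" from the earliest offsets, then continues with live records.
  val stream: fs2.Stream[IO, Unit] =
    TopicLoader
      .loadAndRun[IO, String, String](NonEmptyList.one("my-topic"), settings)(onLoad)
      .evalMap(record => IO.println(s"${record.key} -> ${record.value}"))
}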

src/test/scala/integration/TopicLoaderIntSpec.scala

Lines changed: 30 additions & 3 deletions
@@ -2,7 +2,7 @@ package integration
 
 import base.KafkaSpecBase
 import cats.data.NonEmptyList
-import cats.effect.IO
+import cats.effect.{IO, Ref}
 import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
 import io.github.embeddedkafka.EmbeddedKafkaConfig
 import org.apache.kafka.common.errors.TimeoutException as KafkaTimeoutException
@@ -184,8 +184,35 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {
 
   "loadAndRun" should {
 
-    "execute callback when finished loading and keep streaming" in {
-      pending
+    "execute callback when finished loading and keep streaming" in withKafkaContext { ctx =>
+      import ctx.*
+
+      val (preLoad, postLoad) = records(1 to 15).splitAt(10)
+      println(s"preload: $preLoad")
+      println(s"postLoad: $postLoad")
+
+      val ref: IO[Ref[IO, Boolean]] = Ref.of(false)
+      val topicRef: IO[Ref[IO, Seq[(String, String)]]] = Ref.empty
+
+      for {
+        state <- ref
+        topicState <- topicRef
+        _ <- createCustomTopics(NonEmptyList.one(testTopic1))
+        _ <- publishStringMessages(testTopic1, preLoad)
+        fibre <- loadAndRunLoader(NonEmptyList.one(testTopic1))(_ => state.set(true))
+          .debug()
+          .map(recordToTuple)
+          .evalTap(r => IO.println(s"Updating with $r") *> topicState.getAndUpdate(inter => inter :+ r))
+          .compile
+          .toList
+          .start
+        _ <- retry(topicState.get.asserting(_ should contain theSameElementsAs preLoad))
+        _ <- state.get.asserting(_ shouldBe true)
+        _ <- IO.println("\n\n\nREPUBLISHING\n\n\n")
+        _ <- publishStringMessages(testTopic1, postLoad)
+        _ <- retry(topicState.get.asserting(_.sorted should contain theSameElementsAs (preLoad ++ postLoad).sorted))
+        outcome <- fibre.joinWith(IO.raiseError(new IllegalStateException("Something happened")))
+      } yield outcome should contain theSameElementsAs preLoad ++ postLoad
     }
 
   }
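
The rewritten test exercises the ordering that loadAndRun depends on: the onLoad callback (here state.set(true)) is attached with fs2's onFinalizeCase, so it fires once the pre-load stream completes and before the appended live stream is consumed. A Kafka-free sketch of that mechanism, purely illustrative:

import cats.effect.{IO, IOApp, Ref}
import fs2.Stream

object OnFinalizeCaseDemo extends IOApp.Simple {
  val run: IO[Unit] =
    for {
      loaded <- Ref.of[IO, Boolean](false)
      // The "pre-load" part flips the flag when it finalizes; the appended
      // "live" part only runs afterwards, mirroring preLoadStream ++ consumer.records.
      out <- (Stream(1, 2, 3).covary[IO].onFinalizeCase(_ => loaded.set(true)) ++
               Stream.exec(loaded.get.flatMap(flag => IO.println(s"loaded flag: $flag"))))
               .compile
               .toList
      _ <- IO.println(out) // List(1, 2, 3)
    } yield ()
}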

src/test/scala/utils/KafkaHelpers.scala

Lines changed: 9 additions & 1 deletion
@@ -65,6 +65,14 @@ trait KafkaHelpers[F[_]] {
     TopicLoader.load(topics, strategy, consumerSettings).compile.toList.map(_.map(recordToTuple))
   }
 
+  def loadAndRunLoader(topics: NonEmptyList[String])(onLoad: Resource.ExitCase => F[Unit])(implicit
+      consumerSettings: ConsumerSettings[F, String, String],
+      F: Async[F]
+  ): Stream[F, ConsumerRecord[String, String]] = {
+    implicit val loggerFactory: LoggerFactory[F] = Slf4jFactory.create[F]
+    TopicLoader.loadAndRun(topics, consumerSettings)(onLoad)
+  }
+
   def moveOffsetToEnd(
       partitions: NonEmptySet[TopicPartition]
   )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): Stream[F, KafkaConsumer[F, String, String]] =
@@ -165,7 +173,7 @@ trait KafkaHelpers[F[_]] {
     KafkaConsumer[F].resource(settings)
   }
 
-  def retry[A](fa: F[A], delay: FiniteDuration, max: Int)(implicit F: Async[F]): F[A] =
+  def retry[A](fa: F[A], delay: FiniteDuration = 1.second, max: Int = 10)(implicit F: Async[F]): F[A] =
     if (max <= 1) fa
     else
      fa handleErrorWith { _ =>
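
The retry helper now defaults to a 1-second delay and 10 attempts. The hunk cuts off inside the handleErrorWith block; a plausible completion, sketched here as an assumption rather than the repository's exact code, sleeps and recurses with one fewer attempt:

import scala.concurrent.duration.*
import cats.effect.Async
import cats.syntax.all.*

// Sketch of a delay-and-recurse retry consistent with the visible signature;
// the recursion body is assumed, not taken from this diff.
def retrySketch[F[_], A](fa: F[A], delay: FiniteDuration = 1.second, max: Int = 10)(implicit F: Async[F]): F[A] =
  if (max <= 1) fa
  else
    fa.handleErrorWith { _ =>
      F.sleep(delay) *> retrySketch(fa, delay, max - 1)
    }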
