Commit 9e60e7e

Implement loadAndRun (#7)
1 parent b89c421 commit 9e60e7e

8 files changed: +142 additions, -76 deletions


it/src/test/scala/integration/LoadExampleIntSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -83,6 +83,6 @@ class LoadExampleIntSpec extends KafkaSpecBase[IO] {
   private def withKafkaContext(test: TestContext[IO] => IO[Assertion]): IO[Assertion] = {
     object testContext extends TestContext[IO]
     import testContext.*
-    embeddedKafka.use(_ => test(testContext))
+    embeddedKafka.surround(test(testContext))
   }
 }
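For context, cats-effect's Resource#surround is shorthand for use(_ => fa) when the acquired value itself is not needed, so the swap above does not change behaviour. A minimal sketch with stand-in values (not from this repo):

import cats.effect.{IO, Resource}

val kafka: Resource[IO, Unit] = Resource.unit[IO] // stand-in for embeddedKafka
val test: IO[Boolean]         = IO.pure(true)     // stand-in for test(testContext)

// Both expressions run the action while the resource is held and are equivalent:
val viaUse: IO[Boolean]      = kafka.use(_ => test)
val viaSurround: IO[Boolean] = kafka.surround(test)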

src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala

Lines changed: 45 additions & 20 deletions
@@ -2,6 +2,7 @@ package uk.sky.fs2.kafka.topicloader
 
 import cats.data.{NonEmptyList, NonEmptyMap, OptionT}
 import cats.effect.Async
+import cats.effect.kernel.Resource
 import cats.syntax.all.*
 import cats.{Monad, Order}
 import fs2.kafka.{ConsumerRecord, ConsumerSettings, KafkaConsumer}
@@ -43,20 +44,40 @@ trait TopicLoader {
       .evalMap { consumer =>
         {
           for {
-            allLogOffsets <- OptionT.liftF(logOffsetsForTopics(topics, strategy, consumer))
-            logOffsets <- OptionT.fromOption(NonEmptyMap.fromMap(SortedMap.from(allLogOffsets)))
+            logOffsets <- OptionT(logOffsetsForTopics(topics, strategy, consumer))
+            _ <- OptionT.liftF(
+                   consumer.assign(logOffsets.keys) *>
+                     logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
+                 )
+          } yield load(consumer, logOffsets)
+        }.getOrElse(Stream.empty)
+      }
+      .flatten
+
+  def loadAndRun[F[_] : Async : LoggerFactory, K, V](
+      topics: NonEmptyList[String],
+      consumerSettings: ConsumerSettings[F, K, V]
+  )(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] =
+    KafkaConsumer
+      .stream(consumerSettings)
+      .evalMap { consumer =>
+        {
+          for {
+            logOffsets <- OptionT(logOffsetsForTopics(topics, LoadAll, consumer))
             _ <- OptionT.liftF(
                    consumer.assign(logOffsets.keys) *>
                      logOffsets.toNel.traverse { case (tp, o) => consumer.seek(tp, o.lowest) }
                  )
-          } yield consumer.records
-            .map(_.record)
-            .through(filterBelowHighestOffset(logOffsets))
+            preLoadStream <- OptionT.pure(load(consumer, logOffsets))
+          } yield preLoadStream.onFinalizeCase(onLoad) ++ consumer.records.map(_.record)
         }.getOrElse(Stream.empty)
       }
       .flatten
 
-  def loadAndRun(): Unit = ()
+  private def load[F[_] : Async : LoggerFactory, K, V](
+      consumer: KafkaConsumer[F, K, V],
+      logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
+  ): Stream[F, ConsumerRecord[K, V]] = consumer.records.map(_.record).through(filterBelowHighestOffset(logOffsets))
 
   private def filterBelowHighestOffset[F[_] : Monad : LoggerFactory, K, V](
       logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
@@ -73,20 +94,24 @@
       topics: NonEmptyList[String],
       strategy: LoadTopicStrategy,
       consumer: KafkaConsumer[F, K, V]
-  ): F[Map[TopicPartition, LogOffsets]] = for {
-    _ <- consumer.subscribe(topics)
-    partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
-    topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
-    beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
-    endOffsets <- strategy match {
-      case LoadAll => consumer.endOffsets(topicPartitions)
-      case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
-    }
-    logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
-      partition -> LogOffsets(offset, endOffsets(partition))
-    }
-    _ <- consumer.unsubscribe
-  } yield logOffsets.filter { case (_, o) => o.highest > o.lowest }
+  ): F[Option[NonEmptyMap[TopicPartition, LogOffsets]]] =
+    for {
+      _ <- consumer.subscribe(topics)
+      partitionInfo <- topics.toList.flatTraverse(consumer.partitionsFor)
+      topicPartitions = partitionInfo.map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
+      beginningOffsetPerPartition <- consumer.beginningOffsets(topicPartitions)
+      endOffsets <- strategy match {
+        case LoadAll => consumer.endOffsets(topicPartitions)
+        case LoadCommitted => earliestOffsets(consumer, beginningOffsetPerPartition)
+      }
+      logOffsets = beginningOffsetPerPartition.map { case (partition, offset) =>
+        partition -> LogOffsets(offset, endOffsets(partition))
+      }
+      _ <- consumer.unsubscribe
+    } yield {
+      val offsets = logOffsets.filter { case (_, o) => o.highest > o.lowest }
+      NonEmptyMap.fromMap(SortedMap.from(offsets))
+    }
 
   private def earliestOffsets[F[_] : Monad, K, V](
       consumer: KafkaConsumer[F, K, V],
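A hypothetical caller sketch for the new loadAndRun (topic name, bootstrap servers, group id and the println side effect are placeholders, not part of this commit): the stream first delivers everything currently on the topic, runs the onLoad callback when that pre-load stream finalises, then continues with live records.

import cats.data.NonEmptyList
import cats.effect.{IO, Ref, Resource}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import uk.sky.fs2.kafka.topicloader.TopicLoader

implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

// Placeholder consumer settings for the sketch
val settings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092")
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withGroupId("example-service")

def streamTopic(ready: Ref[IO, Boolean]): IO[Unit] =
  TopicLoader
    .loadAndRun(NonEmptyList.one("example-topic"), settings) {
      case Resource.ExitCase.Succeeded => ready.set(true) // initial load finished; live records follow
      case _                           => IO.unit         // the load stream errored or was cancelled
    }
    .evalMap(record => IO.println(s"${record.key} -> ${record.value}"))
    .compile
    .drain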
src/test/scala/base/AsyncIntSpec.scala

Lines changed: 23 additions & 1 deletion
@@ -1,8 +1,30 @@
 package base
 
+import cats.effect.Async
+import cats.effect.std.Dispatcher
 import cats.effect.testing.scalatest.AsyncIOSpec
+import cats.syntax.all.*
+import org.scalactic.source.Position
 import org.scalatest.OptionValues
+import org.scalatest.concurrent.Eventually
+import org.scalatest.enablers.Retrying
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.Span
 import org.scalatest.wordspec.AsyncWordSpec
 
-trait AsyncIntSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues
+import scala.concurrent.duration.*
+
+trait AsyncIntSpec[F[_]] extends AsyncWordSpec with AsyncIOSpec with Matchers with OptionValues with Eventually {
+  override implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 500.millis)
+
+  implicit def fRetrying[T](implicit F: Async[F]): Retrying[F[T]] = new Retrying[F[T]] {
+    override def retry(timeout: Span, interval: Span, pos: Position)(fun: => F[T]): F[T] =
+      Dispatcher.sequential[F].use { dispatcher =>
+        F.fromFuture(
+          F.executionContext.map(
+            Retrying.retryingNatureOfFutureT[T](_).retry(timeout, interval, pos)(dispatcher.unsafeToFuture(fun))
+          )
+        )
+      }
+  }
+}
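A sketch of what the new Retrying[F[T]] instance enables (the spec name and values are illustrative only): ScalaTest's eventually can now wrap an effectful assertion directly and re-run it under the 30-second / 500-millisecond patience configured above, which is what replaces the hand-rolled retry helper removed from KafkaHelpers.

import base.AsyncIntSpec
import cats.effect.{IO, Ref}

class RetryingExampleSpec extends AsyncIntSpec[IO] {
  "eventually" should {
    "re-run an IO[Assertion] until it passes" in {
      for {
        attempts  <- Ref.of[IO, Int](0)
        // eventually re-evaluates the whole effect on every retry, so the counter
        // keeps growing until the matcher finally succeeds
        assertion <- eventually(attempts.updateAndGet(_ + 1).asserting(_ should be >= 3))
      } yield assertion
    }
  }
}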

src/test/scala/base/KafkaSpecBase.scala

Lines changed: 1 addition & 1 deletion
@@ -2,4 +2,4 @@ package base
 
 import utils.{EmbeddedKafka, KafkaHelpers}
 
-abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec with KafkaHelpers[F] with EmbeddedKafka[F]
+abstract class KafkaSpecBase[F[_]] extends AsyncIntSpec[F] with KafkaHelpers[F] with EmbeddedKafka[F]

src/test/scala/base/UnitSpecBase.scala

Lines changed: 0 additions & 5 deletions
This file was deleted.

src/test/scala/base/WordSpecBase.scala

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/test/scala/integration/TopicLoaderIntSpec.scala

Lines changed: 27 additions & 4 deletions
@@ -2,7 +2,7 @@ package integration
 
 import base.KafkaSpecBase
 import cats.data.NonEmptyList
-import cats.effect.IO
+import cats.effect.{IO, Ref}
 import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
 import io.github.embeddedkafka.EmbeddedKafkaConfig
 import org.apache.kafka.common.errors.TimeoutException as KafkaTimeoutException
@@ -184,8 +184,31 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {
 
   "loadAndRun" should {
 
-    "execute callback when finished loading and keep streaming" in {
-      pending
+    "execute callback when finished loading and keep streaming" in withKafkaContext { ctx =>
+      import ctx.*
+
+      val (preLoad, postLoad) = records(1 to 15).splitAt(10)
+
+      for {
+        loadState  <- Ref.of[IO, Boolean](false)
+        topicState <- Ref.empty[IO, Seq[(String, String)]]
+        _          <- createCustomTopics(NonEmptyList.one(testTopic1))
+        _          <- publishStringMessages(testTopic1, preLoad)
+        assertion  <- loadAndRunR(NonEmptyList.one(testTopic1))(
+                        _ => loadState.set(true),
+                        r => topicState.getAndUpdate(_ :+ r).void
+                      ).surround {
+                        for {
+                          _ <- eventually(topicState.get.asserting(_ should contain theSameElementsAs preLoad))
+                          _ <- loadState.get.asserting(_ shouldBe true)
+                          _ <- publishStringMessages(testTopic1, postLoad)
+                          assertion <-
+                            eventually(
+                              topicState.get.asserting(_ should contain theSameElementsAs (preLoad ++ postLoad))
+                            )
+                        } yield assertion
+                      }
+      } yield assertion
     }
 
   }
@@ -198,6 +221,6 @@ class TopicLoaderIntSpec extends KafkaSpecBase[IO] {
   private def withKafkaContext(test: TestContext[IO] => IO[Assertion]): IO[Assertion] = {
     object testContext extends TestContext[IO]
     import testContext.*
-    embeddedKafka.use(_ => test(testContext))
+    embeddedKafka.surround(test(testContext))
   }
 }

src/test/scala/utils/KafkaHelpers.scala

Lines changed: 45 additions & 36 deletions
@@ -2,22 +2,25 @@ package utils
 
 import java.util.UUID
 
+import base.AsyncIntSpec
 import cats.data.{NonEmptyList, NonEmptySet}
+import cats.effect.implicits.*
+import cats.effect.kernel.Fiber
 import cats.effect.{Async, Resource}
 import cats.syntax.all.*
 import fs2.Stream
 import fs2.kafka.{AutoOffsetReset, ConsumerRecord, ConsumerSettings, KafkaConsumer}
 import io.github.embeddedkafka.EmbeddedKafkaConfig
 import org.apache.kafka.common.TopicPartition
-import org.scalatest.exceptions.TestFailedException
+import org.scalatest.Assertion
 import org.typelevel.log4cats.LoggerFactory
 import org.typelevel.log4cats.slf4j.Slf4jFactory
 import uk.sky.fs2.kafka.topicloader.{LoadTopicStrategy, TopicLoader}
 
 import scala.concurrent.duration.*
 
 trait KafkaHelpers[F[_]] {
-  self: EmbeddedKafka[F] =>
+  self: AsyncIntSpec[F] & EmbeddedKafka[F] =>
 
   val groupId = "test-consumer-group"
   val testTopic1 = "load-state-topic-1"
@@ -65,6 +68,29 @@
     TopicLoader.load(topics, strategy, consumerSettings).compile.toList.map(_.map(recordToTuple))
   }
 
+  def loadAndRunR(topics: NonEmptyList[String])(
+      onLoad: Resource.ExitCase => F[Unit],
+      onRecord: ((String, String)) => F[Unit]
+  )(implicit
+      consumerSettings: ConsumerSettings[F, String, String],
+      F: Async[F]
+  ): Resource[F, Fiber[F, Throwable, Unit]] = Resource.make {
+    loadAndRunLoader(topics)(onLoad)
+      .map(recordToTuple)
+      .evalTap(onRecord)
+      .compile
+      .drain
+      .start
+  }(_.cancel.void)
+
+  def loadAndRunLoader(topics: NonEmptyList[String])(onLoad: Resource.ExitCase => F[Unit])(implicit
+      consumerSettings: ConsumerSettings[F, String, String],
+      F: Async[F]
+  ): Stream[F, ConsumerRecord[String, String]] = {
+    implicit val loggerFactory: LoggerFactory[F] = Slf4jFactory.create[F]
+    TopicLoader.loadAndRun(topics, consumerSettings)(onLoad)
+  }
+
   def moveOffsetToEnd(
       partitions: NonEmptySet[TopicPartition]
   )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): Stream[F, KafkaConsumer[F, String, String]] =
@@ -92,43 +118,33 @@
 
   def waitForCompaction(
       partitions: NonEmptySet[TopicPartition]
-  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
+  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
     consumeEventually(partitions) { r =>
       for {
         records <- r
         messageKeys = records.map { case (k, _) => k }
-        result <-
-          if (messageKeys.sorted == messageKeys.toSet.toList.sorted) F.unit
-          else F.raiseError(new TestFailedException("Topic has not compacted within timeout", 1))
-      } yield result
+      } yield {
+        messageKeys should not be empty
+        messageKeys should contain theSameElementsAs messageKeys.toSet
+      }
     }
 
   def consumeEventually(
       partitions: NonEmptySet[TopicPartition],
       groupId: String = UUID.randomUUID().toString
   )(
-      f: F[List[(String, String)]] => F[Unit]
-  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Unit] =
-    retry(
-      fa = {
-        val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
-          autoCommit = false,
-          offsetReset = AutoOffsetReset.Earliest,
-          partitions,
-          groupId.some
-        )(
-          _.records
-            .map(_.record)
-            .interruptAfter(5.second)
-            .compile
-            .toList
-        )
-
-        f(records.map(_.map(r => r.key -> r.value)))
-      },
-      delay = 1.second,
-      max = 5
-    )
+      f: F[List[(String, String)]] => F[Assertion]
+  )(implicit kafkaConfig: EmbeddedKafkaConfig, F: Async[F]): F[Assertion] =
+    eventually {
+      val records = withAssignedConsumer[F[List[ConsumerRecord[String, String]]]](
+        autoCommit = false,
+        offsetReset = AutoOffsetReset.Earliest,
+        partitions,
+        groupId.some
+      )(_.records.map(_.record).interruptAfter(5.second).compile.toList)
+
+      f(records.map(_.map(r => r.key -> r.value)))
+    }
 
   def withAssignedConsumer[T](
       autoCommit: Boolean,
@@ -164,11 +180,4 @@
     val settings = groupId.fold(baseSettings)(baseSettings.withGroupId)
     KafkaConsumer[F].resource(settings)
   }
-
-  def retry[A](fa: F[A], delay: FiniteDuration, max: Int)(implicit F: Async[F]): F[A] =
-    if (max <= 1) fa
-    else
-      fa handleErrorWith { _ =>
-        F.sleep(delay) *> retry(fa, delay, max - 1)
-      }
 }
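With consumeEventually now returning F[Assertion], callers state the success condition with matchers and let eventually drive the retries, instead of raising a TestFailedException as the removed retry-based version did. A sketch of a caller (method name and expected values are illustrative, and the code assumes it lives inside a spec that mixes in KafkaHelpers[IO] via KafkaSpecBase[IO]):

def assertPublished(partitions: NonEmptySet[TopicPartition])(implicit kafkaConfig: EmbeddedKafkaConfig): IO[Assertion] =
  consumeEventually(partitions) { records =>
    // records is IO[List[(String, String)]]; asserting turns the matcher result into an IO[Assertion]
    records.asserting(_ should contain theSameElementsAs List("key-1" -> "value-1", "key-2" -> "value-2"))
  }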
