
Commit 940ddd7

Implement load (#3)
1 parent 7daf4dc commit 940ddd7

File tree

18 files changed: +844 -33 lines

README.md

Lines changed: 31 additions & 1 deletion

@@ -1,3 +1,33 @@
 # fs2-kafka-topic-loader

-Reads the contents of provided Kafka topics
+Reads the contents of provided Kafka topics, in one of two ways:
+
+- reads the topics in their entirety
+- reads up to the last consumer group's committed offset
+
+This is determined by the `LoadTopicStrategy`.
+
+Add the following to your `build.sbt`:
+
+```scala
+libraryDependencies += "uk.sky" %% "fs2-kafka-topic-loader" % "<version>"
+```
+
+```scala
+import cats.data.NonEmptyList
+import cats.effect.{IO, IOApp}
+import fs2.kafka.ConsumerSettings
+
+object Main extends IOApp.Simple {
+  val consumerSettings: ConsumerSettings[IO, String, String] = ???
+
+  override def run: IO[Unit] =
+    TopicLoader.load(NonEmptyList.one("topicToLoad"), LoadAll, consumerSettings).evalTap(IO.println).compile.drain
+}
+```
+
+See [LoadExample.scala](./it/src/main/scala/load/LoadExample.scala) for a more detailed example.
+
+## Configuration
+
+Configuration for the Topic Loader is done via the `ConsumerSettings`. The group id of the Topic Loader should match
+the group id of your application.
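
To make the configuration guidance concrete, here is a minimal sketch of the `LoadCommitted` variant of the README example. It is not part of this commit: the bootstrap address, topic, and group id are illustrative, and a `LoggerFactory` is supplied on the assumption that `TopicLoader` wants one in implicit scope (as the context bounds of `LoadExample.kafka` below suggest).

```scala
import cats.data.NonEmptyList
import cats.effect.{IO, IOApp}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import uk.sky.fs2.kafka.topicloader.{LoadCommitted, TopicLoader}

object LoadCommittedMain extends IOApp.Simple {

  // Assumption: TopicLoader logs via log4cats, so a LoggerFactory is kept in
  // implicit scope (mirroring the context bounds on LoadExample.kafka).
  implicit val loggerFactory: LoggerFactory[IO] = Slf4jFactory.create[IO]

  // The group id must match the one your application commits offsets under:
  // LoadCommitted reads each topic only up to that group's committed offset.
  val consumerSettings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")  // illustrative address
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withGroupId("my-application-group")     // same group id as the application

  override def run: IO[Unit] =
    TopicLoader
      .load(NonEmptyList.one("topicToLoad"), LoadCommitted, consumerSettings)
      .evalTap(record => IO.println(record.value))
      .compile
      .drain
}
```
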
aliases.sbt

Lines changed: 1 addition & 1 deletion

@@ -3,4 +3,4 @@ addCommandAlias("runFix", "scalafixAll OrganizeImports; scalafixAll")
 addCommandAlias("checkFmt", "scalafmtCheckAll; scalafmtSbtCheck")
 addCommandAlias("runFmt", "scalafmtAll; scalafmtSbt")

-addCommandAlias("ciBuild", "checkFmt; checkFix; +test")
+addCommandAlias("ciBuild", "project root; checkFmt; checkFix; +test; project it; checkFmt; checkFix; test")

build.sbt

Lines changed: 43 additions & 6 deletions

@@ -9,21 +9,50 @@ ThisBuild / organization := "uk.sky"
 ThisBuild / description := "Read the contents of provided Kafka topics"
 ThisBuild / licenses := List("BSD New" -> url("https://opensource.org/licenses/BSD-3-Clause"))

-ThisBuild / scalaVersion := scala213 // TODO - for development to get unused warnings
-ThisBuild / crossScalaVersions := supportedScalaVersions
-ThisBuild / semanticdbEnabled := true
-ThisBuild / semanticdbVersion := scalafixSemanticdb.revision
+ThisBuild / semanticdbEnabled := true
+ThisBuild / semanticdbVersion := scalafixSemanticdb.revision

 ThisBuild / scalafixDependencies += Dependencies.Plugins.organizeImports

 tpolecatScalacOptions ++= Set(ScalacOptions.source3)

 lazy val root = (project in file("."))
   .settings(
-    name := "fs2-kafka-topic-loader",
-    libraryDependencies ++= Seq(scalaTest)
+    name               := "fs2-kafka-topic-loader",
+    scalaVersion       := scala213,
+    crossScalaVersions := supportedScalaVersions,
+    libraryDependencies ++= Seq(
+      Cats.core,
+      Cats.effect,
+      Cats.log4cats,
+      Cats.log4catsSlf4j,
+      Fs2.core,
+      Fs2.kafka,
+      embeddedKafka,
+      scalaTest,
+      catsEffectTesting,
+      logbackClassic
+    )
   )

+lazy val it = (project in file("it"))
+  .settings(
+    name         := "integration-test",
+    scalaVersion := scala213,
+    publish      := false,
+    libraryDependencies ++= Seq(
+      Cats.core,
+      Cats.effect,
+      Fs2.core,
+      Fs2.kafka,
+      embeddedKafka,
+      scalaTest,
+      catsEffectTesting,
+      logbackClassic
+    )
+  )
+  .dependsOn(root % "test->test;compile->compile")
+
 /** Scala 3 doesn't support two rules yet - RemoveUnused and ProcedureSyntax. So we require a different scalafix config
   * for Scala 3
   *
@@ -39,7 +68,15 @@ ThisBuild / scalafixConfig := {
   }
 }

+ThisBuild / excludeDependencies ++= {
+  CrossVersion.partialVersion(scalaVersion.value) match {
+    case Some((3, _)) => Dependencies.scala3Exclusions
+    case _            => Seq.empty
+  }
+}
+
 Test / parallelExecution := false
 Test / fork := true

 Global / onChangedBuildSource := ReloadOnSourceChanges
+Global / scalafmtOnCompile := true
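
The `excludeDependencies` block above applies `Dependencies.scala3Exclusions` only when the (major, minor) pair extracted from the current Scala version starts with 3. A small sketch of how `CrossVersion.partialVersion` drives that match (version strings are illustrative):

```scala
import sbt.CrossVersion

// partialVersion extracts the (major, minor) pair from a version string.
val onScala3   = CrossVersion.partialVersion("3.2.2")   // Some((3, 2))  -> exclusions applied
val onScala213 = CrossVersion.partialVersion("2.13.10") // Some((2, 13)) -> Seq.empty
```

Presumably this keeps the `scala-logging_2.13` artifact (pulled in transitively by embedded-kafka via `for3Use2_13`, see project/Dependencies.scala below) off the Scala 3 classpath.
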

it/build.sbt

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+tpolecatScalacOptions ++= Set(ScalacOptions.source3)
+
+addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full)
it/src/main/scala/load/LoadExample.scala

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+package load
+
+import cats.Traverse
+import cats.data.NonEmptyList
+import cats.effect.Ref
+import cats.effect.kernel.Async
+import cats.syntax.all.*
+import fs2.kafka.*
+import fs2.{Pipe, Stream}
+import org.typelevel.log4cats.LoggerFactory
+import uk.sky.fs2.kafka.topicloader.{LoadCommitted, TopicLoader}
+
+import scala.concurrent.duration.*
+
+/** Simple application that streams Kafka records into a memory store, and produces the same records back to Kafka.
+  * On startup, it will read all of the currently committed records back into this store, but not produce them to
+  * Kafka.
+  */
+class LoadExample[F[_] : Async, G[_] : Traverse](
+    load: Stream[F, String],
+    run: Stream[F, G[String]],
+    publishAndCommit: Pipe[F, G[String], Nothing],
+    store: Ref[F, List[String]]
+) {
+  private def process(message: String): F[Unit] = store.update(_ :+ message)
+
+  val stream: Stream[F, G[String]] =
+    (load.evalTap(process).drain ++ run.evalTap(_.traverse(process))).observe(publishAndCommit)
+}
+
+object LoadExample {
+  def kafka[F[_] : Async : LoggerFactory](
+      topics: NonEmptyList[String],
+      outputTopic: String,
+      consumerSettings: ConsumerSettings[F, String, String],
+      producerSettings: ProducerSettings[F, String, String],
+      store: Ref[F, List[String]]
+  ): LoadExample[F, CommittableConsumerRecord[F, String, *]] = {
+    val loadStream = TopicLoader.load[F, String, String](topics, LoadCommitted, consumerSettings).map(_.value)
+
+    val runStream = KafkaConsumer.stream(consumerSettings).subscribe(topics).records
+
+    val publishAndCommit: Pipe[F, CommittableConsumerRecord[F, String, String], Nothing] =
+      _.map(message =>
+        message.offset -> ProducerRecords.one(
+          ProducerRecord(topic = outputTopic, key = message.record.key, value = message.record.value)
+        )
+      ).through { offsetsAndProducerRecords =>
+        KafkaProducer.stream(producerSettings).flatMap { producer =>
+          offsetsAndProducerRecords.evalMap { case (offset, producerRecord) =>
+            producer.produce(producerRecord).flatMap(_.as(offset))
+          }
+        }
+      }.through(commitBatchWithin[F](1, 5.seconds)).drain
+
+    new LoadExample(
+      load = loadStream,
+      run = runStream,
+      publishAndCommit = publishAndCommit,
+      store = store
+    )
+  }
+}
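
Because `LoadExample` abstracts over its input streams, the load-then-run semantics described in its doc comment can be exercised without Kafka. A minimal sketch (not part of the commit; the object name and stub streams are illustrative), using `cats.Id` for `G` so each record is a plain `String`:

```scala
import cats.Id
import cats.effect.{IO, IOApp, Ref}
import fs2.{Pipe, Stream}
import load.LoadExample

object LoadExampleDemo extends IOApp.Simple {

  // Capture "published" records in a Ref instead of producing to Kafka.
  private def capturePipe(published: Ref[IO, List[String]]): Pipe[IO, String, Nothing] =
    _.evalMap(record => published.update(_ :+ record)).drain

  override def run: IO[Unit] =
    for {
      store     <- Ref[IO].of(List.empty[String])
      published <- Ref[IO].of(List.empty[String])
      example    = new LoadExample[IO, Id](
                     load = Stream("replayed-1", "replayed-2"), // stands in for TopicLoader.load
                     run = Stream("live-1"),                    // stands in for the live consumer
                     publishAndCommit = capturePipe(published),
                     store = store
                   )
      _      <- example.stream.compile.drain
      stored <- store.get     // List(replayed-1, replayed-2, live-1)
      sent   <- published.get // List(live-1): loaded records are stored, not re-published
      _      <- IO.println(s"stored=$stored, published=$sent")
    } yield ()
}
```

The loaded records land only in the store because `stream` drains the load branch before `observe` ever sees it, which is exactly the "don't re-publish on startup" behaviour the integration spec below asserts.
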
LoadExampleIntSpec.scala

Lines changed: 88 additions & 0 deletions

@@ -0,0 +1,88 @@
+package integration
+
+import base.KafkaSpecBase
+import cats.data.NonEmptyList
+import cats.effect.{Async, IO, Ref}
+import cats.syntax.all.*
+import fs2.kafka.*
+import io.github.embeddedkafka.EmbeddedKafkaConfig
+import load.LoadExample
+import org.scalatest.Assertion
+import org.typelevel.log4cats.LoggerFactory
+import org.typelevel.log4cats.slf4j.Slf4jFactory
+import utils.RandomPort
+
+import scala.concurrent.duration.*
+
+class LoadExampleIntSpec extends KafkaSpecBase[IO] {
+  val inputTopic      = "test-topic-1"
+  val outputTopic     = "output-topic-1"
+  private val timeout = 10.seconds
+
+  "LoadExample" should {
+    "load previously seen messages into the store" in withKafkaContext { ctx =>
+      import ctx.*
+
+      for {
+        _      <- publishStringMessage(inputTopic, "key1", "value1")
+        _      <- runAppAndDiscard
+        _      <- publishStringMessage(inputTopic, "key2", "value2")
+        result <- runApp
+      } yield result should contain theSameElementsInOrderAs List("value1", "value2")
+    }
+
+    "not publish previously committed messages" in withKafkaContext { ctx =>
+      import ctx.*
+
+      for {
+        _      <- publishStringMessage(inputTopic, "key1", "value1")
+        _      <- runAppAndDiscard
+        _      <- consumeStringMessage(outputTopic, autoCommit = true)
+        _      <- publishStringMessage(inputTopic, "key2", "value2")
+        _      <- runAppAndDiscard
+        result <- consumeStringMessage(outputTopic, autoCommit = true)
+      } yield result shouldBe "value2"
+    }
+  }
+
+  private abstract class TestContext[F[_] : Async] {
+    private val store: F[Ref[F, List[String]]] = Ref[F].of(List.empty)
+
+    private implicit val loggerFactory: LoggerFactory[F] = Slf4jFactory.create[F]
+
+    implicit val kafkaConfig: EmbeddedKafkaConfig =
+      EmbeddedKafkaConfig(kafkaPort = RandomPort(), zooKeeperPort = RandomPort(), Map("log.roll.ms" -> "10"))
+
+    private val consumerSettings: ConsumerSettings[F, String, String] =
+      ConsumerSettings[F, String, String]
+        .withBootstrapServers(s"localhost:${kafkaConfig.kafkaPort}")
+        .withAutoOffsetReset(AutoOffsetReset.Earliest)
+        .withGroupId("load-example-consumer-group")
+
+    private val producerSettings: ProducerSettings[F, String, String] =
+      ProducerSettings[F, String, String]
+        .withBootstrapServers(s"localhost:${kafkaConfig.kafkaPort}")
+
+    val runApp: F[List[String]] =
+      for {
+        store   <- store
+        example1 = LoadExample.kafka[F](
+                     topics = NonEmptyList.one(inputTopic),
+                     outputTopic = outputTopic,
+                     consumerSettings = consumerSettings,
+                     producerSettings = producerSettings,
+                     store = store
+                   )
+        stored  <- example1.stream.interruptAfter(timeout).compile.drain *> store.get
+      } yield stored
+
+    val runAppAndDiscard: F[Unit] = runApp.void
+  }
+
+  private def withKafkaContext(test: TestContext[IO] => IO[Assertion]): IO[Assertion] = {
+    object testContext extends TestContext[IO]
+    import testContext.*
+    embeddedKafka.use(_ => test(testContext))
+  }
+}
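
`base.KafkaSpecBase` and `utils.RandomPort` are among the 18 changed files but are not shown in this view. As a rough guide, a hypothetical sketch of what a `RandomPort` helper like the one referenced above usually looks like (the committed implementation may differ):

```scala
package utils

import java.net.ServerSocket
import scala.util.Using

// Hypothetical sketch: bind to port 0 so the OS assigns a free ephemeral
// port, read it back, then close the socket so the caller can use the port.
object RandomPort {
  def apply(): Int = Using.resource(new ServerSocket(0))(_.getLocalPort)
}
```

There is a small race between closing the socket and Kafka binding the port, which is normally acceptable in tests.
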

project/Dependencies.scala

Lines changed: 22 additions & 1 deletion

@@ -1,9 +1,30 @@
 import sbt.*

 object Dependencies {
+
   object Plugins {
     lazy val organizeImports = "com.github.liancheng" %% "organize-imports" % "0.6.0"
   }

-  lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.2.15" % Test
+  object Cats {
+    lazy val core          = "org.typelevel" %% "cats-core"      % "2.9.0"
+    lazy val effect        = "org.typelevel" %% "cats-effect"    % "3.4.10"
+    lazy val log4cats      = "org.typelevel" %% "log4cats-core"  % "2.6.0"
+    lazy val log4catsSlf4j = "org.typelevel" %% "log4cats-slf4j" % "2.6.0"
+  }
+
+  object Fs2 {
+    lazy val core  = "co.fs2"          %% "fs2-core"  % "3.6.1"
+    lazy val kafka = "com.github.fd4s" %% "fs2-kafka" % "3.0.1"
+  }
+
+  lazy val embeddedKafka     = "io.github.embeddedkafka" %% "embedded-kafka" % "3.4.0" % Test cross CrossVersion.for3Use2_13
+  lazy val scalaTest         = "org.scalatest" %% "scalatest" % "3.2.15" % Test
+  lazy val catsEffectTesting = "org.typelevel" %% "cats-effect-testing-scalatest" % "1.5.0" % Test
+
+  lazy val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.4.6" % Runtime
+
+  val scala3Exclusions = Seq(
+    "com.typesafe.scala-logging" % "scala-logging_2.13"
+  )
 }

project/plugins.sbt

Lines changed: 2 additions & 2 deletions

@@ -1,3 +1,3 @@
-addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
-addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.10.4")
+addSbtPlugin("org.scalameta"             % "sbt-scalafmt" % "2.5.0")
+addSbtPlugin("ch.epfl.scala"             % "sbt-scalafix" % "0.10.4")
 addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.1")
