Skip to content

Commit ee38b6c

Browse files
Integration test
1 parent d000944 commit ee38b6c

File tree

3 files changed

+123
-2
lines changed

3 files changed

+123
-2
lines changed

build.sbt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,13 @@ lazy val core = (project in file("core"))
5252
libraryDependencies ++= List(
5353
"org.apache.kafka" % "kafka-clients" % "1.0.0",
5454
"com.typesafe.akka" %% "akka-actor" % "2.5.6",
55+
"com.typesafe.akka" %% "akka-stream" % "2.5.6",
5556
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.2",
56-
"org.scalatest" %% "scalatest" % "3.0.4" % "test"
57-
)
57+
"org.scalatest" %% "scalatest" % "3.0.4" % "test",
58+
"com.typesafe.akka" %% "akka-testkit" % "2.5.6" % "test",
59+
"com.typesafe.akka" %% "akka-stream-kafka" % "0.17" % "test",
60+
"net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % "test" exclude ("javax.jms", "jms")
61+
)
5862
)
5963

6064
lazy val exampleJava = (project in file("example-java"))
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.softwaremill.kmq.redelivery
2+
3+
import java.time.Duration
4+
import java.util.Random
5+
6+
import akka.actor.ActorSystem
7+
import akka.kafka.scaladsl.{Consumer, Producer}
8+
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
9+
import akka.stream.ActorMaterializer
10+
import akka.testkit.TestKit
11+
import com.softwaremill.kmq._
12+
import com.softwaremill.kmq.redelivery.infrastructure.KafkaSpec
13+
import org.apache.kafka.clients.consumer.ConsumerConfig
14+
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
15+
import org.apache.kafka.common.serialization.StringDeserializer
16+
import org.scalatest.concurrent.Eventually
17+
import org.scalatest.time.{Seconds, Span}
18+
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
19+
20+
import scala.collection.mutable.ArrayBuffer
21+
22+
/**
  * End-to-end test of KMQ redelivery: messages are consumed from the queue topic,
  * "start" markers are written, a random ~1/5 of messages are deliberately left
  * uncommitted (no "end" marker), and the test asserts that the redelivery tracker
  * re-delivers exactly those messages.
  */
class IntegrationTest extends TestKit(ActorSystem("test-system")) with FlatSpecLike with KafkaSpec with BeforeAndAfterAll with Eventually with Matchers {

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  "KMQ" should "resend message if not committed" in {
    val bootstrapServer = s"localhost:${testKafkaConfig.kafkaPort}"
    // 1 second message timeout so redelivery kicks in quickly within the test.
    val kmqConfig = new KmqConfig("queue", "markers", "kmq_client", "kmq_redelivery",
      Duration.ofSeconds(1).toMillis, 1000)

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServer)
      .withGroupId(kmqConfig.getMsgConsumerGroupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val markerProducerSettings = ProducerSettings(system,
      new MarkerKey.MarkerKeySerializer(), new MarkerValue.MarkerValueSerializer())
      .withBootstrapServers(bootstrapServer)
      // NOTE(review): "Paritition" is the spelling of the project class — do not "fix" here.
      .withProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[ParititionFromMarkerKey].getName)
    val markerProducer = markerProducerSettings.createKafkaProducer()

    val random = new Random()

    // All messages ever received from the queue topic (including redeliveries).
    lazy val receivedMessages = ArrayBuffer[String]()
    // Messages for which an "end" marker was written (i.e. fully processed).
    lazy val processedMessages = ArrayBuffer[String]()

    val control = Consumer.committableSource(consumerSettings, Subscriptions.topics(kmqConfig.getMsgTopic)) // 1. get messages from topic
      .map { msg =>
        ProducerMessage.Message(
          new ProducerRecord[MarkerKey, MarkerValue](kmqConfig.getMarkerTopic, MarkerKey.fromRecord(msg.record), new StartMarker(kmqConfig.getMsgTimeoutMs)), msg)
      }
      .via(Producer.flow(markerProducerSettings, markerProducer)) // 2. write the "start" marker
      .map(_.message.passThrough)
      .mapAsync(1) { msg =>
        msg.committableOffset.commitScaladsl().map(_ => msg.record) // this should be batched
      }
      .map { msg =>
        receivedMessages += msg.value
        msg
      }
      // 3./4. simulate a failing consumer: ~1 in 5 messages never gets an "end" marker
      .filter(_ => random.nextInt(5) != 0)
      .map { processedMessage =>
        processedMessages += processedMessage.value
        new ProducerRecord[MarkerKey, MarkerValue](kmqConfig.getMarkerTopic, MarkerKey.fromRecord(processedMessage), EndMarker.INSTANCE)
      }
      .to(Producer.plainSink(markerProducerSettings, markerProducer)) // 5. write "end" markers
      .run()

    val redeliveryHook = RedeliveryTracker.start(new KafkaClients(bootstrapServer), kmqConfig)

    val messages = (0 to 20).map(_.toString)
    messages.foreach(msg => sendToKafka(kmqConfig.getMsgTopic, msg))

    // Ensure resources are released even when the eventually-assertion fails,
    // otherwise the stream, the tracker and the producer leak across tests.
    try {
      eventually {
        // Redeliveries mean we receive strictly more than we fully process...
        receivedMessages.size should be > processedMessages.size
        // ...but every message is eventually processed exactly once (after dedup).
        processedMessages.sortBy(_.toInt).distinct shouldBe messages
      }(PatienceConfig(timeout = Span(15, Seconds)), implicitly)
    } finally {
      redeliveryHook.close()
      control.shutdown()
      markerProducer.close() // fix: the marker producer was previously leaked
    }
  }

  override def afterAll(): Unit = {
    super.afterAll()
    TestKit.shutdownActorSystem(system)
  }
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.softwaremill.kmq.redelivery.infrastructure
2+
3+
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
4+
import org.apache.kafka.common.serialization.StringDeserializer
5+
import org.scalatest.{BeforeAndAfterEach, Suite}
6+
7+
/**
  * Test mixin that runs an embedded Kafka broker (plus ZooKeeper) around each test,
  * and offers simple string-based publish/consume helpers against it.
  */
trait KafkaSpec extends BeforeAndAfterEach { self: Suite =>

  // Embedded broker on port 9092, ZooKeeper on 2182.
  val testKafkaConfig = EmbeddedKafkaConfig(9092, 2182)

  // Needed implicitly by the embedded-kafka consume helpers.
  private implicit val stringDeserializer = new StringDeserializer()

  /** Publishes a single string message to `topic` on the embedded broker. */
  def sendToKafka(topic: String, message: String): Unit =
    EmbeddedKafka.publishStringMessageToKafka(topic, message)(testKafkaConfig)

  /** Reads and returns the first string message available on `topic`. */
  def consumeFromKafka(topic: String): String =
    EmbeddedKafka.consumeFirstStringMessageFrom(topic)(testKafkaConfig)

  override def beforeEach(): Unit = {
    super.beforeEach()
    EmbeddedKafka.start()(testKafkaConfig)
  }

  override def afterEach(): Unit = {
    super.afterEach()
    EmbeddedKafka.stop()
  }
}

0 commit comments

Comments
 (0)