Skip to content

Commit f54663c

Browse files
committed
feat(tests): add TestingEmbeddedZookeeper/TestingEmbeddedKafka utility classes
This commits add a new maven module tests that packs with utility classes for running an embedded single-node Kafka cluster. In addition it adds some unit-tests for KafkaProducerContainer
1 parent 57e113f commit f54663c

File tree

10 files changed

+646
-6
lines changed

10 files changed

+646
-6
lines changed

clients/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

12+
<name>Kafka Clients for Kotlin</name>
1213
<artifactId>kafka-clients-kotlin</artifactId>
1314

1415
<dependencies>
@@ -20,6 +21,10 @@
2021
<groupId>org.jetbrains.kotlin</groupId>
2122
<artifactId>kotlin-stdlib</artifactId>
2223
</dependency>
24+
<dependency>
25+
<groupId>org.jetbrains.kotlin</groupId>
26+
<artifactId>kotlin-stdlib-jdk8</artifactId>
27+
</dependency>
2328
<dependency>
2429
<groupId>org.jetbrains.kotlinx</groupId>
2530
<artifactId>kotlinx-coroutines-core</artifactId>
@@ -29,6 +34,19 @@
2934
<artifactId>kotlin-test-junit</artifactId>
3035
<scope>test</scope>
3136
</dependency>
37+
<dependency>
38+
<groupId>io.streamthoughts</groupId>
39+
<artifactId>kafka-clients-kotlin-tests</artifactId>
40+
<version>${project.version}</version>
41+
<scope>test</scope>
42+
</dependency>
43+
<dependency>
44+
<groupId>io.streamthoughts</groupId>
45+
<artifactId>kafka-clients-kotlin-tests</artifactId>
46+
<version>${project.version}</version>
47+
<type>test-jar</type>
48+
<scope>test</scope>
49+
</dependency>
3250
<dependency>
3351
<groupId>org.junit.platform</groupId>
3452
<artifactId>junit-platform-launcher</artifactId>

clients/src/main/kotlin/io/streamthoughts/kafka/clients/producer/KafkaProducerContainer.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ import java.util.concurrent.atomic.AtomicInteger
4848
*/
4949
class KafkaProducerContainer<K, V> private constructor(
5050
private val configs: KafkaProducerConfigs,
51+
private val onSendCallback: ProducerSendCallback<K, V>,
5152
private val keySerializer: Serializer<K> ?= null,
5253
private val valueSerializer: Serializer<V> ?= null,
5354
private val producerFactory: ProducerFactory? = null,
54-
private val defaultTopic: String? = null,
55-
private val onSendCallback: ProducerSendCallback<K, V>
55+
private val defaultTopic: String? = null
5656
): ProducerContainer<K, V> {
5757

5858
companion object {
@@ -272,11 +272,11 @@ class KafkaProducerContainer<K, V> private constructor(
272272

273273
fun build(): ProducerContainer<K, V> = KafkaProducerContainer(
274274
configs,
275+
onSendCallback ?: DelegateSendCallback(onSendSuccess, onSendError),
275276
keySerializer,
276277
valueSerializer,
277278
producerFactory,
278-
defaultTopic,
279-
onSendCallback ?: DelegateSendCallback(onSendSuccess, onSendError)
279+
defaultTopic
280280
)
281281
}
282282

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2020 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.clients.producer
20+
21+
import io.streamthoughts.kafka.clients.Kafka
22+
import io.streamthoughts.kafka.clients.KafkaClientConfigs
23+
import io.streamthoughts.kafka.tests.junit.EmbeddedSingleNodeKafkaCluster
24+
import io.streamthoughts.kafka.tests.TestingEmbeddedKafka
25+
import org.apache.kafka.clients.consumer.ConsumerConfig
26+
import org.apache.kafka.clients.consumer.ConsumerRecord
27+
import org.apache.kafka.common.serialization.StringDeserializer
28+
import org.apache.kafka.common.serialization.StringSerializer
29+
import org.junit.jupiter.api.AfterEach
30+
import org.junit.jupiter.api.Assertions
31+
import org.junit.jupiter.api.BeforeAll
32+
import org.junit.jupiter.api.BeforeEach
33+
import org.junit.jupiter.api.Test
34+
import org.junit.jupiter.api.TestInstance
35+
import org.junit.jupiter.api.extension.ExtendWith
36+
import java.time.Duration
37+
import java.util.Properties
38+
39+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
40+
@ExtendWith(EmbeddedSingleNodeKafkaCluster::class)
41+
class KafkaProducerContainerTest(private val cluster: TestingEmbeddedKafka) {
42+
43+
companion object {
44+
const val DEFAULT_TOPIC = "default-topic"
45+
const val TEST_TOPIC = "test-topic"
46+
}
47+
private lateinit var kafka : Kafka
48+
49+
private lateinit var configs: KafkaProducerConfigs
50+
51+
private lateinit var container : ProducerContainer<String, String>
52+
53+
@BeforeAll
54+
fun setUp() {
55+
kafka = Kafka(cluster.bootstrapServers().split(",").toTypedArray())
56+
configs = KafkaProducerConfigs(KafkaClientConfigs(kafka))
57+
58+
createAndInitContainer()
59+
}
60+
61+
@BeforeEach
62+
fun createTopics() {
63+
cluster.createTopic(DEFAULT_TOPIC)
64+
cluster.createTopic(TEST_TOPIC)
65+
}
66+
67+
@AfterEach
68+
fun dropTopics() {
69+
cluster.deleteTopics(listOf(DEFAULT_TOPIC, TEST_TOPIC))
70+
}
71+
72+
private fun createAndInitContainer() {
73+
container = KafkaProducerContainer.Builder<String, String>(configs)
74+
.keySerializer(StringSerializer())
75+
.valueSerializer(StringSerializer())
76+
.defaultTopic(DEFAULT_TOPIC)
77+
.build()
78+
container.init()
79+
}
80+
81+
@Test
82+
fun should_produce_record_to_specific_topic_given_single_value() {
83+
container.send(value = "test-value", topic = TEST_TOPIC)
84+
container.flush()
85+
val records = consumeRecords(TEST_TOPIC)
86+
Assertions.assertEquals("test-value", records[0].value())
87+
}
88+
89+
@Test
90+
fun should_produce_record_to_default_topic_given_single_value() {
91+
container.send(value = "test-value")
92+
container.flush()
93+
val records = consumeRecords(DEFAULT_TOPIC)
94+
Assertions.assertEquals("test-value", records[0].value())
95+
}
96+
97+
@Test
98+
fun should_produce_record_to_specific_topic_given_key_value() {
99+
container.send(key = "test-key", value = "test-value", topic = TEST_TOPIC)
100+
container.flush()
101+
val records = consumeRecords(TEST_TOPIC)
102+
Assertions.assertEquals("test-value", records[0].value())
103+
Assertions.assertEquals("test-key", records[0].key())
104+
}
105+
106+
107+
private fun consumeRecords(topic: String,
108+
timeout: Duration = Duration.ofMinutes(1),
109+
expectedNumRecords: Int = 1): List<ConsumerRecord<String, String>> {
110+
val configs = Properties()
111+
configs.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
112+
val records = cluster.consumeUntilMinRecordsOrTimeout(
113+
topic = topic,
114+
timeout = Duration.ofMinutes(1),
115+
expectedNumRecords = expectedNumRecords,
116+
keyDeserializer = StringDeserializer(),
117+
valueDeserializer = StringDeserializer()
118+
)
119+
Assertions.assertTrue(
120+
records.size >= expectedNumRecords,
121+
"Did not receive all $expectedNumRecords records from topic $topic within ${timeout.toMillis()} ms"
122+
)
123+
return records
124+
125+
}
126+
}
File renamed without changes.

examples/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

12-
<artifactId>kafka-client-kotlin-example</artifactId>
12+
<name>Kafka Clients for Kotlin Examples</name>
13+
<artifactId>kafka-clients-kotlin-example</artifactId>
1314

1415
<dependencies>
1516
<dependency>

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
<artifactId>kafka-clients-kotlin-reactor</artifactId>
88
<version>0.2.0</version>
99
<modules>
10+
<module>tests</module>
1011
<module>clients</module>
1112
<module>examples</module>
1213
</modules>
1314
<packaging>pom</packaging>
1415

15-
<name>Kafka Clients for Kotlin</name>
16+
<name>Kafka Clients for Kotlin Reactor</name>
1617
<description>Kafka Clients for Kotlin</description>
1718

1819
<developers>
@@ -180,6 +181,11 @@
180181
<artifactId>kotlin-stdlib</artifactId>
181182
<version>${kotlin.version}</version>
182183
</dependency>
184+
<dependency>
185+
<groupId>org.jetbrains.kotlin</groupId>
186+
<artifactId>kotlin-stdlib-jdk8</artifactId>
187+
<version>${kotlin.version}</version>
188+
</dependency>
183189
<dependency>
184190
<groupId>org.jetbrains.kotlinx</groupId>
185191
<artifactId>kotlinx-coroutines-core</artifactId>

tests/pom.xml

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>kafka-clients-kotlin-reactor</artifactId>
7+
<groupId>io.streamthoughts</groupId>
8+
<version>0.2.0</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>kafka-clients-kotlin-tests</artifactId>
13+
14+
<name>Kafka Clients for Kotlin Tests</name>
15+
<description>Utility classes for testing Kafka Clients in Kotlin</description>
16+
17+
<properties>
18+
<curator.version>5.1.0</curator.version>
19+
</properties>
20+
21+
<dependencies>
22+
<!-- START Kotlin dependencies -->
23+
<dependency>
24+
<groupId>org.jetbrains.kotlin</groupId>
25+
<artifactId>kotlin-stdlib</artifactId>
26+
</dependency>
27+
<dependency>
28+
<groupId>org.jetbrains.kotlin</groupId>
29+
<artifactId>kotlin-stdlib-jdk8</artifactId>
30+
</dependency>
31+
<dependency>
32+
<groupId>org.jetbrains.kotlin</groupId>
33+
<artifactId>kotlin-test-junit</artifactId>
34+
<scope>test</scope>
35+
</dependency>
36+
<!-- END Kotlin dependencies -->
37+
38+
<!-- START Apache Kafka dependencies -->
39+
<dependency>
40+
<groupId>org.apache.kafka</groupId>
41+
<artifactId>kafka_2.13</artifactId>
42+
<version>${kafka.version}</version>
43+
</dependency>
44+
<dependency>
45+
<groupId>org.apache.kafka</groupId>
46+
<artifactId>kafka_2.13</artifactId>
47+
<classifier>test</classifier>
48+
<version>${kafka.version}</version>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.apache.kafka</groupId>
52+
<artifactId>kafka-clients</artifactId>
53+
<classifier>test</classifier>
54+
<version>${kafka.version}</version>
55+
</dependency>
56+
<!-- END Apache Kafka dependencies -->
57+
58+
<dependency>
59+
<groupId>org.apache.curator</groupId>
60+
<artifactId>curator-test</artifactId>
61+
<version>${curator.version}</version>
62+
</dependency>
63+
64+
<!-- START JUnit dependencies -->
65+
<dependency>
66+
<groupId>org.junit.platform</groupId>
67+
<artifactId>junit-platform-launcher</artifactId>
68+
<scope>test</scope>
69+
</dependency>
70+
<dependency>
71+
<groupId>org.junit.jupiter</groupId>
72+
<artifactId>junit-jupiter-engine</artifactId>
73+
<scope>test</scope>
74+
</dependency>
75+
<!-- END JUnit dependencies -->
76+
</dependencies>
77+
78+
<build>
79+
<plugins>
80+
<plugin>
81+
<groupId>org.apache.maven.plugins</groupId>
82+
<artifactId>maven-jar-plugin</artifactId>
83+
<version>3.2.0</version>
84+
<executions>
85+
<execution>
86+
<goals>
87+
<goal>test-jar</goal>
88+
</goals>
89+
</execution>
90+
</executions>
91+
</plugin>
92+
</plugins>
93+
</build>
94+
</project>

0 commit comments

Comments
 (0)