Skip to content

Commit cc28c79

Browse files
authored
Add methods for simplified testing of Kafka endpoints (#270)
1 parent 2271bc7 commit cc28c79

File tree

18 files changed

+691
-550
lines changed

18 files changed

+691
-550
lines changed

streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,22 @@
2424

2525
package com.bakdata.kafka;
2626

27-
import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
28-
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
29-
import static java.util.Collections.emptyMap;
27+
import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT;
28+
import static com.bakdata.kafka.KafkaTest.newCluster;
3029
import static org.assertj.core.api.Assertions.assertThat;
3130

32-
import com.bakdata.kafka.util.ImprovedAdminClient;
31+
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
3332
import com.ginsberg.junit.exit.ExpectSystemExitWithStatus;
3433
import java.time.Duration;
3534
import java.util.List;
3635
import java.util.regex.Pattern;
36+
import org.apache.kafka.clients.consumer.ConsumerConfig;
3737
import org.apache.kafka.clients.consumer.ConsumerRecord;
38+
import org.apache.kafka.clients.producer.ProducerConfig;
3839
import org.apache.kafka.common.serialization.Serdes;
3940
import org.apache.kafka.common.serialization.Serdes.StringSerde;
40-
import org.apache.kafka.streams.KeyValue;
41+
import org.apache.kafka.common.serialization.StringDeserializer;
42+
import org.apache.kafka.common.serialization.StringSerializer;
4143
import org.apache.kafka.streams.kstream.Consumed;
4244
import org.junit.jupiter.api.Test;
4345
import org.testcontainers.kafka.KafkaContainer;
@@ -214,7 +216,7 @@ public SerdeConfig defaultSerializationConfig() {
214216
@ExpectSystemExitWithStatus(1)
215217
void shouldExitWithErrorInTopology() throws InterruptedException {
216218
final String input = "input";
217-
try (final KafkaContainer kafkaCluster = newKafkaCluster();
219+
try (final KafkaContainer kafkaCluster = newCluster();
218220
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
219221
@Override
220222
public void buildTopology(final TopologyBuilder builder) {
@@ -240,8 +242,12 @@ public SerdeConfig defaultSerializationConfig() {
240242
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
241243
"--input-topics", input
242244
);
243-
new KafkaContainerHelper(kafkaCluster).send()
244-
.to(input, List.of(new KeyValue<>("foo", "bar")));
245+
new KafkaTestClient(KafkaEndpointConfig.builder()
246+
.bootstrapServers(kafkaCluster.getBootstrapServers())
247+
.build()).send()
248+
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
249+
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
250+
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
245251
Thread.sleep(Duration.ofSeconds(10).toMillis());
246252
}
247253
}
@@ -251,7 +257,7 @@ public SerdeConfig defaultSerializationConfig() {
251257
void shouldExitWithSuccessCodeOnShutdown() {
252258
final String input = "input";
253259
final String output = "output";
254-
try (final KafkaContainer kafkaCluster = newKafkaCluster();
260+
try (final KafkaContainer kafkaCluster = newCluster();
255261
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
256262
@Override
257263
public void buildTopology(final TopologyBuilder builder) {
@@ -270,20 +276,24 @@ public SerdeConfig defaultSerializationConfig() {
270276
}
271277
})) {
272278
kafkaCluster.start();
273-
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(kafkaCluster);
274-
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
275-
admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap());
276-
}
279+
final KafkaTestClient testClient = new KafkaTestClient(KafkaEndpointConfig.builder()
280+
.bootstrapServers(kafkaCluster.getBootstrapServers())
281+
.build());
282+
testClient.createTopic(output);
277283

278284
runApp(app,
279285
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
280286
"--input-topics", input,
281287
"--output-topic", output
282288
);
283-
kafkaContainerHelper.send()
284-
.to(input, List.of(new KeyValue<>("foo", "bar")));
285-
final List<ConsumerRecord<String, String>> keyValues = kafkaContainerHelper.read()
286-
.from(output, Duration.ofSeconds(10));
289+
testClient.send()
290+
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
291+
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
292+
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
293+
final List<ConsumerRecord<String, String>> keyValues = testClient.read()
294+
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
295+
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
296+
.from(output, POLL_TIMEOUT);
287297
assertThat(keyValues)
288298
.hasSize(1)
289299
.anySatisfy(kv -> {

streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,18 @@
2424

2525
package com.bakdata.kafka.integration;
2626

27-
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
2827
import static org.assertj.core.api.Assertions.assertThat;
2928

30-
import com.bakdata.kafka.KafkaContainerHelper;
3129
import com.bakdata.kafka.KafkaProducerApplication;
30+
import com.bakdata.kafka.KafkaTest;
31+
import com.bakdata.kafka.KafkaTestClient;
3232
import com.bakdata.kafka.ProducerApp;
3333
import com.bakdata.kafka.ProducerBuilder;
3434
import com.bakdata.kafka.ProducerRunnable;
3535
import com.bakdata.kafka.SerializerConfig;
3636
import com.bakdata.kafka.SimpleKafkaProducerApplication;
3737
import com.bakdata.kafka.TestRecord;
3838
import com.bakdata.kafka.util.ImprovedAdminClient;
39-
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
4039
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
4140
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
4241
import java.time.Duration;
@@ -45,29 +44,10 @@
4544
import org.apache.kafka.clients.producer.ProducerRecord;
4645
import org.apache.kafka.common.serialization.StringDeserializer;
4746
import org.apache.kafka.common.serialization.StringSerializer;
48-
import org.junit.jupiter.api.AfterEach;
49-
import org.junit.jupiter.api.BeforeEach;
5047
import org.junit.jupiter.api.Test;
51-
import org.testcontainers.junit.jupiter.Container;
52-
import org.testcontainers.junit.jupiter.Testcontainers;
53-
import org.testcontainers.kafka.KafkaContainer;
5448

55-
@Testcontainers
56-
class RunProducerAppTest {
49+
class RunProducerAppTest extends KafkaTest {
5750
private static final Duration TIMEOUT = Duration.ofSeconds(10);
58-
private static final String SCHEMA_REGISTRY_URL = "mock://";
59-
@Container
60-
private final KafkaContainer kafkaCluster = newKafkaCluster();
61-
62-
@BeforeEach
63-
void setup() {
64-
this.kafkaCluster.start();
65-
}
66-
67-
@AfterEach
68-
void tearDown() {
69-
this.kafkaCluster.stop();
70-
}
7151

7252
@Test
7353
void shouldRunApp() throws InterruptedException {
@@ -88,24 +68,24 @@ public SerializerConfig defaultSerializationConfig() {
8868
return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class);
8969
}
9070
})) {
91-
app.setBootstrapServers(this.kafkaCluster.getBootstrapServers());
92-
app.setSchemaRegistryUrl(SCHEMA_REGISTRY_URL);
71+
app.setBootstrapServers(this.getBootstrapServers());
72+
final String schemaRegistryUrl = this.getSchemaRegistryUrl();
73+
app.setSchemaRegistryUrl(schemaRegistryUrl);
9374
app.setOutputTopic(output);
9475
app.run();
95-
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster);
96-
assertThat(kafkaContainerHelper.read()
76+
final KafkaTestClient testClient = this.newTestClient();
77+
assertThat(testClient.read()
9778
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
9879
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
99-
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
100-
.<String, TestRecord>from(output, TIMEOUT))
80+
.<String, TestRecord>from(output, POLL_TIMEOUT))
10181
.hasSize(1)
10282
.anySatisfy(kv -> {
10383
assertThat(kv.key()).isEqualTo("foo");
10484
assertThat(kv.value().getContent()).isEqualTo("bar");
10585
});
10686
app.clean();
10787
Thread.sleep(TIMEOUT.toMillis());
108-
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
88+
try (final ImprovedAdminClient admin = testClient.admin()) {
10989
assertThat(admin.getTopicClient().exists(app.getOutputTopic()))
11090
.as("Output topic is deleted")
11191
.isFalse();

streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,63 +24,50 @@
2424

2525
package com.bakdata.kafka.integration;
2626

27-
import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
28-
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
29-
import static java.util.Collections.emptyMap;
3027
import static org.assertj.core.api.Assertions.assertThat;
3128

32-
import com.bakdata.kafka.KafkaContainerHelper;
3329
import com.bakdata.kafka.KafkaStreamsApplication;
30+
import com.bakdata.kafka.KafkaTest;
31+
import com.bakdata.kafka.KafkaTestClient;
32+
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
3433
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
3534
import com.bakdata.kafka.test_applications.Mirror;
36-
import com.bakdata.kafka.util.ImprovedAdminClient;
37-
import java.time.Duration;
3835
import java.util.List;
3936
import java.util.Map;
4037
import org.apache.kafka.clients.consumer.ConsumerConfig;
4138
import org.apache.kafka.clients.producer.ProducerConfig;
4239
import org.apache.kafka.common.serialization.StringDeserializer;
4340
import org.apache.kafka.common.serialization.StringSerializer;
44-
import org.apache.kafka.streams.KeyValue;
4541
import org.junit.jupiter.api.Test;
4642
import org.junit.jupiter.api.extension.ExtendWith;
4743
import org.mockito.junit.jupiter.MockitoExtension;
48-
import org.testcontainers.junit.jupiter.Container;
49-
import org.testcontainers.junit.jupiter.Testcontainers;
50-
import org.testcontainers.kafka.KafkaContainer;
5144

52-
@Testcontainers
5345
@ExtendWith(MockitoExtension.class)
54-
class RunStreamsAppTest {
55-
private static final Duration TIMEOUT = Duration.ofSeconds(10);
56-
@Container
57-
private final KafkaContainer kafkaCluster = newKafkaCluster();
46+
class RunStreamsAppTest extends KafkaTest {
5847

5948
@Test
6049
void shouldRunApp() {
6150
final String input = "input";
6251
final String output = "output";
63-
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster);
64-
try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) {
65-
admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap());
66-
}
52+
final KafkaTestClient testClient = this.newTestClient();
53+
testClient.createTopic(output);
6754
try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
68-
app.setBootstrapServers(this.kafkaCluster.getBootstrapServers());
55+
app.setBootstrapServers(this.getBootstrapServers());
6956
app.setKafkaConfig(Map.of(
7057
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
7158
));
7259
app.setInputTopics(List.of(input));
7360
app.setOutputTopic(output);
7461
// run in Thread because the application blocks indefinitely
7562
new Thread(app).start();
76-
kafkaContainerHelper.send()
63+
testClient.send()
7764
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
7865
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
79-
.to(input, List.of(new KeyValue<>("foo", "bar")));
80-
assertThat(kafkaContainerHelper.read()
66+
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
67+
assertThat(testClient.read()
8168
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
8269
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
83-
.from(output, TIMEOUT))
70+
.from(output, POLL_TIMEOUT))
8471
.hasSize(1);
8572
}
8673
}

0 commit comments

Comments
 (0)