Skip to content

Commit d967deb

Browse files
authored
Use Awaitility (#273)
1 parent cc28c79 commit d967deb

File tree

16 files changed

+405
-185
lines changed

16 files changed

+405
-185
lines changed

gradle.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ junitVersion=5.11.4
1010
mockitoVersion=5.15.2
1111
assertJVersion=3.27.2
1212
log4jVersion=2.24.3
13+
awaitilityVersion=4.2.2
1314
org.gradle.jvmargs=-Xmx4096m

streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* MIT License
33
*
4-
* Copyright (c) 2024 bakdata
4+
* Copyright (c) 2025 bakdata
55
*
66
* Permission is hereby granted, free of charge, to any person obtaining a copy
77
* of this software and associated documentation files (the "Software"), to deal
@@ -95,7 +95,7 @@ public abstract class KafkaApplication<R extends Runner, CR extends CleanUpRunne
9595
private Map<String, String> kafkaConfig = emptyMap();
9696

9797
/**
98-
* <p>This methods needs to be called in the executable custom application class inheriting from
98+
* <p>This method needs to be called in the executable custom application class inheriting from
9999
* {@code KafkaApplication}.</p>
100100
* <p>This method calls System exit</p>
101101
*
@@ -109,7 +109,7 @@ public static void startApplication(final KafkaApplication<?, ?, ?, ?, ?, ?, ?>
109109
}
110110

111111
/**
112-
* <p>This methods needs to be called in the executable custom application class inheriting from
112+
* <p>This method needs to be called in the executable custom application class inheriting from
113113
* {@code KafkaApplication}.</p>
114114
*
115115
* @param app An instance of the custom application class.

streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT;
2828
import static com.bakdata.kafka.KafkaTest.newCluster;
2929
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.awaitility.Awaitility.await;
3031

3132
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
3233
import com.ginsberg.junit.exit.ExpectSystemExitWithStatus;
@@ -46,8 +47,10 @@
4647

4748
class CliTest {
4849

49-
private static void runApp(final KafkaStreamsApplication<?> app, final String... args) {
50-
new Thread(() -> KafkaApplication.startApplication(app, args)).start();
50+
private static Thread runApp(final KafkaStreamsApplication<?> app, final String... args) {
51+
final Thread thread = new Thread(() -> KafkaApplication.startApplication(app, args));
52+
thread.start();
53+
return thread;
5154
}
5255

5356
@Test
@@ -214,7 +217,7 @@ public SerdeConfig defaultSerializationConfig() {
214217

215218
@Test
216219
@ExpectSystemExitWithStatus(1)
217-
void shouldExitWithErrorInTopology() throws InterruptedException {
220+
void shouldExitWithErrorInTopology() {
218221
final String input = "input";
219222
try (final KafkaContainer kafkaCluster = newCluster();
220223
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@@ -238,7 +241,7 @@ public SerdeConfig defaultSerializationConfig() {
238241
})) {
239242
kafkaCluster.start();
240243

241-
runApp(app,
244+
final Thread thread = runApp(app,
242245
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
243246
"--input-topics", input
244247
);
@@ -248,7 +251,7 @@ public SerdeConfig defaultSerializationConfig() {
248251
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
249252
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
250253
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
251-
Thread.sleep(Duration.ofSeconds(10).toMillis());
254+
await("Thread is dead").atMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive());
252255
}
253256
}
254257

streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import com.bakdata.kafka.util.ImprovedAdminClient;
3939
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
4040
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
41-
import java.time.Duration;
4241
import org.apache.kafka.clients.consumer.ConsumerConfig;
4342
import org.apache.kafka.clients.producer.Producer;
4443
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -47,10 +46,9 @@
4746
import org.junit.jupiter.api.Test;
4847

4948
class RunProducerAppTest extends KafkaTest {
50-
private static final Duration TIMEOUT = Duration.ofSeconds(10);
5149

5250
@Test
53-
void shouldRunApp() throws InterruptedException {
51+
void shouldRunApp() {
5452
final String output = "output";
5553
try (final KafkaProducerApplication<?> app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() {
5654
@Override
@@ -84,7 +82,6 @@ public SerializerConfig defaultSerializationConfig() {
8482
assertThat(kv.value().getContent()).isEqualTo("bar");
8583
});
8684
app.clean();
87-
Thread.sleep(TIMEOUT.toMillis());
8885
try (final ImprovedAdminClient admin = testClient.admin()) {
8986
assertThat(admin.getTopicClient().exists(app.getOutputTopic()))
9087
.as("Output topic is deleted")

streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@
3939
import org.apache.kafka.common.serialization.StringDeserializer;
4040
import org.apache.kafka.common.serialization.StringSerializer;
4141
import org.junit.jupiter.api.Test;
42-
import org.junit.jupiter.api.extension.ExtendWith;
43-
import org.mockito.junit.jupiter.MockitoExtension;
4442

45-
@ExtendWith(MockitoExtension.class)
4643
class RunStreamsAppTest extends KafkaTest {
4744

4845
@Test

streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
3434
import com.bakdata.kafka.test_applications.WordCount;
3535
import com.bakdata.kafka.util.ImprovedAdminClient;
36-
import java.time.Duration;
3736
import java.util.List;
3837
import java.util.Map;
3938
import java.util.stream.Collectors;
@@ -52,33 +51,15 @@
5251
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
5352
import org.junit.jupiter.api.Test;
5453
import org.junit.jupiter.api.extension.ExtendWith;
55-
import org.mockito.junit.jupiter.MockitoExtension;
56-
import org.mockito.junit.jupiter.MockitoSettings;
57-
import org.mockito.quality.Strictness;
5854

5955
@Slf4j
6056
@ExtendWith(SoftAssertionsExtension.class)
61-
@ExtendWith(MockitoExtension.class)
62-
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
6357
class StreamsCleanUpTest extends KafkaTest {
64-
private static final Duration TIMEOUT = Duration.ofSeconds(10);
6558
@InjectSoftAssertions
6659
private SoftAssertions softly;
6760

68-
private static void runAppAndClose(final KafkaStreamsApplication<?> app) throws InterruptedException {
69-
runApp(app);
70-
app.stop();
71-
}
72-
73-
private static void runApp(final KafkaStreamsApplication<?> app) throws InterruptedException {
74-
// run in Thread because the application blocks indefinitely
75-
new Thread(app).start();
76-
// Wait until stream application has consumed all data
77-
Thread.sleep(TIMEOUT.toMillis());
78-
}
79-
8061
@Test
81-
void shouldClean() throws InterruptedException {
62+
void shouldClean() {
8263
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
8364
final KafkaTestClient testClient = this.newTestClient();
8465
testClient.createTopic(app.getOutputTopic());
@@ -98,8 +79,8 @@ void shouldClean() throws InterruptedException {
9879
);
9980
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);
10081

101-
// Wait until all stream application are completely stopped before triggering cleanup
102-
Thread.sleep(TIMEOUT.toMillis());
82+
// Wait until all stream applications are completely stopped before triggering cleanup
83+
this.awaitClosed(app.createExecutableApp());
10384
app.clean();
10485

10586
try (final ImprovedAdminClient admin = testClient.admin()) {
@@ -114,7 +95,7 @@ void shouldClean() throws InterruptedException {
11495
}
11596

11697
@Test
117-
void shouldReset() throws InterruptedException {
98+
void shouldReset() {
11899
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
119100
final KafkaTestClient testClient = this.newTestClient();
120101
testClient.createTopic(app.getOutputTopic());
@@ -134,8 +115,8 @@ void shouldReset() throws InterruptedException {
134115
);
135116
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);
136117

137-
// Wait until all stream application are completely stopped before triggering cleanup
138-
Thread.sleep(TIMEOUT.toMillis());
118+
// Wait until all stream applications are completely stopped before triggering cleanup
119+
this.awaitClosed(app.createExecutableApp());
139120
app.reset();
140121

141122
try (final ImprovedAdminClient admin = testClient.admin()) {
@@ -152,21 +133,31 @@ void shouldReset() throws InterruptedException {
152133
}
153134

154135
@Test
155-
void shouldCallClose() throws InterruptedException {
136+
void shouldCallClose() {
156137
try (final CloseFlagApp app = this.createCloseFlagApplication()) {
157138
this.newTestClient().createTopic(app.getInputTopics().get(0));
158-
Thread.sleep(TIMEOUT.toMillis());
159139
this.softly.assertThat(app.isClosed()).isFalse();
160140
this.softly.assertThat(app.isAppClosed()).isFalse();
161141
app.clean();
162142
this.softly.assertThat(app.isAppClosed()).isTrue();
163143
app.setAppClosed(false);
164-
Thread.sleep(TIMEOUT.toMillis());
165144
app.reset();
166145
this.softly.assertThat(app.isAppClosed()).isTrue();
167146
}
168147
}
169148

149+
private void runAppAndClose(final KafkaStreamsApplication<?> app) {
150+
this.runApp(app);
151+
app.stop();
152+
}
153+
154+
private void runApp(final KafkaStreamsApplication<?> app) {
155+
// run in Thread because the application blocks indefinitely
156+
new Thread(app).start();
157+
// Wait until stream application has consumed all data
158+
this.awaitProcessing(app.createExecutableApp());
159+
}
160+
170161
private CloseFlagApp createCloseFlagApplication() {
171162
final CloseFlagApp app = new CloseFlagApp();
172163
app.setInputTopics(List.of("input"));
@@ -185,9 +176,8 @@ private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) {
185176
}
186177

187178
private void runAndAssertContent(final Iterable<? extends KeyValue<String, Long>> expectedValues,
188-
final String description, final KafkaStreamsApplication<?> app)
189-
throws InterruptedException {
190-
runAppAndClose(app);
179+
final String description, final KafkaStreamsApplication<?> app) {
180+
this.runAppAndClose(app);
191181

192182
final List<KeyValue<String, Long>> output = this.readOutputTopic(app.getOutputTopic());
193183
this.softly.assertThat(output)

streams-bootstrap-core/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ dependencies {
3838
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
3939
val log4jVersion: String by project
4040
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
41+
val awaitilityVersion: String by project
42+
testFixturesApi(group = "org.awaitility", name = "awaitility", version = awaitilityVersion)
4143
}
4244

4345
tasks.withType<Test> {

streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@
3636
import lombok.extern.slf4j.Slf4j;
3737
import org.apache.kafka.clients.admin.Admin;
3838
import org.apache.kafka.clients.admin.AdminClient;
39+
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
3940
import org.apache.kafka.clients.admin.ConsumerGroupListing;
41+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
42+
import org.apache.kafka.common.TopicPartition;
4043
import org.apache.kafka.common.errors.GroupIdNotFoundException;
4144

4245
/**
@@ -68,6 +71,14 @@ private static KafkaAdminException failedToListGroups(final Throwable ex) {
6871
return new KafkaAdminException("Failed to list consumer groups", ex);
6972
}
7073

74+
private static KafkaAdminException failedToListOffsets(final String groupName, final Throwable ex) {
75+
return new KafkaAdminException("Failed to list offsets for consumer group" + groupName, ex);
76+
}
77+
78+
private static KafkaAdminException failedToDescribeGroup(final String groupName, final Throwable ex) {
79+
return new KafkaAdminException("Failed to describe consumer group" + groupName, ex);
80+
}
81+
7182
/**
7283
* Delete a consumer group.
7384
*
@@ -93,6 +104,63 @@ public void deleteConsumerGroup(final String groupName) {
93104
}
94105
}
95106

107+
/**
108+
* Describe a consumer group.
109+
*
110+
* @param groupName the consumer group name
111+
* @return consumer group description
112+
*/
113+
public ConsumerGroupDescription describe(final String groupName) {
114+
log.info("Describing consumer group '{}'", groupName);
115+
try {
116+
final ConsumerGroupDescription description =
117+
this.adminClient.describeConsumerGroups(List.of(groupName))
118+
.all()
119+
.get(this.timeout.toSeconds(), TimeUnit.SECONDS)
120+
.get(groupName);
121+
log.info("Described consumer group '{}'", groupName);
122+
return description;
123+
} catch (final InterruptedException ex) {
124+
Thread.currentThread().interrupt();
125+
throw failedToDescribeGroup(groupName, ex);
126+
} catch (final ExecutionException ex) {
127+
if (ex.getCause() instanceof RuntimeException) {
128+
throw (RuntimeException) ex.getCause();
129+
}
130+
throw failedToDescribeGroup(groupName, ex);
131+
} catch (final TimeoutException ex) {
132+
throw failedToDescribeGroup(groupName, ex);
133+
}
134+
}
135+
136+
/**
137+
* List offsets for a consumer group.
138+
*
139+
* @param groupName the consumer group name
140+
* @return consumer group offsets
141+
*/
142+
public Map<TopicPartition, OffsetAndMetadata> listOffsets(final String groupName) {
143+
log.info("Listing offsets for consumer group '{}'", groupName);
144+
try {
145+
final Map<TopicPartition, OffsetAndMetadata> offsets =
146+
this.adminClient.listConsumerGroupOffsets(groupName)
147+
.partitionsToOffsetAndMetadata(groupName)
148+
.get(this.timeout.toSeconds(), TimeUnit.SECONDS);
149+
log.info("Listed offsets for consumer group '{}'", groupName);
150+
return offsets;
151+
} catch (final InterruptedException ex) {
152+
Thread.currentThread().interrupt();
153+
throw failedToListOffsets(groupName, ex);
154+
} catch (final ExecutionException ex) {
155+
if (ex.getCause() instanceof RuntimeException) {
156+
throw (RuntimeException) ex.getCause();
157+
}
158+
throw failedToListOffsets(groupName, ex);
159+
} catch (final TimeoutException ex) {
160+
throw failedToListOffsets(groupName, ex);
161+
}
162+
}
163+
96164
@Override
97165
public void close() {
98166
this.adminClient.close();

0 commit comments

Comments
 (0)