Skip to content

Commit ca8c426

Browse files
authored
Merge pull request #48643 from pcasaes/pc/dont-wait-topics-if-check-disabled
Do not wait for topics to created if topic check disabled
2 parents 694d856 + 577b68d commit ca8c426

File tree

2 files changed

+76
-0
lines changed

2 files changed

+76
-0
lines changed

extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ public Set<String> getMissingTopics() throws InterruptedException {
136136
}
137137

138138
public void waitForTopicsToBeCreated() throws InterruptedException {
139+
if (!isTopicsCheckEnabled()) {
140+
return;
141+
}
139142
Set<String> lastMissingTopics = null;
140143
while (!closed) {
141144
try {
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package io.quarkus.kafka.streams.runtime;
2+
3+
import static org.mockito.ArgumentMatchers.any;
4+
import static org.mockito.ArgumentMatchers.anyLong;
5+
import static org.mockito.Mockito.mock;
6+
import static org.mockito.Mockito.never;
7+
import static org.mockito.Mockito.times;
8+
import static org.mockito.Mockito.verify;
9+
import static org.mockito.Mockito.when;
10+
11+
import java.time.Duration;
12+
import java.util.List;
13+
import java.util.Optional;
14+
import java.util.Set;
15+
16+
import org.apache.kafka.clients.admin.AdminClient;
17+
import org.apache.kafka.clients.admin.ListTopicsResult;
18+
import org.apache.kafka.common.KafkaFuture;
19+
import org.apache.kafka.streams.Topology;
20+
import org.junit.jupiter.api.Test;
21+
22+
class KafkaStreamsTopologyManagerTest {
23+
24+
@Test
25+
void waitForTopicsWhenCheckWhenIsDisabledShouldNotInteractWithAdminClient() throws Exception {
26+
AdminClient adminClient = mock(AdminClient.class);
27+
KafkaStreamsRuntimeConfig config = mock(KafkaStreamsRuntimeConfig.class);
28+
Topology topology = mock(Topology.class);
29+
30+
// GIVEN a KafkaStreamsTopologyManager with topics check disabled
31+
when(config.topicsTimeout()).thenReturn(Duration.ZERO);
32+
KafkaStreamsTopologyManager manager = new KafkaStreamsTopologyManager(
33+
adminClient,
34+
topology,
35+
config);
36+
37+
// WHEN waiting for topics to be created
38+
manager.waitForTopicsToBeCreated();
39+
40+
// THEN it should not interact with the admin client
41+
verify(adminClient, never()).listTopics();
42+
}
43+
44+
@Test
45+
void waitForTopicsWhenCheckWhenIsEnabledShouldInteractWithAdminClient() throws Exception {
46+
AdminClient adminClient = mock(AdminClient.class);
47+
KafkaStreamsRuntimeConfig config = mock(KafkaStreamsRuntimeConfig.class);
48+
Topology topology = mock(Topology.class);
49+
ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
50+
KafkaFuture<Set<String>> namesFuture = mock(KafkaFuture.class);
51+
52+
// GIVEN a KafkaStreamsTopologyManager with topics check enabled expecting topic `topic1`
53+
// AND an admin client that returns `topic1` immediately
54+
String expectedTopic = "topic1";
55+
when(config.topicsTimeout()).thenReturn(Duration.ofSeconds(30));
56+
when(config.topics()).thenReturn(Optional.of(List.of(expectedTopic)));
57+
58+
when(adminClient.listTopics()).thenReturn(listTopicsResult);
59+
when(listTopicsResult.names()).thenReturn(namesFuture);
60+
when(namesFuture.get(anyLong(), any())).thenReturn(Set.of(expectedTopic));
61+
62+
KafkaStreamsTopologyManager manager = new KafkaStreamsTopologyManager(
63+
adminClient,
64+
topology,
65+
config);
66+
67+
// WHEN waiting for topics to be created
68+
manager.waitForTopicsToBeCreated();
69+
70+
// THEN it should interact with the admin client
71+
verify(adminClient, times(1)).listTopics();
72+
}
73+
}

0 commit comments

Comments
 (0)