Skip to content

Commit 2026833

Browse files
lucasbruCopilot
andauthored
MINOR: Deflake and improve SmokeTestDriverIntegrationTest (#20509)
This improves the SmokeTestDriverIntegrationTest in three ways: 1) If a SmokeTestClient fails (enters a terminal ERROR state), the SmokeTestDriverIntegrationTest currently times out, because it keeps waiting for state NOT_RUNNING. This makes debugging quite difficult. This minor change makes sure to just fail the test immediately, if a SmokeTestClient enters the ERROR state. 2) If a test times out or fails prematurely, because a SmokeTestClient crashed, the SmokeTestClients aren't shut down correctly, which will affect the following test runs. Therefore, I am adding clean-up logic that running SmokeTestClients in `@AfterAll`. 3) Finally, I found that the processingThread variation of this thread triggers a subtle race condition. Since this features is currently not actively developed, I disabled those variations and created a ticket to reactivate the test. Reviewers: Matthias J. Sax <[email protected]>, Chia-Ping Tsai <[email protected]>, Bill Bejeck <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent dd824a2 commit 2026833

File tree

2 files changed

+35
-11
lines changed

2 files changed

+35
-11
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.kafka.streams.tests.SmokeTestDriver;
2828

2929
import org.junit.jupiter.api.AfterAll;
30+
import org.junit.jupiter.api.AfterEach;
3031
import org.junit.jupiter.api.BeforeAll;
3132
import org.junit.jupiter.api.BeforeEach;
3233
import org.junit.jupiter.api.Tag;
@@ -46,29 +47,43 @@
4647
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
4748
import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
4849
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
50+
import static org.junit.jupiter.api.Assertions.assertFalse;
4951
import static org.junit.jupiter.api.Assertions.assertTrue;
5052

5153
@Timeout(600)
5254
@Tag("integration")
5355
public class SmokeTestDriverIntegrationTest {
54-
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
56+
private static EmbeddedKafkaCluster cluster = null;
5557
public TestInfo testInfo;
58+
private ArrayList<SmokeTestClient> clients = new ArrayList<>();
5659

5760
@BeforeAll
5861
public static void startCluster() throws IOException {
59-
CLUSTER.start();
62+
cluster = new EmbeddedKafkaCluster(3);
63+
cluster.start();
6064
}
6165

6266
@AfterAll
6367
public static void closeCluster() {
64-
CLUSTER.stop();
68+
cluster.stop();
69+
cluster = null;
6570
}
6671

6772
@BeforeEach
6873
public void setUp(final TestInfo testInfo) {
6974
this.testInfo = testInfo;
7075
}
7176

77+
@AfterEach
78+
public void shutDown(final TestInfo testInfo) {
79+
// Clean up clients in case the test failed or timed out
80+
for (final SmokeTestClient client : clients) {
81+
if (!client.closed() && !client.error()) {
82+
client.close();
83+
}
84+
}
85+
}
86+
7287
private static class Driver extends Thread {
7388
private final String bootstrapServers;
7489
private final int numKeys;
@@ -108,12 +123,11 @@ SmokeTestDriver.VerificationResult result() {
108123
// During the new stream added and old stream left, the stream process should still complete without issue.
109124
// We set 2 timeout condition to fail the test before passing the verification:
110125
// (1) 10 min timeout, (2) 30 tries of polling without getting any data
126+
// The processing thread variations where disabled since they triggered a race condition, see KAFKA-19696
111127
@ParameterizedTest
112128
@CsvSource({
113129
"false, true",
114-
"false, false",
115-
"true, true",
116-
"true, false"
130+
"false, false"
117131
})
118132
public void shouldWorkWithRebalance(
119133
final boolean processingThreadsEnabled,
@@ -126,11 +140,10 @@ public void shouldWorkWithRebalance(
126140
throw new AssertionError("Test called halt(). code:" + statusCode + " message:" + message);
127141
});
128142
int numClientsCreated = 0;
129-
final ArrayList<SmokeTestClient> clients = new ArrayList<>();
130143

131-
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, SmokeTestDriver.topics());
144+
IntegrationTestUtils.cleanStateBeforeTest(cluster, SmokeTestDriver.topics());
132145

133-
final String bootstrapServers = CLUSTER.bootstrapServers();
146+
final String bootstrapServers = cluster.bootstrapServers();
134147
final Driver driver = new Driver(bootstrapServers, 10, 1000);
135148
driver.start();
136149
System.out.println("started driver");
@@ -144,8 +157,8 @@ public void shouldWorkWithRebalance(
144157
if (streamsProtocolEnabled) {
145158
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
146159
// decrease the session timeout so that we can trigger the rebalance soon after old client left closed
147-
CLUSTER.setGroupSessionTimeout(appId, 10000);
148-
CLUSTER.setGroupHeartbeatTimeout(appId, 1000);
160+
cluster.setGroupSessionTimeout(appId, 10000);
161+
cluster.setGroupHeartbeatTimeout(appId, 1000);
149162
} else {
150163
// decrease the session timeout so that we can trigger the rebalance soon after old client left closed
151164
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
@@ -167,6 +180,7 @@ public void shouldWorkWithRebalance(
167180

168181
client.closeAsync();
169182
while (!client.closed()) {
183+
assertFalse(client.error(), "The streams application seems to have crashed.");
170184
Thread.sleep(100);
171185
}
172186
}
@@ -184,6 +198,7 @@ public void shouldWorkWithRebalance(
184198
// then, wait for them to stop
185199
for (final SmokeTestClient client : clients) {
186200
while (!client.closed()) {
201+
assertFalse(client.error(), "The streams application seems to have crashed.");
187202
Thread.sleep(100);
188203
}
189204
}

streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class SmokeTestClient extends SmokeTestUtil {
5454
private KafkaStreams streams;
5555
private boolean uncaughtException = false;
5656
private volatile boolean closed;
57+
private volatile boolean error;
5758

5859
private static void addShutdownHook(final String name, final Runnable runnable) {
5960
if (name != null) {
@@ -71,6 +72,10 @@ public boolean closed() {
7172
return closed;
7273
}
7374

75+
public boolean error() {
76+
return error;
77+
}
78+
7479
public void start(final Properties streamsProperties) {
7580
final Topology build = getTopology();
7681
streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
@@ -85,6 +90,10 @@ public void start(final Properties streamsProperties) {
8590
if (newState == KafkaStreams.State.NOT_RUNNING) {
8691
closed = true;
8792
}
93+
94+
if (newState == KafkaStreams.State.ERROR) {
95+
error = true;
96+
}
8897
});
8998

9099
streams.setUncaughtExceptionHandler(e -> {

0 commit comments

Comments
 (0)