Skip to content

MINOR: Deflake streams admin api describe streams group with short timeout #20320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class DescribeStreamsGroupTest {
private static final String OUTPUT_TOPIC = "customOutputTopic";
private static final String INPUT_TOPIC_2 = "customInputTopic2";
private static final String OUTPUT_TOPIC_2 = "customOutputTopic2";
private static final String INPUT_TOPIC_3 = "customInputTopic3";
private static final String OUTPUT_TOPIC_3 = "customOutputTopic3";
private static String bootstrapServers;
@BeforeAll
public static void setup() throws Exception {
Expand All @@ -92,6 +94,7 @@ public static void setup() throws Exception {

@AfterAll
public static void closeCluster() {
cluster.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC, INPUT_TOPIC_2, OUTPUT_TOPIC_2, INPUT_TOPIC_3, OUTPUT_TOPIC_3);
streams.close();
cluster.stop();
cluster = null;
Expand Down Expand Up @@ -252,10 +255,17 @@ public void testDescribeNonExistingStreamsGroup() {
}

@Test
public void testDescribeStreamsGroupWithShortTimeout() {
public void testDescribeStreamsGroupWithShortTimeout() throws Exception {
cluster.createTopic(INPUT_TOPIC_2, 1, 1);
KafkaStreams streams2 = new KafkaStreams(topology(INPUT_TOPIC_2, OUTPUT_TOPIC_2), streamsProp(APP_ID_2));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use try-with-resource.

startApplicationAndWaitUntilRunning(streams2);

List<String> args = List.of("--bootstrap-server", bootstrapServers, "--describe", "--members", "--verbose", "--group", APP_ID, "--timeout", "1");
Throwable e = assertThrows(ExecutionException.class, () -> getStreamsGroupService(args.toArray(new String[0])).describeGroups());
assertEquals(TimeoutException.class, e.getCause().getClass());

streams2.close();
streams2.cleanUp();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use try-with-resource, we can remove close() and put cleanUp() into finally-block.

}

private static Topology topology(String inputTopic, String outputTopic) {
Expand Down