Skip to content

Commit 793c0a5

Browse files
committed
Handle error 500 by repeating the kafka connect test setUp
1 parent 62598e4 commit 793c0a5

File tree

1 file changed

+36
-25
lines changed

1 file changed

+36
-25
lines changed

api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.junit.jupiter.api.Test;
2525
import org.springframework.beans.factory.annotation.Autowired;
2626
import org.springframework.core.ParameterizedTypeReference;
27+
import org.springframework.http.HttpStatus;
28+
import org.springframework.test.web.reactive.server.ExchangeResult;
2729
import org.springframework.test.web.reactive.server.WebTestClient;
2830

2931
@Slf4j
@@ -51,34 +53,35 @@ public void setUp() {
5153
boolean failed = false;
5254

5355
do {
54-
try {
55-
TimeUnit.SECONDS.sleep(1);
56-
webTestClient.get()
57-
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL,
58-
connectName)
59-
.exchange()
60-
.expectStatus().isOk();
61-
} catch (Exception e) {
62-
failed = true;
63-
System.out.println("failed to retrieve connectors: " + tries);
64-
}
56+
57+
ExchangeResult result = webTestClient.post()
58+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
59+
.bodyValue(new NewConnectorDTO()
60+
.name(connectorName)
61+
.config(Map.of(
62+
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
63+
"tasks.max", "1",
64+
"topics", "output-topic",
65+
"file", "/tmp/test",
66+
"test.password", "test-credentials")))
67+
.exchange()
68+
.expectBody()
69+
.returnResult();
70+
71+
// Kafka Connect returns an error 500 during occasional rebalances
72+
failed = result.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR;
6573
tries++;
74+
75+
if (failed) {
76+
System.out.println("Failed to setUp connector %s time(s), got status: %s".formatted(tries, result.getStatus()));
77+
try {
78+
TimeUnit.SECONDS.sleep(1);
79+
} catch (Exception e) {
80+
System.out.println("Sleep got interrupted");
81+
}
82+
}
6683
} while (failed == true && tries < limit);
6784

68-
webTestClient.post()
69-
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
70-
.bodyValue(new NewConnectorDTO()
71-
.name(connectorName)
72-
.config(Map.of(
73-
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
74-
"tasks.max", "1",
75-
"topics", "output-topic",
76-
"file", "/tmp/test",
77-
"test.password", "test-credentials"
78-
))
79-
)
80-
.exchange()
81-
.expectStatus().isOk();
8285
}
8386

8487
@AfterEach
@@ -433,6 +436,14 @@ public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() {
433436
@Test
434437
public void shouldResetConnectorWhenInStoppedState() {
435438

439+
webTestClient.get()
440+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
441+
LOCAL, connectName, connectorName)
442+
.exchange()
443+
.expectStatus().isOk()
444+
.expectBody(ConnectorDTO.class)
445+
.value(connector -> assertThat(connector.getStatus().getState()).isEqualTo(ConnectorStateDTO.RUNNING));
446+
436447
webTestClient.post()
437448
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/STOP",
438449
LOCAL, connectName, connectorName)

0 commit comments

Comments
 (0)