Skip to content

Commit 4a499e4

Browse files
committed
Optimistic creation of test connectors. Check existence and ignore create errors.
1 parent 793c0a5 commit 4a499e4

File tree

1 file changed

+24
-33
lines changed

1 file changed

+24
-33
lines changed

api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,40 +48,31 @@ public class KafkaConnectServiceTests extends AbstractIntegrationTest {
4848
@BeforeEach
4949
public void setUp() {
5050

51-
int limit = 5;
52-
int tries = 0;
53-
boolean failed = false;
54-
55-
do {
56-
57-
ExchangeResult result = webTestClient.post()
58-
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
59-
.bodyValue(new NewConnectorDTO()
60-
.name(connectorName)
61-
.config(Map.of(
62-
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
63-
"tasks.max", "1",
64-
"topics", "output-topic",
65-
"file", "/tmp/test",
66-
"test.password", "test-credentials")))
67-
.exchange()
68-
.expectBody()
69-
.returnResult();
70-
71-
// Kafka Connect returns an error 500 during occasional rebalances
72-
failed = result.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR;
73-
tries++;
74-
75-
if (failed) {
76-
System.out.println("Failed to setUp connector %s time(s), got status: %s".formatted(tries, result.getStatus()));
77-
try {
78-
TimeUnit.SECONDS.sleep(1);
79-
} catch (Exception e) {
80-
System.out.println("Sleep got interrupted");
81-
}
82-
}
83-
} while (failed == true && tries < limit);
51+
ExchangeResult creationResult = webTestClient.post()
52+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
53+
.bodyValue(new NewConnectorDTO()
54+
.name(connectorName)
55+
.config(Map.of(
56+
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
57+
"tasks.max", "1",
58+
"topics", "output-topic",
59+
"file", "/tmp/test",
60+
"test.password", "test-credentials")))
61+
.exchange()
62+
.expectBody()
63+
.returnResult();
64+
65+
webTestClient.get()
66+
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
67+
LOCAL, connectName, connectorName)
68+
.exchange()
69+
.expectStatus().isOk();
8470

71+
// Kafka Connect may return transient HTTP 500 errors during rebalances
72+
if (creationResult.getStatus() != HttpStatus.OK) {
73+
log.warn(
74+
"Ignoring a transient error while setting up the tested connector, because it has been created anyway.");
75+
}
8576
}
8677

8778
@AfterEach

0 commit comments

Comments
 (0)