Skip to content

Commit 77b5a70

Browse files
committed
adapt to connect retry mechanism
1 parent cb06d48 commit 77b5a70

File tree

2 files changed

+7
-11
lines changed

2 files changed

+7
-11
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,12 @@ public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorNa
277277
return withRetryOnConflictOrRebalance(super.resumeConnectorWithHttpInfo(connectorName));
278278
}
279279

280+
@Override
281+
public Mono<ResponseEntity<Void>> resetConnectorOffsets(String connectorName)
282+
throws WebClientResponseException {
283+
return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName));
284+
}
285+
280286
@Override
281287
public Mono<ResponseEntity<Connector>> setConnectorConfigWithHttpInfo(String connectorName,
282288
Map<String, Object> requestBody)

api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,10 @@ public void setUp() {
5959
"file", "/tmp/test",
6060
"test.password", "test-credentials")))
6161
.exchange()
62+
.expectStatus().isOk()
6263
.expectBody()
6364
.returnResult();
6465

65-
webTestClient.get()
66-
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
67-
LOCAL, connectName, connectorName)
68-
.exchange()
69-
.expectStatus().isOk();
70-
71-
// Kafka Connect may return transient HTTP 500 errors during rebalances
72-
if (creationResult.getStatus() != HttpStatus.OK) {
73-
log.warn(
74-
"Ignoring a transient error while setting up the tested connector, because it has been created anyway.");
75-
}
7666
}
7767

7868
@AfterEach

0 commit comments

Comments
 (0)