diff --git a/.dev/dev_arm64.yaml b/.dev/dev_arm64.yaml
index 220140d3d..dc1a8726e 100644
--- a/.dev/dev_arm64.yaml
+++ b/.dev/dev_arm64.yaml
@@ -32,7 +32,7 @@ services:
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
kafka0:
- image: confluentinc/cp-kafka:7.6.0.arm64
+ image: confluentinc/cp-kafka:7.8.0.arm64
user: "0:0"
hostname: kafka0
container_name: kafka0
@@ -60,7 +60,7 @@ services:
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry0:
- image: confluentinc/cp-schema-registry:7.6.0.arm64
+ image: confluentinc/cp-schema-registry:7.8.0.arm64
ports:
- 8085:8085
depends_on:
@@ -76,7 +76,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
kafka-connect0:
- image: confluentinc/cp-kafka-connect:7.6.0.arm64
+ image: confluentinc/cp-kafka-connect:7.8.0.arm64
ports:
- 8083:8083
depends_on:
@@ -101,7 +101,7 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"
ksqldb0:
- image: confluentinc/cp-ksqldb-server:7.6.0.arm64
+ image: confluentinc/cp-ksqldb-server:7.8.0.arm64
depends_on:
- kafka0
- kafka-connect0
@@ -119,7 +119,7 @@ services:
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
kafka-init-topics:
- image: confluentinc/cp-kafka:7.6.0.arm64
+ image: confluentinc/cp-kafka:7.8.0.arm64
volumes:
- ../documentation/compose/data/message.json:/data/message.json
depends_on:
diff --git a/.gitignore b/.gitignore
index efd6a9749..51efbef39 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,3 +42,4 @@ build/
*.tgz
/docker/*.override.yaml
+/e2e-tests/allure-results/
diff --git a/api/pom.xml b/api/pom.xml
index 3f7c044d0..dc774c09a 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -50,7 +50,10 @@
org.apache.kafka
kafka-clients
- ${kafka-clients.version}
+
+ ${confluent.version}-ccs
org.apache.commons
diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
index 72ab7386a..474a0c159 100644
--- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
+++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
@@ -22,6 +22,7 @@
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.http.ResponseEntity;
import org.springframework.util.unit.DataSize;
import org.springframework.web.client.RestClientException;
@@ -51,14 +52,36 @@ private static Retry conflictCodeRetry() {
(WebClientResponseException.Conflict) signal.failure()));
}
- private static Mono withRetryOnConflict(Mono publisher) {
- return publisher.retryWhen(conflictCodeRetry());
+ private static @NotNull Retry retryOnRebalance() {
+ return Retry.fixedDelay(MAX_RETRIES, RETRIES_DELAY).filter(e -> {
+
+ if (e instanceof WebClientResponseException.InternalServerError exception) {
+ final var errorMessage = getMessage(exception);
+ return StringUtils.equals(errorMessage,
+ // From https://github.com/apache/kafka/blob/dfc07e0e0c6e737a56a5402644265f634402b864/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2340
+ "Request cannot be completed because a rebalance is expected");
+ }
+ return false;
+ });
+ }
+
+ private static Mono withRetryOnConflictOrRebalance(Mono publisher) {
+ return publisher
+ .retryWhen(retryOnRebalance())
+ .retryWhen(conflictCodeRetry());
+ }
+
+ private static Flux withRetryOnConflictOrRebalance(Flux publisher) {
+ return publisher
+ .retryWhen(retryOnRebalance())
+ .retryWhen(conflictCodeRetry());
}
- private static Flux withRetryOnConflict(Flux publisher) {
- return publisher.retryWhen(conflictCodeRetry());
+ private static Mono withRetryOnRebalance(Mono publisher) {
+ return publisher.retryWhen(retryOnRebalance());
}
+
private static Mono withBadRequestErrorHandling(Mono publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class,
@@ -73,18 +96,21 @@ private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}
private static @NotNull Mono parseConnectErrorMessage(WebClientResponseException parseException) {
+ return Mono.error(new ValidationException(getMessage(parseException)));
+ }
+
+ private static String getMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
- return Mono.error(new ValidationException(
- Objects.requireNonNull(errorMessage,
- // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
- "This should not happen according to the ConnectExceptionMapper")
- .message()));
+ return Objects.requireNonNull(errorMessage,
+ // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
+ "This should not happen according to the ConnectExceptionMapper")
+ .message();
}
@Override
public Mono createConnector(NewConnector newConnector) throws RestClientException {
return withBadRequestErrorHandling(
- super.createConnector(newConnector)
+ withRetryOnRebalance(super.createConnector(newConnector))
);
}
@@ -92,178 +118,178 @@ public Mono createConnector(NewConnector newConnector) throws RestCli
public Mono setConnectorConfig(String connectorName, Map requestBody)
throws RestClientException {
return withBadRequestErrorHandling(
- super.setConnectorConfig(connectorName, requestBody)
+ withRetryOnRebalance(super.setConnectorConfig(connectorName, requestBody))
);
}
@Override
public Mono> createConnectorWithHttpInfo(NewConnector newConnector)
throws WebClientResponseException {
- return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
+ return withRetryOnConflictOrRebalance(super.createConnectorWithHttpInfo(newConnector));
}
@Override
public Mono deleteConnector(String connectorName) throws WebClientResponseException {
- return withRetryOnConflict(super.deleteConnector(connectorName));
+ return withRetryOnConflictOrRebalance(super.deleteConnector(connectorName));
}
@Override
public Mono> deleteConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
- return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
+ return withRetryOnConflictOrRebalance(super.deleteConnectorWithHttpInfo(connectorName));
}
@Override
public Mono getConnector(String connectorName) throws WebClientResponseException {
- return withRetryOnConflict(super.getConnector(connectorName));
+ return withRetryOnConflictOrRebalance(super.getConnector(connectorName));
}
@Override
public Mono> getConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
- return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
+ return withRetryOnConflictOrRebalance(super.getConnectorWithHttpInfo(connectorName));
}
@Override
public Mono