Skip to content

Commit eadf510

Browse files
committed
BE: Expose Kafka Connect validation errors in the UI
1 parent 318bcc9 commit eadf510

File tree

3 files changed

+36
-10
lines changed

3 files changed

+36
-10
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.kafbat.ui.client;
22

3+
import com.fasterxml.jackson.annotation.JsonProperty;
34
import io.kafbat.ui.config.ClustersProperties;
45
import io.kafbat.ui.connect.ApiClient;
56
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
@@ -14,9 +15,11 @@
1415
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
1516
import io.kafbat.ui.exception.ValidationException;
1617
import io.kafbat.ui.util.WebClientConfigurator;
18+
import jakarta.validation.constraints.NotNull;
1719
import java.time.Duration;
1820
import java.util.List;
1921
import java.util.Map;
22+
import java.util.Objects;
2023
import javax.annotation.Nullable;
2124
import lombok.extern.slf4j.Slf4j;
2225
import org.springframework.http.ResponseEntity;
@@ -58,10 +61,24 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
5861

5962
private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
6063
return publisher
61-
.onErrorResume(WebClientResponseException.BadRequest.class, e ->
62-
Mono.error(new ValidationException("Invalid configuration")))
63-
.onErrorResume(WebClientResponseException.InternalServerError.class, e ->
64-
Mono.error(new ValidationException("Invalid configuration")));
64+
.onErrorResume(WebClientResponseException.BadRequest.class,
65+
RetryingKafkaConnectClient::parseConnectErrorMessage)
66+
.onErrorResume(WebClientResponseException.InternalServerError.class,
67+
RetryingKafkaConnectClient::parseConnectErrorMessage);
68+
}
69+
70+
// Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
71+
// Adding the connect runtime dependency for this single class seems excessive
72+
private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
73+
}
74+
75+
private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
76+
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
77+
return Mono.error(new ValidationException(
78+
Objects.requireNonNull(errorMessage,
79+
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
80+
"This should not happen according to the ConnectExceptionMapper")
81+
.message()));
6582
}
6683

6784
@Override

api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static org.assertj.core.api.Assertions.assertThat;
55
import static org.junit.jupiter.api.Assertions.assertEquals;
66

7+
import io.kafbat.ui.api.model.ErrorResponse;
78
import io.kafbat.ui.model.ConnectorDTO;
89
import io.kafbat.ui.model.ConnectorPluginConfigDTO;
910
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
@@ -273,14 +274,22 @@ public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
273274
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
274275
LOCAL, connectName, connectorName)
275276
.bodyValue(Map.of(
276-
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
277-
"tasks.max", "invalid number",
278-
"topics", "another-topic",
279-
"file", "/tmp/test"
277+
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
278+
"tasks.max", "invalid number",
279+
"topics", "another-topic",
280+
"file", "/tmp/test"
280281
)
281282
)
282283
.exchange()
283-
.expectStatus().isBadRequest();
284+
.expectStatus().isBadRequest()
285+
.expectBody(ErrorResponse.class)
286+
.value(response -> assertThat(response.getMessage()).isEqualTo(
287+
"""
288+
Connector configuration is invalid and contains the following 2 error(s):
289+
Invalid value invalid number for configuration tasks.max: Not a number of type INT
290+
Invalid value null for configuration tasks.max: Value must be non-null
291+
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"""
292+
));
284293

285294
webTestClient.get()
286295
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",

etc/checkstyle/checkstyle.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
<module name="LineLength">
4444
<property name="fileExtensions" value="java"/>
45-
<property name="max" value="120"/>
45+
<property name="max" value="130"/>
4646
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
4747
</module>
4848

0 commit comments

Comments
 (0)