Skip to content

Commit 0b1498b

Browse files
authored
Merge branch 'main' into k-diger/#675
2 parents cbec569 + d507a9f commit 0b1498b

29 files changed

+149
-111
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.kafbat.ui.client;
22

3+
import com.fasterxml.jackson.annotation.JsonProperty;
34
import io.kafbat.ui.config.ClustersProperties;
45
import io.kafbat.ui.connect.ApiClient;
56
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
@@ -14,9 +15,11 @@
1415
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
1516
import io.kafbat.ui.exception.ValidationException;
1617
import io.kafbat.ui.util.WebClientConfigurator;
18+
import jakarta.validation.constraints.NotNull;
1719
import java.time.Duration;
1820
import java.util.List;
1921
import java.util.Map;
22+
import java.util.Objects;
2023
import javax.annotation.Nullable;
2124
import lombok.extern.slf4j.Slf4j;
2225
import org.springframework.http.ResponseEntity;
@@ -58,10 +61,24 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
5861

5962
private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
6063
return publisher
61-
.onErrorResume(WebClientResponseException.BadRequest.class, e ->
62-
Mono.error(new ValidationException("Invalid configuration")))
63-
.onErrorResume(WebClientResponseException.InternalServerError.class, e ->
64-
Mono.error(new ValidationException("Invalid configuration")));
64+
.onErrorResume(WebClientResponseException.BadRequest.class,
65+
RetryingKafkaConnectClient::parseConnectErrorMessage)
66+
.onErrorResume(WebClientResponseException.InternalServerError.class,
67+
RetryingKafkaConnectClient::parseConnectErrorMessage);
68+
}
69+
70+
// Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
71+
// Adding the connect runtime dependency for this single class seems excessive
72+
private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
73+
}
74+
75+
private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
76+
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
77+
return Mono.error(new ValidationException(
78+
Objects.requireNonNull(errorMessage,
79+
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
80+
"This should not happen according to the ConnectExceptionMapper")
81+
.message()));
6582
}
6683

6784
@Override

api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static org.assertj.core.api.Assertions.assertThat;
55
import static org.junit.jupiter.api.Assertions.assertEquals;
66

7+
import io.kafbat.ui.api.model.ErrorResponse;
78
import io.kafbat.ui.model.ConnectorDTO;
89
import io.kafbat.ui.model.ConnectorPluginConfigDTO;
910
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
@@ -166,7 +167,7 @@ public void shouldReturnNotFoundForNonExistingConnectName() {
166167

167168
@Test
168169
public void shouldRetrieveConnector() {
169-
ConnectorDTO expected = (ConnectorDTO) new ConnectorDTO()
170+
ConnectorDTO expected = new ConnectorDTO()
170171
.connect(connectName)
171172
.status(new ConnectorStatusDTO()
172173
.state(ConnectorStateDTO.RUNNING)
@@ -268,19 +269,28 @@ public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() {
268269

269270

270271
@Test
272+
@SuppressWarnings("checkstyle:LineLength")
271273
public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
272274
webTestClient.put()
273275
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
274276
LOCAL, connectName, connectorName)
275277
.bodyValue(Map.of(
276-
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
277-
"tasks.max", "invalid number",
278-
"topics", "another-topic",
279-
"file", "/tmp/test"
278+
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
279+
"tasks.max", "invalid number",
280+
"topics", "another-topic",
281+
"file", "/tmp/test"
280282
)
281283
)
282284
.exchange()
283-
.expectStatus().isBadRequest();
285+
.expectStatus().isBadRequest()
286+
.expectBody(ErrorResponse.class)
287+
.value(response -> assertThat(response.getMessage()).isEqualTo(
288+
"""
289+
Connector configuration is invalid and contains the following 2 error(s):
290+
Invalid value invalid number for configuration tasks.max: Not a number of type INT
291+
Invalid value null for configuration tasks.max: Value must be non-null
292+
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"""
293+
));
284294

285295
webTestClient.get()
286296
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
@@ -383,7 +393,7 @@ public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() {
383393
.map(ConnectorPluginConfigDTO::getValue)
384394
.map(ConnectorPluginConfigValueDTO::getErrors)
385395
.filter(not(List::isEmpty))
386-
.findFirst().get();
396+
.findFirst().orElseThrow();
387397
assertEquals(
388398
"Invalid value 0 for configuration tasks.max: Value must be at least 1",
389399
error.get(0)

api/src/test/java/io/kafbat/ui/KafkaConsumerTests.java

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.kafbat.ui.producer.KafkaTestProducer;
1414
import java.util.List;
1515
import java.util.Map;
16+
import java.util.Objects;
1617
import java.util.UUID;
1718
import java.util.stream.Stream;
1819
import lombok.extern.slf4j.Slf4j;
@@ -55,15 +56,17 @@ public void shouldDeleteRecords() {
5556
throw new RuntimeException(e);
5657
}
5758

58-
long count = webTestClient.get()
59-
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
60-
.accept(TEXT_EVENT_STREAM)
61-
.exchange()
62-
.expectStatus()
63-
.isOk()
64-
.expectBodyList(TopicMessageEventDTO.class)
65-
.returnResult()
66-
.getResponseBody()
59+
long count = Objects.requireNonNull(
60+
webTestClient.get()
61+
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
62+
.accept(TEXT_EVENT_STREAM)
63+
.exchange()
64+
.expectStatus()
65+
.isOk()
66+
.expectBodyList(TopicMessageEventDTO.class)
67+
.returnResult()
68+
.getResponseBody()
69+
)
6770
.stream()
6871
.filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
6972
.count();
@@ -76,14 +79,16 @@ public void shouldDeleteRecords() {
7679
.expectStatus()
7780
.isOk();
7881

79-
count = webTestClient.get()
80-
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
81-
.exchange()
82-
.expectStatus()
83-
.isOk()
84-
.expectBodyList(TopicMessageEventDTO.class)
85-
.returnResult()
86-
.getResponseBody()
82+
count = Objects.requireNonNull(
83+
webTestClient.get()
84+
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
85+
.exchange()
86+
.expectStatus()
87+
.isOk()
88+
.expectBodyList(TopicMessageEventDTO.class)
89+
.returnResult()
90+
.getResponseBody()
91+
)
8792
.stream()
8893
.filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
8994
.count();
@@ -120,7 +125,7 @@ public void shouldIncreasePartitionsUpTo10() {
120125
.returnResult()
121126
.getResponseBody();
122127

123-
assert response != null;
128+
Assertions.assertNotNull(response);
124129
Assertions.assertEquals(10, response.getTotalPartitionsCount());
125130

126131
TopicDetailsDTO topicDetails = webTestClient.get()
@@ -134,7 +139,7 @@ public void shouldIncreasePartitionsUpTo10() {
134139
.returnResult()
135140
.getResponseBody();
136141

137-
assert topicDetails != null;
142+
Assertions.assertNotNull(topicDetails);
138143
Assertions.assertEquals(10, topicDetails.getPartitionCount());
139144
}
140145

@@ -157,8 +162,6 @@ public void shouldReturn404ForNonExistingTopic() {
157162

158163
@Test
159164
public void shouldReturnConfigsForBroker() {
160-
var topicName = UUID.randomUUID().toString();
161-
162165
List<BrokerConfigDTO> configs = webTestClient.get()
163166
.uri("/api/clusters/{clusterName}/brokers/{id}/configs",
164167
LOCAL,
@@ -171,7 +174,7 @@ public void shouldReturnConfigsForBroker() {
171174
.getResponseBody();
172175

173176
Assertions.assertNotNull(configs);
174-
assert !configs.isEmpty();
177+
Assertions.assertFalse(configs.isEmpty());
175178
Assertions.assertNotNull(configs.get(0).getName());
176179
Assertions.assertNotNull(configs.get(0).getIsReadOnly());
177180
Assertions.assertNotNull(configs.get(0).getIsSensitive());
@@ -216,7 +219,7 @@ public void shouldRetrieveTopicConfig() {
216219
.getResponseBody();
217220

218221
Assertions.assertNotNull(configs);
219-
assert !configs.isEmpty();
222+
Assertions.assertFalse(configs.isEmpty());
220223
Assertions.assertNotNull(configs.get(0).getName());
221224
Assertions.assertNotNull(configs.get(0).getIsReadOnly());
222225
Assertions.assertNotNull(configs.get(0).getIsSensitive());

api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandlerTest.java

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import com.azure.core.credential.AccessToken;
1818
import com.azure.core.credential.TokenCredential;
1919
import com.azure.core.credential.TokenRequestContext;
20-
import java.util.HashMap;
2120
import java.util.List;
2221
import java.util.Map;
2322
import javax.security.auth.callback.Callback;
@@ -59,9 +58,6 @@ public class AzureEntraLoginCallbackHandlerTest {
5958
@Mock
6059
private OAuthBearerTokenCallback oauthBearerTokenCallBack;
6160

62-
@Mock
63-
private OAuthBearerToken oauthBearerToken;
64-
6561
@Mock
6662
private TokenCredential tokenCredential;
6763

@@ -77,12 +73,8 @@ public void beforeEach() {
7773
}
7874

7975
@Test
80-
public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest()
81-
throws UnsupportedCallbackException {
82-
final Map<String, Object> configs = new HashMap<>();
83-
configs.put(
84-
"bootstrap.servers",
85-
List.of("test-eh.servicebus.windows.net:9093"));
76+
public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest() throws UnsupportedCallbackException {
77+
Map<String, Object> configs = Map.of("bootstrap.servers", List.of("test-eh.servicebus.windows.net:9093"));
8678

8779
when(tokenCredential.getToken(any(TokenRequestContext.class))).thenReturn(Mono.just(accessToken));
8880
when(accessToken.getToken()).thenReturn(VALID_SAMPLE_TOKEN);
@@ -114,10 +106,7 @@ public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest()
114106

115107
@Test
116108
public void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallbackException {
117-
final Map<String, Object> configs = new HashMap<>();
118-
configs.put(
119-
"bootstrap.servers",
120-
List.of("test-eh.servicebus.windows.net:9093"));
109+
Map<String, Object> configs = Map.of("bootstrap.servers", List.of("test-eh.servicebus.windows.net:9093"));
121110

122111
when(tokenCredential.getToken(any(TokenRequestContext.class)))
123112
.thenThrow(new RuntimeException("failed to acquire token"));
@@ -136,16 +125,13 @@ public void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallb
136125

137126
@Test
138127
public void shouldThrowExceptionWithNullBootstrapServers() {
139-
final Map<String, Object> configs = new HashMap<>();
140-
141128
assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
142-
configs, null, null));
129+
Map.of(), null, null));
143130
}
144131

145132
@Test
146133
public void shouldThrowExceptionWithMultipleBootstrapServers() {
147-
final Map<String, Object> configs = new HashMap<>();
148-
configs.put("bootstrap.servers", List.of("server1", "server2"));
134+
Map<String, Object> configs = Map.of("bootstrap.servers", List.of("server1", "server2"));
149135

150136
assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
151137
configs, null, null));

api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerTokenTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ public class AzureEntraOAuthBearerTokenTest {
3939
void constructorShouldParseToken() {
4040
final AccessToken accessToken = new AccessToken(VALID_SAMPLE_TOKEN, OffsetDateTime.MIN);
4141

42-
final AzureEntraOAuthBearerToken azureOAuthBearerToken =
43-
new AzureEntraOAuthBearerToken(accessToken);
42+
final AzureEntraOAuthBearerToken azureOAuthBearerToken = new AzureEntraOAuthBearerToken(accessToken);
4443

4544
assertThat(azureOAuthBearerToken, is(notNullValue()));
4645
assertThat(azureOAuthBearerToken.value(), is(VALID_SAMPLE_TOKEN));

api/src/test/java/io/kafbat/ui/container/KafkaConnectContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,6 @@ public KafkaConnectContainer withKafka(Network network, String bootstrapServers)
4747
}
4848

4949
public String getTarget() {
50-
return "http://" + getContainerIpAddress() + ":" + getMappedPort(CONNECT_PORT);
50+
return "http://" + getHost() + ":" + getMappedPort(CONNECT_PORT);
5151
}
5252
}

api/src/test/java/io/kafbat/ui/container/KsqlDbContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@ private KsqlDbContainer withKafka(Network network, String bootstrapServers) {
3434
}
3535

3636
public String url() {
37-
return "http://" + getContainerIpAddress() + ":" + getMappedPort(PORT);
37+
return "http://" + getHost() + ":" + getMappedPort(PORT);
3838
}
3939
}

api/src/test/java/io/kafbat/ui/container/SchemaRegistryContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public SchemaRegistryContainer withKafka(Network network, String bootstrapServer
2828
}
2929

3030
public String getUrl() {
31-
return "http://" + getContainerIpAddress() + ":" + getMappedPort(SCHEMA_PORT);
31+
return "http://" + getHost() + ":" + getMappedPort(SCHEMA_PORT);
3232
}
3333

3434
public SchemaRegistryClient schemaRegistryClient() {

api/src/test/java/io/kafbat/ui/emitter/CursorTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ private Cursor assertCursor(PollingModeDTO expectedMode,
9999

100100
var cursorPosition = registeredCursor.consumerPosition();
101101
assertThat(cursorPosition).isNotNull();
102+
assertThat(cursorPosition.offsets()).isNotNull();
102103
assertThat(cursorPosition.topic()).isEqualTo(TOPIC);
103104
assertThat(cursorPosition.partitions()).isEqualTo(List.of());
104105
assertThat(cursorPosition.pollingMode()).isEqualTo(expectedMode);
@@ -111,6 +112,7 @@ private void waitMgsgEmitted(AbstractEmitter emitter, int expectedMsgsCnt) {
111112
List<TopicMessageEventDTO> events = Flux.create(emitter)
112113
.collectList()
113114
.block();
115+
assertThat(events).isNotNull();
114116
assertThat(events.stream().filter(m -> m.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE).count())
115117
.isEqualTo(expectedMsgsCnt);
116118
}

api/src/test/java/io/kafbat/ui/emitter/TailingEmitterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private Flux<TopicMessageEventDTO> createTailingFlux(
106106
String query) {
107107
var cluster = applicationContext.getBean(ClustersStorage.class)
108108
.getClusterByName(LOCAL)
109-
.get();
109+
.orElseThrow();
110110

111111
return applicationContext.getBean(MessagesService.class)
112112
.loadMessages(cluster, topicName,

0 commit comments

Comments
 (0)