Skip to content

Commit daa6cb7

Browse files
committed
Merge remote-tracking branch 'origin/main' into chore/rbac_ad
2 parents 514c053 + d093752 commit daa6cb7

File tree

68 files changed

+1701
-450
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1701
-450
lines changed

.dev/dev_arm64.yaml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ services:
3232
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
3333

3434
kafka0:
35-
image: confluentinc/cp-kafka:7.6.0.arm64
35+
image: confluentinc/cp-kafka:7.8.0.arm64
3636
user: "0:0"
3737
hostname: kafka0
3838
container_name: kafka0
@@ -60,7 +60,7 @@ services:
6060
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
6161

6262
schema-registry0:
63-
image: confluentinc/cp-schema-registry:7.6.0.arm64
63+
image: confluentinc/cp-schema-registry:7.8.0.arm64
6464
ports:
6565
- 8085:8085
6666
depends_on:
@@ -76,7 +76,7 @@ services:
7676
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
7777

7878
kafka-connect0:
79-
image: confluentinc/cp-kafka-connect:7.6.0.arm64
79+
image: confluentinc/cp-kafka-connect:7.8.0.arm64
8080
ports:
8181
- 8083:8083
8282
depends_on:
@@ -101,7 +101,7 @@ services:
101101
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"
102102

103103
ksqldb0:
104-
image: confluentinc/cp-ksqldb-server:7.6.0.arm64
104+
image: confluentinc/cp-ksqldb-server:7.8.0.arm64
105105
depends_on:
106106
- kafka0
107107
- kafka-connect0
@@ -119,7 +119,7 @@ services:
119119
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
120120

121121
kafka-init-topics:
122-
image: confluentinc/cp-kafka:7.6.0.arm64
122+
image: confluentinc/cp-kafka:7.8.0.arm64
123123
volumes:
124124
- ../documentation/compose/data/message.json:/data/message.json
125125
depends_on:

.github/workflows/frontend_tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323

2424
- uses: pnpm/[email protected]
2525
with:
26-
version: 9.11.0
26+
version: 9.15.0
2727

2828
- name: Install node
2929
uses: actions/[email protected]

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ build/
4242
*.tgz
4343

4444
/docker/*.override.yaml
45+
/e2e-tests/allure-results/

.java-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
21

README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.
1515

1616
<p align="center">
1717
<a href="https://ui.docs.kafbat.io/">Documentation</a> •
18-
<a href="https://ui.docs.kafbat.io/configuration/quick-start">Quick Start</a> •
18+
<a href="https://ui.docs.kafbat.io/quick-start/demo-run">Quick Start</a> •
1919
<a href="https://discord.gg/4DWzD7pGE5">Community</a>
2020
<br/>
21-
<a href="https://aws.amazon.com/marketplace/pp/{replaceMe}">AWS Marketplace</a> •
2221
<a href="https://www.producthunt.com/products/ui-for-apache-kafka/reviews/new">ProductHunt</a>
2322
</p>
2423

@@ -28,7 +27,7 @@ Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.
2827

2928
#### Kafbat UI is a free, open-source web UI to monitor and manage Apache Kafka clusters.
3029

31-
Kafbat UI is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.
30+
[Kafbat UI](https://kafbat.io/) is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.
3231

3332
<i>
3433
Kafbat UI, developed by <b>Kafbat</b>*, proudly carries forward the legacy of the UI Apache Kafka project.

api/pom.xml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@
5050
<dependency>
5151
<groupId>org.apache.kafka</groupId>
5252
<artifactId>kafka-clients</artifactId>
53-
<version>${kafka-clients.version}</version>
53+
<!-- ccs stands for Confluent Community Edition
54+
See https://www.confluent.io/confluent-community-license-faq/
55+
-->
56+
<version>${confluent.version}-ccs</version>
5457
</dependency>
5558
<dependency>
5659
<groupId>org.apache.commons</groupId>

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 67 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Objects;
2323
import javax.annotation.Nullable;
2424
import lombok.extern.slf4j.Slf4j;
25+
import org.apache.commons.lang3.StringUtils;
2526
import org.springframework.http.ResponseEntity;
2627
import org.springframework.util.unit.DataSize;
2728
import org.springframework.web.client.RestClientException;
@@ -51,14 +52,36 @@ private static Retry conflictCodeRetry() {
5152
(WebClientResponseException.Conflict) signal.failure()));
5253
}
5354

54-
private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
55-
return publisher.retryWhen(conflictCodeRetry());
55+
private static @NotNull Retry retryOnRebalance() {
56+
return Retry.fixedDelay(MAX_RETRIES, RETRIES_DELAY).filter(e -> {
57+
58+
if (e instanceof WebClientResponseException.InternalServerError exception) {
59+
final var errorMessage = getMessage(exception);
60+
return StringUtils.equals(errorMessage,
61+
// From https://github.com/apache/kafka/blob/dfc07e0e0c6e737a56a5402644265f634402b864/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2340
62+
"Request cannot be completed because a rebalance is expected");
63+
}
64+
return false;
65+
});
66+
}
67+
68+
private static <T> Mono<T> withRetryOnConflictOrRebalance(Mono<T> publisher) {
69+
return publisher
70+
.retryWhen(retryOnRebalance())
71+
.retryWhen(conflictCodeRetry());
72+
}
73+
74+
private static <T> Flux<T> withRetryOnConflictOrRebalance(Flux<T> publisher) {
75+
return publisher
76+
.retryWhen(retryOnRebalance())
77+
.retryWhen(conflictCodeRetry());
5678
}
5779

58-
private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
59-
return publisher.retryWhen(conflictCodeRetry());
80+
private static <T> Mono<T> withRetryOnRebalance(Mono<T> publisher) {
81+
return publisher.retryWhen(retryOnRebalance());
6082
}
6183

84+
6285
private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
6386
return publisher
6487
.onErrorResume(WebClientResponseException.BadRequest.class,
@@ -73,197 +96,200 @@ private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
7396
}
7497

7598
private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
99+
return Mono.error(new ValidationException(getMessage(parseException)));
100+
}
101+
102+
private static String getMessage(WebClientResponseException parseException) {
76103
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
77-
return Mono.error(new ValidationException(
78-
Objects.requireNonNull(errorMessage,
79-
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
80-
"This should not happen according to the ConnectExceptionMapper")
81-
.message()));
104+
return Objects.requireNonNull(errorMessage,
105+
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
106+
"This should not happen according to the ConnectExceptionMapper")
107+
.message();
82108
}
83109

84110
@Override
85111
public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
86112
return withBadRequestErrorHandling(
87-
super.createConnector(newConnector)
113+
withRetryOnRebalance(super.createConnector(newConnector))
88114
);
89115
}
90116

91117
@Override
92118
public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody)
93119
throws RestClientException {
94120
return withBadRequestErrorHandling(
95-
super.setConnectorConfig(connectorName, requestBody)
121+
withRetryOnRebalance(super.setConnectorConfig(connectorName, requestBody))
96122
);
97123
}
98124

99125
@Override
100126
public Mono<ResponseEntity<Connector>> createConnectorWithHttpInfo(NewConnector newConnector)
101127
throws WebClientResponseException {
102-
return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
128+
return withRetryOnConflictOrRebalance(super.createConnectorWithHttpInfo(newConnector));
103129
}
104130

105131
@Override
106132
public Mono<Void> deleteConnector(String connectorName) throws WebClientResponseException {
107-
return withRetryOnConflict(super.deleteConnector(connectorName));
133+
return withRetryOnConflictOrRebalance(super.deleteConnector(connectorName));
108134
}
109135

110136
@Override
111137
public Mono<ResponseEntity<Void>> deleteConnectorWithHttpInfo(String connectorName)
112138
throws WebClientResponseException {
113-
return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
139+
return withRetryOnConflictOrRebalance(super.deleteConnectorWithHttpInfo(connectorName));
114140
}
115141

116142

117143
@Override
118144
public Mono<Connector> getConnector(String connectorName) throws WebClientResponseException {
119-
return withRetryOnConflict(super.getConnector(connectorName));
145+
return withRetryOnConflictOrRebalance(super.getConnector(connectorName));
120146
}
121147

122148
@Override
123149
public Mono<ResponseEntity<Connector>> getConnectorWithHttpInfo(String connectorName)
124150
throws WebClientResponseException {
125-
return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
151+
return withRetryOnConflictOrRebalance(super.getConnectorWithHttpInfo(connectorName));
126152
}
127153

128154
@Override
129155
public Mono<Map<String, Object>> getConnectorConfig(String connectorName) throws WebClientResponseException {
130-
return withRetryOnConflict(super.getConnectorConfig(connectorName));
156+
return withRetryOnConflictOrRebalance(super.getConnectorConfig(connectorName));
131157
}
132158

133159
@Override
134160
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfigWithHttpInfo(String connectorName)
135161
throws WebClientResponseException {
136-
return withRetryOnConflict(super.getConnectorConfigWithHttpInfo(connectorName));
162+
return withRetryOnConflictOrRebalance(super.getConnectorConfigWithHttpInfo(connectorName));
137163
}
138164

139165
@Override
140166
public Flux<ConnectorPlugin> getConnectorPlugins() throws WebClientResponseException {
141-
return withRetryOnConflict(super.getConnectorPlugins());
167+
return withRetryOnConflictOrRebalance(super.getConnectorPlugins());
142168
}
143169

144170
@Override
145171
public Mono<ResponseEntity<List<ConnectorPlugin>>> getConnectorPluginsWithHttpInfo()
146172
throws WebClientResponseException {
147-
return withRetryOnConflict(super.getConnectorPluginsWithHttpInfo());
173+
return withRetryOnConflictOrRebalance(super.getConnectorPluginsWithHttpInfo());
148174
}
149175

150176
@Override
151177
public Mono<ConnectorStatus> getConnectorStatus(String connectorName) throws WebClientResponseException {
152-
return withRetryOnConflict(super.getConnectorStatus(connectorName));
178+
return withRetryOnConflictOrRebalance(super.getConnectorStatus(connectorName));
153179
}
154180

155181
@Override
156182
public Mono<ResponseEntity<ConnectorStatus>> getConnectorStatusWithHttpInfo(String connectorName)
157183
throws WebClientResponseException {
158-
return withRetryOnConflict(super.getConnectorStatusWithHttpInfo(connectorName));
184+
return withRetryOnConflictOrRebalance(super.getConnectorStatusWithHttpInfo(connectorName));
159185
}
160186

161187
@Override
162188
public Mono<TaskStatus> getConnectorTaskStatus(String connectorName, Integer taskId)
163189
throws WebClientResponseException {
164-
return withRetryOnConflict(super.getConnectorTaskStatus(connectorName, taskId));
190+
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatus(connectorName, taskId));
165191
}
166192

167193
@Override
168194
public Mono<ResponseEntity<TaskStatus>> getConnectorTaskStatusWithHttpInfo(String connectorName, Integer taskId)
169195
throws WebClientResponseException {
170-
return withRetryOnConflict(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
196+
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
171197
}
172198

173199
@Override
174200
public Flux<ConnectorTask> getConnectorTasks(String connectorName) throws WebClientResponseException {
175-
return withRetryOnConflict(super.getConnectorTasks(connectorName));
201+
return withRetryOnConflictOrRebalance(super.getConnectorTasks(connectorName));
176202
}
177203

178204
@Override
179205
public Mono<ResponseEntity<List<ConnectorTask>>> getConnectorTasksWithHttpInfo(String connectorName)
180206
throws WebClientResponseException {
181-
return withRetryOnConflict(super.getConnectorTasksWithHttpInfo(connectorName));
207+
return withRetryOnConflictOrRebalance(super.getConnectorTasksWithHttpInfo(connectorName));
182208
}
183209

184210
@Override
185211
public Mono<Map<String, ConnectorTopics>> getConnectorTopics(String connectorName) throws WebClientResponseException {
186-
return withRetryOnConflict(super.getConnectorTopics(connectorName));
212+
return withRetryOnConflictOrRebalance(super.getConnectorTopics(connectorName));
187213
}
188214

189215
@Override
190216
public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWithHttpInfo(String connectorName)
191217
throws WebClientResponseException {
192-
return withRetryOnConflict(super.getConnectorTopicsWithHttpInfo(connectorName));
218+
return withRetryOnConflictOrRebalance(super.getConnectorTopicsWithHttpInfo(connectorName));
193219
}
194220

195221
@Override
196222
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
197-
return withRetryOnConflict(super.getConnectors(search));
223+
return withRetryOnConflictOrRebalance(super.getConnectors(search));
198224
}
199225

200226
@Override
201227
public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
202-
return withRetryOnConflict(super.getConnectorsWithHttpInfo(search));
228+
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
203229
}
204230

205231
@Override
206232
public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseException {
207-
return withRetryOnConflict(super.pauseConnector(connectorName));
233+
return withRetryOnConflictOrRebalance(super.pauseConnector(connectorName));
208234
}
209235

210236
@Override
211237
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
212-
return withRetryOnConflict(super.pauseConnectorWithHttpInfo(connectorName));
238+
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
213239
}
214240

215241
@Override
216242
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
217243
throws WebClientResponseException {
218-
return withRetryOnConflict(super.restartConnector(connectorName, includeTasks, onlyFailed));
244+
return withRetryOnConflictOrRebalance(super.restartConnector(connectorName, includeTasks, onlyFailed));
219245
}
220246

221247
@Override
222248
public Mono<ResponseEntity<Void>> restartConnectorWithHttpInfo(String connectorName, Boolean includeTasks,
223249
Boolean onlyFailed) throws WebClientResponseException {
224-
return withRetryOnConflict(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
250+
return withRetryOnConflictOrRebalance(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
225251
}
226252

227253
@Override
228254
public Mono<Void> restartConnectorTask(String connectorName, Integer taskId) throws WebClientResponseException {
229-
return withRetryOnConflict(super.restartConnectorTask(connectorName, taskId));
255+
return withRetryOnConflictOrRebalance(super.restartConnectorTask(connectorName, taskId));
230256
}
231257

232258
@Override
233259
public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connectorName, Integer taskId)
234260
throws WebClientResponseException {
235-
return withRetryOnConflict(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
261+
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
236262
}
237263

238264
@Override
239265
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
240-
return super.resumeConnector(connectorName);
266+
return withRetryOnRebalance(super.resumeConnector(connectorName));
241267
}
242268

243269
@Override
244270
public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorName)
245271
throws WebClientResponseException {
246-
return withRetryOnConflict(super.resumeConnectorWithHttpInfo(connectorName));
272+
return withRetryOnConflictOrRebalance(super.resumeConnectorWithHttpInfo(connectorName));
247273
}
248274

249275
@Override
250276
public Mono<ResponseEntity<Connector>> setConnectorConfigWithHttpInfo(String connectorName,
251277
Map<String, Object> requestBody)
252278
throws WebClientResponseException {
253-
return withRetryOnConflict(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
279+
return withRetryOnConflictOrRebalance(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
254280
}
255281

256282
@Override
257283
public Mono<ConnectorPluginConfigValidationResponse> validateConnectorPluginConfig(String pluginName,
258284
Map<String, Object> requestBody)
259285
throws WebClientResponseException {
260-
return withRetryOnConflict(super.validateConnectorPluginConfig(pluginName, requestBody));
286+
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfig(pluginName, requestBody));
261287
}
262288

263289
@Override
264290
public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateConnectorPluginConfigWithHttpInfo(
265291
String pluginName, Map<String, Object> requestBody) throws WebClientResponseException {
266-
return withRetryOnConflict(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
292+
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
267293
}
268294

269295
private static class RetryingApiClient extends ApiClient {

0 commit comments

Comments
 (0)