Skip to content

Commit 181fa4e

Browse files
authored
Merge branch 'main' into issue/445-fe2
2 parents f53f1f4 + bac16cc commit 181fa4e

File tree

11 files changed

+259
-54
lines changed

11 files changed

+259
-54
lines changed

README.md

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,23 @@ We extend our gratitude to Provectus for their past support in groundbreaking wo
4343
![Interface](https://raw.githubusercontent.com/kafbat/kafka-ui/images/overview.gif)
4444

4545
# Features
46-
* **Multi-Cluster Management** — monitor and manage all your clusters in one place
47-
* **Performance Monitoring with Metrics Dashboard** — track key Kafka metrics with a lightweight dashboard
48-
* **View Kafka Brokers** — view topic and partition assignments, controller status
49-
* **View Kafka Topics** — view partition count, replication status, and custom configuration
50-
* **View Consumer Groups** — view per-partition parked offsets, combined and per-partition lag
51-
* **Browse Messages** — browse messages with JSON, plain text, and Avro encoding
52-
* **Dynamic Topic Configuration** — create and configure new topics with dynamic configuration
53-
* **Configurable Authentication**[secure](https://ui.docs.kafbat.io/configuration/authentication) your installation with optional Github/Gitlab/Google OAuth 2.0
54-
* **Custom serialization/deserialization plugins** - [use](https://ui.docs.kafbat.io/configuration/serialization-serde) a ready-to-go serde for your data like AWS Glue or Smile, or code your own!
55-
* **Role based access control** - [manage permissions](https://ui.docs.kafbat.io/configuration/rbac-role-based-access-control) to access the UI with granular precision
56-
* **Data masking** - [obfuscate](https://ui.docs.kafbat.io/configuration/data-masking) sensitive data in topic messages
46+
47+
* **Topic Insights** – View essential topic details including partition count, replication status, and custom configurations.
48+
* **Configuration Wizard** – Set up and configure your Kafka clusters directly through the UI.
49+
* **Multi-Cluster Management** – Monitor and manage all your Kafka clusters in one unified interface.
50+
* **Metrics Dashboard** – Track key Kafka metrics in real time with a streamlined, lightweight dashboard.
51+
* **Kafka Brokers Overview** – Inspect brokers, including partition assignments and controller status.
52+
* **Consumer Group Details** – Analyze parked offsets per partition, and monitor both combined and partition-specific lag.
53+
* **Message Browser** – Explore messages in JSON, plain text, or Avro encoding formats. Live view is supported, enriched with user-defined CEL message filters.
54+
* **Dynamic Topic Management** – Create and configure new topics with flexible, real-time settings.
55+
* **Pluggable Authentication** – Secure your UI using OAuth 2.0 (GitHub, GitLab, Google), LDAP, or basic authentication.
56+
* **Cloud IAM Support** – Integrate with **GCP IAM**, **Azure IAM**, and **AWS IAM** for cloud-native identity and access management.
57+
* **Managed Kafka Service Support** – Full support for **Azure EventHub**, **Google Cloud Managed Service for Apache Kafka**, and **AWS Managed Streaming for Apache Kafka (MSK)**—both server-based and serverless.
58+
* **Custom SerDe Plugin Support** – Use built-in serializers/deserializers like AWS Glue and Smile, or create your own custom plugins.
59+
* **Role-Based Access Control** – [Manage granular UI permissions](https://ui.docs.kafbat.io/configuration/rbac-role-based-access-control) with RBAC.
60+
* **Data Masking** – [Obfuscate sensitive data](https://ui.docs.kafbat.io/configuration/data-masking) in topic messages to enhance privacy and compliance.
61+
* **MCP Server** – [Model Context Protocol](https://ui.docs.kafbat.io/faq/mcp) Server
62+
5763

5864
## Feature overview
5965

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ public enum LogLevel {
192192
public static class CacheProperties {
193193
boolean enabled = true;
194194
Duration connectCacheExpiry = Duration.ofMinutes(1);
195+
Duration connectClusterCacheExpiry = Duration.ofHours(24);
195196
}
196197

197198
@PostConstruct

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kafbat.ui.mapper;
22

33
import io.kafbat.ui.config.ClustersProperties;
4+
import io.kafbat.ui.connect.model.ClusterInfo;
45
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
56
import io.kafbat.ui.connect.model.ConnectorTask;
67
import io.kafbat.ui.connect.model.NewConnector;
@@ -45,6 +46,7 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
4546
default ConnectDTO toKafkaConnect(
4647
ClustersProperties.ConnectCluster connect,
4748
List<InternalConnectorInfo> connectors,
49+
ClusterInfo clusterInfo,
4850
boolean withStats) {
4951
Integer connectorCount = null;
5052
Integer failedConnectors = null;
@@ -66,12 +68,17 @@ default ConnectDTO toKafkaConnect(
6668
.filter(ConnectorStateDTO.FAILED::equals)
6769
.map(s -> 1).orElse(0);
6870

69-
tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
70-
71-
for (TaskDTO task : connector.getTasks()) {
72-
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
73-
failedTasksCount += tasksCount;
74-
}
71+
tasksCount += internalConnector.map(ConnectorDTO::getTasks).map(List::size).orElse(0);
72+
73+
if (connector.getTasks() != null) {
74+
failedTasksCount += (int) connector.getTasks().stream()
75+
.filter(t ->
76+
Optional.ofNullable(t)
77+
.map(TaskDTO::getStatus)
78+
.map(TaskStatusDTO::getState)
79+
.map(ConnectorTaskStatusDTO.FAILED::equals)
80+
.orElse(false)
81+
).count();
7582
}
7683
}
7784

@@ -83,7 +90,10 @@ default ConnectDTO toKafkaConnect(
8390
.connectorsCount(connectorCount)
8491
.failedConnectorsCount(failedConnectors)
8592
.tasksCount(tasksCount)
86-
.failedTasksCount(failedTasksCount);
93+
.failedTasksCount(failedTasksCount)
94+
.version(clusterInfo.getVersion())
95+
.commit(clusterInfo.getCommit())
96+
.clusterId(clusterInfo.getKafkaClusterId());
8797
}
8898

8999
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {

api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ void configure(Configuration configuration) {
112112
&& configuration.defaultKeyMessageDescriptor() == null
113113
&& configuration.messageDescriptorMap().isEmpty()
114114
&& configuration.keyMessageDescriptorMap().isEmpty()) {
115-
throw new ValidationException("Neither default, not per-topic descriptors defined for " + name() + " serde");
115+
throw new ValidationException("Neither default, nor per-topic descriptors defined for " + name() + " serde");
116116
}
117117
this.defaultMessageDescriptor = configuration.defaultMessageDescriptor();
118118
this.defaultKeyMessageDescriptor = configuration.defaultKeyMessageDescriptor();

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
import com.github.benmanes.caffeine.cache.Caffeine;
66
import io.kafbat.ui.config.ClustersProperties;
77
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
8+
import io.kafbat.ui.connect.model.ClusterInfo;
89
import io.kafbat.ui.connect.model.ConnectorStatus;
910
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
1011
import io.kafbat.ui.connect.model.ConnectorTopics;
1112
import io.kafbat.ui.connect.model.TaskStatus;
1213
import io.kafbat.ui.exception.ConnectorOffsetsResetException;
1314
import io.kafbat.ui.exception.NotFoundException;
1415
import io.kafbat.ui.exception.ValidationException;
15-
import io.kafbat.ui.mapper.ClusterMapper;
1616
import io.kafbat.ui.mapper.KafkaConnectMapper;
1717
import io.kafbat.ui.model.ConnectDTO;
1818
import io.kafbat.ui.model.ConnectorActionDTO;
@@ -40,49 +40,59 @@
4040
import org.springframework.web.reactive.function.client.WebClientResponseException;
4141
import reactor.core.publisher.Flux;
4242
import reactor.core.publisher.Mono;
43+
import reactor.util.function.Tuples;
4344

4445
@Service
4546
@Slf4j
4647
public class KafkaConnectService {
47-
private final ClusterMapper clusterMapper;
4848
private final KafkaConnectMapper kafkaConnectMapper;
4949
private final KafkaConfigSanitizer kafkaConfigSanitizer;
5050
private final ClustersProperties clustersProperties;
5151

5252
private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
53+
private final AsyncCache<String, ClusterInfo> cacheClusterInfo;
5354

54-
public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper,
55+
public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
5556
KafkaConfigSanitizer kafkaConfigSanitizer,
5657
ClustersProperties clustersProperties) {
57-
this.clusterMapper = clusterMapper;
5858
this.kafkaConnectMapper = kafkaConnectMapper;
5959
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
6060
this.clustersProperties = clustersProperties;
6161
this.cachedConnectors = Caffeine.newBuilder()
6262
.expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
6363
.buildAsync();
64+
this.cacheClusterInfo = Caffeine.newBuilder()
65+
.expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
66+
.buildAsync();
6467
}
6568

6669
public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
6770
Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
6871
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
72+
6973
if (withStats) {
7074
return connectClusters.map(connects ->
71-
Flux.fromIterable(connects).flatMap(connect -> (
72-
getConnectConnectorsFromCache(new ConnectCacheKey(cluster, connect), withStats).map(
73-
connectors -> kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats)
74-
)
75+
Flux.fromIterable(connects).flatMap(c ->
76+
getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci))
77+
).flatMap(tuple -> (
78+
getConnectConnectorsFromCache(new ConnectCacheKey(cluster, tuple.getT1()))
79+
.map(connectors ->
80+
kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), withStats)
81+
)
7582
)
7683
)
7784
).orElse(Flux.fromIterable(List.of()));
7885
} else {
79-
return Flux.fromIterable(connectClusters.map(connects ->
80-
connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList()
81-
).orElse(List.of()));
86+
return Flux.fromIterable(connectClusters.orElse(List.of()))
87+
.flatMap(c ->
88+
getClusterInfo(cluster, c.getName()).map(info ->
89+
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, withStats)
90+
)
91+
);
8292
}
8393
}
8494

85-
private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key, boolean withStats) {
95+
private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key) {
8696
if (clustersProperties.getCache().isEnabled()) {
8797
return Mono.fromFuture(
8898
cachedConnectors.get(key, (t, e) ->
@@ -94,6 +104,16 @@ private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectC
94104
}
95105
}
96106

107+
private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
108+
return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
109+
api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
110+
.onErrorResume(th -> {
111+
log.error("Error on collecting cluster info" + th.getMessage(), th);
112+
return Mono.just(new ClusterInfo());
113+
}).toFuture()
114+
));
115+
}
116+
97117
private Flux<InternalConnectorInfo> getConnectConnectors(
98118
KafkaCluster cluster,
99119
ClustersProperties.ConnectCluster connect) {
@@ -177,12 +197,13 @@ public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectNa
177197
.mono(client ->
178198
connector
179199
.flatMap(c -> connectorExists(cluster, connectName, c.getName())
180-
.map(exists -> {
200+
.flatMap(exists -> {
181201
if (Boolean.TRUE.equals(exists)) {
182-
throw new ValidationException(
183-
String.format("Connector with name %s already exists", c.getName()));
202+
return Mono.error(new ValidationException(
203+
String.format("Connector with name %s already exists", c.getName())));
204+
} else {
205+
return Mono.just(c);
184206
}
185-
return c;
186207
}))
187208
.map(kafkaConnectMapper::toClient)
188209
.flatMap(client::createConnector)

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,9 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
163163
boolean topicDeletionEnabled = true;
164164
for (ConfigEntry entry : configs) {
165165
if (entry.name().contains("inter.broker.protocol.version")) {
166-
version = Optional.of(entry.value());
166+
version = Optional.ofNullable(entry.value());
167167
}
168-
if (entry.name().equals("delete.topic.enable")) {
168+
if (entry.name().equals("delete.topic.enable") && entry.value() != null) {
169169
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
170170
}
171171
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package io.kafbat.ui.mapper;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import io.kafbat.ui.config.ClustersProperties;
6+
import io.kafbat.ui.connect.model.ClusterInfo;
7+
import io.kafbat.ui.model.ConnectDTO;
8+
import io.kafbat.ui.model.ConnectorDTO;
9+
import io.kafbat.ui.model.ConnectorStateDTO;
10+
import io.kafbat.ui.model.ConnectorStatusDTO;
11+
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
12+
import io.kafbat.ui.model.TaskDTO;
13+
import io.kafbat.ui.model.TaskIdDTO;
14+
import io.kafbat.ui.model.TaskStatusDTO;
15+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
import java.util.UUID;
19+
import java.util.concurrent.ThreadLocalRandom;
20+
import org.junit.jupiter.api.Test;
21+
import org.openapitools.jackson.nullable.JsonNullable;
22+
23+
class KafkaConnectMapperTest {
24+
25+
@Test
26+
void toKafkaConnect() {
27+
ThreadLocalRandom random = ThreadLocalRandom.current();
28+
29+
List<InternalConnectorInfo> connectors = new ArrayList<>();
30+
int failedConnectors = 0;
31+
int failedTasks = 0;
32+
int tasksPerConnector = random.nextInt(1, 10);
33+
34+
for (int i = 0; i < 10; i++) {
35+
ConnectorStateDTO connectorState;
36+
if (random.nextBoolean()) {
37+
connectorState = ConnectorStateDTO.FAILED;
38+
failedConnectors++;
39+
} else {
40+
connectorState = ConnectorStateDTO.RUNNING;
41+
}
42+
43+
ConnectorDTO connectorDto = new ConnectorDTO();
44+
connectorDto.setName(UUID.randomUUID().toString());
45+
connectorDto.setStatus(
46+
new ConnectorStatusDTO(connectorState, UUID.randomUUID().toString())
47+
);
48+
49+
List<TaskDTO> tasks = new ArrayList<>();
50+
List<TaskIdDTO> taskIds = new ArrayList<>();
51+
52+
for (int j = 0; j < tasksPerConnector; j++) {
53+
TaskDTO task = new TaskDTO();
54+
TaskIdDTO taskId = new TaskIdDTO(UUID.randomUUID().toString(), j);
55+
task.setId(taskId);
56+
57+
ConnectorTaskStatusDTO state;
58+
if (random.nextBoolean()) {
59+
state = ConnectorTaskStatusDTO.FAILED;
60+
failedTasks++;
61+
} else {
62+
state = ConnectorTaskStatusDTO.RUNNING;
63+
}
64+
65+
TaskStatusDTO status = new TaskStatusDTO();
66+
status.setState(state);
67+
task.setStatus(status);
68+
tasks.add(task);
69+
taskIds.add(taskId);
70+
}
71+
72+
connectorDto.setTasks(taskIds);
73+
InternalConnectorInfo connector = InternalConnectorInfo.builder()
74+
.connector(connectorDto)
75+
.tasks(tasks)
76+
.build();
77+
78+
connectors.add(connector);
79+
}
80+
81+
ClusterInfo clusterInfo = new ClusterInfo();
82+
clusterInfo.setVersion(UUID.randomUUID().toString());
83+
clusterInfo.setCommit(UUID.randomUUID().toString());
84+
clusterInfo.setKafkaClusterId(UUID.randomUUID().toString());
85+
86+
ClustersProperties.ConnectCluster connectCluster = ClustersProperties.ConnectCluster.builder()
87+
.name(UUID.randomUUID().toString())
88+
.address("http://localhost:" + random.nextInt(1000, 5000))
89+
.username(UUID.randomUUID().toString())
90+
.password(UUID.randomUUID().toString()).build();
91+
92+
ConnectDTO connectDto = new ConnectDTO();
93+
connectDto.setName(connectCluster.getName());
94+
connectDto.setAddress(connectCluster.getAddress());
95+
connectDto.setVersion(JsonNullable.of(clusterInfo.getVersion()));
96+
connectDto.setCommit(JsonNullable.of(clusterInfo.getCommit()));
97+
connectDto.setClusterId(JsonNullable.of(clusterInfo.getKafkaClusterId()));
98+
connectDto.setConnectorsCount(JsonNullable.of(connectors.size()));
99+
connectDto.setFailedConnectorsCount(JsonNullable.of(failedConnectors));
100+
connectDto.setTasksCount(JsonNullable.of(connectors.size() * tasksPerConnector));
101+
connectDto.setFailedTasksCount(JsonNullable.of(failedTasks));
102+
103+
KafkaConnectMapper mapper = new KafkaConnectMapperImpl();
104+
ConnectDTO kafkaConnect = mapper.toKafkaConnect(
105+
connectCluster,
106+
connectors,
107+
clusterInfo,
108+
true
109+
);
110+
111+
assertThat(kafkaConnect).isNotNull();
112+
assertThat(kafkaConnect).isEqualTo(connectDto);
113+
114+
}
115+
}

contract/build.gradle

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import org.openapitools.generator.gradle.plugin.tasks.GenerateTask
33
plugins {
44
id "java-library"
55
alias(libs.plugins.openapi.generator)
6+
alias(libs.plugins.openapi.validator)
67
}
78

89

@@ -60,6 +61,21 @@ tasks.register('generateBackendApi', GenerateTask) {
6061
"""]
6162
}
6263

64+
openAPIStyleValidator {
65+
inputFile = "${project.projectDir}/src/main/resources/swagger/kafbat-ui-api.yaml"
66+
validateModelPropertiesDescription = false
67+
validateModelPropertiesExample = false
68+
parameterNamingConvention = "UnderscoreCase"
69+
propertyNamingConvention = "camelCase"
70+
queryParamNamingConvention = "camelCase"
71+
pathParamNamingConvention = "camelCase"
72+
validateOperationDescription = false
73+
validateInfoLicense = false
74+
validateInfoDescription = false
75+
validateInfoContact = false
76+
}
77+
78+
6379
tasks.register('generateConnectClient', GenerateTask) {
6480
generatorName = "java"
6581
inputSpec = specDir.file("kafka-connect-api.yaml").asFile.absolutePath

0 commit comments

Comments
 (0)