Commit 6946c5e

Merge branch 'main' into issues/1333-optional-fts-fe
2 parents 40d03b2 + e767cb1

File tree: 36 files changed, +1159 -268 lines changed

README.md

Lines changed: 12 additions & 15 deletions

@@ -72,28 +72,25 @@ Kafbat UI wraps major functions of Apache Kafka with an intuitive user interface
 ![Interface](documentation/images/Interface.gif)

 ## Topics
-Kafbat UI makes it easy for you to create topics in your browser by several clicks,
-pasting your own parameters, and viewing topics in the list.
+Kafbat UI makes it easy for you to create topics in your browser with just a few clicks, by pasting your own parameters, and viewing topics in the list.

 ![Create Topic](documentation/images/Create_topic_kafka-ui.gif)

-It's possible to jump from connectors view to corresponding topics and from a topic to consumers (back and forth) for more convenient navigation.
-connectors, overview topic settings.
+You can jump from the connectors view to corresponding topics and from a topic to consumers (back and forth) for more convenient navigation, including connectors and overview topic settings.

 ![Connector_Topic_Consumer](documentation/images/Connector_Topic_Consumer.gif)

 ### Messages
-Let's say we want to produce messages for our topic. With the Kafbat UI we can send or write data/messages to the Kafka topics without effort by specifying parameters, and viewing messages in the list.
+Suppose you want to produce messages for your topic. With Kafbat UI, you can easily send or write data/messages to Kafka topics by specifying parameters and viewing messages in the list.

 ![Produce Message](documentation/images/Create_message_kafka-ui.gif)

 ## Schema registry
-There are 3 supported types of schemas: Avro®, JSON Schema, and Protobuf schemas.
+There are three supported types of schemas: Avro®, JSON Schema, and Protobuf schemas.

 ![Create Schema Registry](documentation/images/Create_schema.gif)

-Before producing avro/protobuf encoded messages, you have to add a schema for the topic in Schema Registry. Now all these steps are easy to do
-with a few clicks in a user-friendly interface.
+Before producing Avro/Protobuf encoded messages, you need to add a schema for the topic in the Schema Registry. All these steps are now easy to do with just a few clicks in a user-friendly interface.

 ![Avro Schema Topic](documentation/images/Schema_Topic.gif)

@@ -111,7 +108,7 @@ docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true ghcr.io/kafbat/kafka-

 Then access the web UI at [http://localhost:8080](http://localhost:8080)

-The command is sufficient to try things out. When you're done trying things out, you can proceed with a [persistent installation](https://ui.docs.kafbat.io/quick-start/persistent-start)
+This command is sufficient to try things out. When you're done, you can proceed with a [persistent installation](https://ui.docs.kafbat.io/quick-start/persistent-start).

 ## Persistent installation

@@ -146,24 +143,24 @@ Please refer to our [configuration](https://ui.docs.kafbat.io/configuration/conf

 ## Building from sources

-[Quick start](https://ui.docs.kafbat.io/development/building/prerequisites) with building
+[Quick start](https://ui.docs.kafbat.io/development/building/prerequisites) for building from source

 ## Liveliness and readiness probes
-Liveliness and readiness endpoint is at `/actuator/health`.<br/>
-Info endpoint (build info) is located at `/actuator/info`.
+The liveness and readiness endpoint is at `/actuator/health`.<br/>
+The info endpoint (build info) is located at `/actuator/info`.

 # Configuration options

-All the environment variables/config properties could be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).
+All environment variables and configuration properties can be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).

 # Contributing

-Please refer to [contributing guide](https://ui.docs.kafbat.io/development/contributing), we'll guide you from there.
+Please refer to the [contributing guide](https://ui.docs.kafbat.io/development/contributing); we'll guide you from there.

 # Support

 As we're fully independent, team members contribute in their free time.
-Your support is crucial for us, if you wish to sponsor us, take a look [here](https://github.com/sponsors/kafbat)
+Your support is crucial for us, if you wish to sponsor us, take a look [here](https://github.com/sponsors/kafbat)

 # Powered by

api/src/main/java/io/kafbat/ui/config/McpConfig.java

Lines changed: 2 additions & 2 deletions

@@ -44,12 +44,12 @@ public McpAsyncServer mcpServer(WebFluxSseServerTransportProvider transport) {
     // Configure server capabilities with resource support
     var capabilities = McpSchema.ServerCapabilities.builder()
         .resources(false, true)
-        .tools(true) // Tool support with list changes notifications
+        .tools(true) // Tools support with list changes notifications
         .prompts(false) // Prompt support with list changes notifications
         .logging() // Logging support
         .build();

-    // Create the server with both tool and resource capabilities
+    // Create the server with both tools and resource capabilities
     return McpServer.async(transport)
         .serverInfo("Kafka UI MCP", "0.0.1")
         .capabilities(capabilities)

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 23 additions & 0 deletions

@@ -11,6 +11,7 @@
 import io.kafbat.ui.api.TopicsApi;
 import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.mapper.ClusterMapper;
+import io.kafbat.ui.model.FullConnectorInfoDTO;
 import io.kafbat.ui.model.InternalTopic;
 import io.kafbat.ui.model.InternalTopicConfig;
 import io.kafbat.ui.model.PartitionsIncreaseDTO;
@@ -28,6 +29,8 @@
 import io.kafbat.ui.model.TopicUpdateDTO;
 import io.kafbat.ui.model.TopicsResponseDTO;
 import io.kafbat.ui.model.rbac.AccessContext;
+import io.kafbat.ui.model.rbac.permission.ConnectAction;
+import io.kafbat.ui.service.KafkaConnectService;
 import io.kafbat.ui.service.TopicsService;
 import io.kafbat.ui.service.analyze.TopicAnalysisService;
 import io.kafbat.ui.service.mcp.McpTool;
@@ -55,6 +58,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
   private final TopicAnalysisService topicAnalysisService;
   private final ClusterMapper clusterMapper;
   private final ClustersProperties clustersProperties;
+  private final KafkaConnectService kafkaConnectService;

   @Override
   public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -370,4 +374,23 @@ private Comparator<InternalTopic> getComparatorForTopic(
       default -> defaultComparator;
     };
   }
+
+  @Override
+  public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getTopicConnectors(String clusterName,
+                                                                             String topicName,
+                                                                             ServerWebExchange exchange) {
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        .topicActions(topicName, VIEW)
+        .operationName("getTopicConnectors")
+        .operationParams(topicName)
+        .build();
+
+    Flux<FullConnectorInfoDTO> job = kafkaConnectService.getTopicConnectors(getCluster(clusterName), topicName)
+        .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));
+
+    return validateAccess(context)
+        .then(Mono.just(ResponseEntity.ok(job)))
+        .doOnEach(sig -> audit(context, sig));
+  }
 }
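The new `getTopicConnectors` endpoint above returns only connectors whose Connect cluster passes the caller's access check (the `filterWhen` step). A minimal plain-Java sketch of that filtering idea, using a hypothetical `ConnectorInfo` record and a plain predicate rather than the project's reactive RBAC types:

```java
import java.util.List;
import java.util.function.Predicate;

public class TopicConnectorFilter {
  // Hypothetical minimal model: a connector and the Connect cluster it runs on.
  public record ConnectorInfo(String connect, String name) {}

  // Keep only connectors whose Connect cluster the caller may see,
  // analogous to the filterWhen(...) access check in the controller.
  public static List<ConnectorInfo> accessible(List<ConnectorInfo> all,
                                               Predicate<String> connectAccessible) {
    return all.stream()
        .filter(c -> connectAccessible.test(c.connect()))
        .toList();
  }
}
```

In the real controller the predicate is asynchronous (`isConnectAccessible` returns a `Mono<Boolean>`), which is why `filterWhen` is used instead of `filter`.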

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 26 additions & 0 deletions

@@ -18,6 +18,7 @@
 import io.kafbat.ui.model.TaskIdDTO;
 import io.kafbat.ui.model.TaskStatusDTO;
 import io.kafbat.ui.model.connect.InternalConnectorInfo;
+import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -31,6 +32,12 @@
 public interface KafkaConnectMapper {
   NewConnector toClient(io.kafbat.ui.model.NewConnectorDTO newConnector);

+  default ClusterInfo toClient(KafkaConnectState state) {
+    ClusterInfo clusterInfo = new ClusterInfo();
+    clusterInfo.setVersion(state.getVersion());
+    return clusterInfo;
+  }
+
   @Mapping(target = "status", ignore = true)
   @Mapping(target = "connect", ignore = true)
   ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector);
@@ -153,4 +160,23 @@ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo
         .tasksCount(tasks.size())
         .failedTasksCount(failedTasksCount);
   }
+
+  default KafkaConnectState toScrapeState(ConnectDTO connect, List<InternalConnectorInfo> connectors) {
+    return KafkaConnectState.builder()
+        .name(connect.getName())
+        .version(connect.getVersion().orElse("Unknown"))
+        .connectors(connectors.stream().map(this::toScrapeState).toList())
+        .build();
+  }
+
+  default KafkaConnectState.ConnectorState toScrapeState(InternalConnectorInfo connector) {
+    return new KafkaConnectState.ConnectorState(
+        connector.getConnector().getName(),
+        connector.getConnector().getType(),
+        connector.getConnector().getStatus(),
+        connector.getTopics()
+    );
+  }
+
 }
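The new `toScrapeState` default method maps an absent Connect version to the literal `"Unknown"` via `Optional.orElse`. A self-contained sketch of that mapping pattern, with hypothetical simplified `ConnectInfo`/`ConnectState` records standing in for the project's `ConnectDTO` and `KafkaConnectState`:

```java
import java.util.Optional;

public class ConnectStateMapper {
  // Hypothetical simplified stand-ins for ConnectDTO and KafkaConnectState.
  public record ConnectInfo(String name, Optional<String> version) {}
  public record ConnectState(String name, String version) {}

  // Mirrors the toScrapeState default method: a missing version becomes "Unknown".
  public static ConnectState toScrapeState(ConnectInfo connect) {
    return new ConnectState(connect.name(), connect.version().orElse("Unknown"));
  }
}
```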

api/src/main/java/io/kafbat/ui/model/Statistics.java

Lines changed: 4 additions & 0 deletions

@@ -1,8 +1,10 @@
 package io.kafbat.ui.model;

 import io.kafbat.ui.service.ReactiveAdminClient;
+import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
 import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
 import java.util.List;
+import java.util.Map;
 import java.util.function.UnaryOperator;
 import java.util.stream.Stream;
 import lombok.Builder;
@@ -19,6 +21,7 @@ public class Statistics implements AutoCloseable {
   ReactiveAdminClient.ClusterDescription clusterDescription;
   Metrics metrics;
   ScrapedClusterState clusterState;
+  Map<String, KafkaConnectState> connectStates;

   public static Statistics empty() {
     return builder()
@@ -28,6 +31,7 @@ public static Statistics empty() {
         .clusterDescription(ReactiveAdminClient.ClusterDescription.empty())
         .metrics(Metrics.empty())
         .clusterState(ScrapedClusterState.empty())
+        .connectStates(Map.of())
         .build();
   }

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 71 additions & 15 deletions

@@ -1,7 +1,5 @@
 package io.kafbat.ui.service;

-import com.github.benmanes.caffeine.cache.AsyncCache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.connect.api.KafkaConnectClientApi;
 import io.kafbat.ui.connect.model.ClusterInfo;
@@ -25,12 +23,15 @@
 import io.kafbat.ui.model.FullConnectorInfoDTO;
 import io.kafbat.ui.model.KafkaCluster;
 import io.kafbat.ui.model.NewConnectorDTO;
+import io.kafbat.ui.model.Statistics;
 import io.kafbat.ui.model.TaskDTO;
 import io.kafbat.ui.model.TaskIdDTO;
 import io.kafbat.ui.model.connect.InternalConnectorInfo;
 import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
+import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
 import io.kafbat.ui.util.ReactiveFailover;
 import jakarta.validation.Valid;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -49,17 +50,16 @@ public class KafkaConnectService {
   private final KafkaConnectMapper kafkaConnectMapper;
   private final KafkaConfigSanitizer kafkaConfigSanitizer;
   private final ClustersProperties clustersProperties;
-  private final AsyncCache<String, ClusterInfo> cacheClusterInfo;
+  private final StatisticsCache statisticsCache;

   public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
                              KafkaConfigSanitizer kafkaConfigSanitizer,
-                             ClustersProperties clustersProperties) {
+                             ClustersProperties clustersProperties,
+                             StatisticsCache statisticsCache) {
     this.kafkaConnectMapper = kafkaConnectMapper;
     this.kafkaConfigSanitizer = kafkaConfigSanitizer;
     this.clustersProperties = clustersProperties;
-    this.cacheClusterInfo = Caffeine.newBuilder()
-        .expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
-        .buildAsync();
+    this.statisticsCache = statisticsCache;
   }

   public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
@@ -89,14 +89,17 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
     }
   }

-  private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
-    return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
-        api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
-            .onErrorResume(th -> {
-              log.error("Error on collecting cluster info", th);
-              return Mono.just(new ClusterInfo());
-            }).toFuture()
-    ));
+  public Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
+    KafkaConnectState state = statisticsCache.get(cluster).getConnectStates().get(connectName);
+    if (state != null) {
+      return Mono.just(kafkaConnectMapper.toClient(state));
+    } else {
+      return api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
+          .onErrorResume(th -> {
+            log.error("Error on collecting cluster info", th);
+            return Mono.just(new ClusterInfo());
+          });
+    }
   }

   private Flux<InternalConnectorInfo> getConnectConnectors(
@@ -134,6 +137,33 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
         .flatMapMany(Flux::fromIterable);
   }

+  public Flux<KafkaConnectState> scrapeAllConnects(KafkaCluster cluster) {
+
+    Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
+        Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
+
+    return Flux.fromIterable(connectClusters.orElse(List.of())).flatMap(c ->
+        getClusterInfo(cluster, c.getName()).map(info ->
+            kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
+        ).onErrorResume((t) -> Mono.just(new ConnectDTO().name(c.getName())))
+    ).flatMap(connect ->
+        getConnectorsWithErrorsSuppress(cluster, connect.getName())
+            .onErrorResume(t -> Mono.just(Map.of()))
+            .flatMapMany(connectors ->
+                Flux.fromIterable(connectors.entrySet())
+                    .flatMap(e ->
+                        getConnectorTopics(
+                            cluster,
+                            connect.getName(),
+                            e.getKey()
+                        ).map(topics ->
+                            kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics())
+                        )
+                    )
+            ).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors))
+    );
+  }
+
   private List<FullConnectorInfoDTO> filterConnectors(
       List<FullConnectorInfoDTO> connectors,
       String search,
@@ -349,4 +379,30 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
             .formatted(connectorName, connectName));
       });
   }
+
+  public Flux<FullConnectorInfoDTO> getTopicConnectors(KafkaCluster cluster, String topicName) {
+    Map<String, KafkaConnectState> connectStates = this.statisticsCache.get(cluster).getConnectStates();
+    Map<String, List<String>> filteredConnects = new HashMap<>();
+    for (Map.Entry<String, KafkaConnectState> entry : connectStates.entrySet()) {
+      List<KafkaConnectState.ConnectorState> connectors =
+          entry.getValue().getConnectors().stream().filter(c -> c.topics().contains(topicName)).toList();
+      if (!connectors.isEmpty()) {
+        filteredConnects.put(entry.getKey(), connectors.stream().map(KafkaConnectState.ConnectorState::name).toList());
+      }
+    }
+
+    return Flux.fromIterable(filteredConnects.entrySet())
+        .flatMap(entry ->
+            getConnectorsWithErrorsSuppress(cluster, entry.getKey())
+                .map(connectors ->
+                    connectors.entrySet()
+                        .stream()
+                        .filter(c -> entry.getValue().contains(c.getKey()))
+                        .map(c -> kafkaConnectMapper.fromClient(entry.getKey(), c.getValue(), null))
+                        .map(kafkaConnectMapper::fullConnectorInfo)
+                        .toList()
+                )
+        ).flatMap(Flux::fromIterable);
+
+  }
 }
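The first half of `getTopicConnectors` is a synchronous scan over the cached per-Connect state: for each Connect cluster it keeps only the connectors whose topic list contains the requested topic. A self-contained sketch of that lookup with a hypothetical simplified `ConnectorState` record in place of `KafkaConnectState.ConnectorState`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicConnectorIndex {
  // Hypothetical simplified form of KafkaConnectState.ConnectorState:
  // a connector name plus the topics it reads or writes.
  public record ConnectorState(String name, List<String> topics) {}

  // For each Connect cluster, keep only connectors touching the given topic,
  // mirroring the filtering loop at the start of getTopicConnectors.
  public static Map<String, List<String>> connectorsForTopic(
      Map<String, List<ConnectorState>> connectStates, String topic) {
    Map<String, List<String>> result = new HashMap<>();
    for (Map.Entry<String, List<ConnectorState>> entry : connectStates.entrySet()) {
      List<String> hits = entry.getValue().stream()
          .filter(c -> c.topics().contains(topic))
          .map(ConnectorState::name)
          .toList();
      if (!hits.isEmpty()) {
        result.put(entry.getKey(), hits);
      }
    }
    return result;
  }
}
```

The service then goes back to each matching Connect cluster only for the surviving connector names, so the per-topic listing avoids fetching every connector on every request.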
