Skip to content

Commit f00171f

Browse files
authored
Merge branch 'main' into issues/1440
2 parents 2c9db56 + d285a69 commit f00171f

File tree

65 files changed

+1631
-541
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1631
-541
lines changed

README.md

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,28 +72,25 @@ Kafbat UI wraps major functions of Apache Kafka with an intuitive user interface
7272
![Interface](documentation/images/Interface.gif)
7373

7474
## Topics
75-
Kafbat UI makes it easy for you to create topics in your browser by several clicks,
76-
pasting your own parameters, and viewing topics in the list.
75+
Kafbat UI makes it easy for you to create topics in your browser with just a few clicks, by pasting your own parameters, and viewing topics in the list.
7776

7877
![Create Topic](documentation/images/Create_topic_kafka-ui.gif)
7978

80-
It's possible to jump from connectors view to corresponding topics and from a topic to consumers (back and forth) for more convenient navigation.
81-
connectors, overview topic settings.
79+
You can jump from the connectors view to corresponding topics and from a topic to consumers (back and forth) for more convenient navigation, including connectors and overview topic settings.
8280

8381
![Connector_Topic_Consumer](documentation/images/Connector_Topic_Consumer.gif)
8482

8583
### Messages
86-
Let's say we want to produce messages for our topic. With the Kafbat UI we can send or write data/messages to the Kafka topics without effort by specifying parameters, and viewing messages in the list.
84+
Suppose you want to produce messages for your topic. With Kafbat UI, you can easily send or write data/messages to Kafka topics by specifying parameters and viewing messages in the list.
8785

8886
![Produce Message](documentation/images/Create_message_kafka-ui.gif)
8987

9088
## Schema registry
91-
There are 3 supported types of schemas: Avro®, JSON Schema, and Protobuf schemas.
89+
There are three supported types of schemas: Avro®, JSON Schema, and Protobuf schemas.
9290

9391
![Create Schema Registry](documentation/images/Create_schema.gif)
9492

95-
Before producing avro/protobuf encoded messages, you have to add a schema for the topic in Schema Registry. Now all these steps are easy to do
96-
with a few clicks in a user-friendly interface.
93+
Before producing Avro/Protobuf encoded messages, you need to add a schema for the topic in the Schema Registry. All these steps are now easy to do with just a few clicks in a user-friendly interface.
9794

9895
![Avro Schema Topic](documentation/images/Schema_Topic.gif)
9996

@@ -111,7 +108,7 @@ docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true ghcr.io/kafbat/kafka-
111108

112109
Then access the web UI at [http://localhost:8080](http://localhost:8080)
113110

114-
The command is sufficient to try things out. When you're done trying things out, you can proceed with a [persistent installation](https://ui.docs.kafbat.io/quick-start/persistent-start)
111+
This command is sufficient to try things out. When you're done, you can proceed with a [persistent installation](https://ui.docs.kafbat.io/quick-start/persistent-start).
115112

116113
## Persistent installation
117114

@@ -146,24 +143,24 @@ Please refer to our [configuration](https://ui.docs.kafbat.io/configuration/conf
146143
147144
## Building from sources
148145
149-
[Quick start](https://ui.docs.kafbat.io/development/building/prerequisites) with building
146+
[Quick start](https://ui.docs.kafbat.io/development/building/prerequisites) for building from source
150147
151148
## Liveliness and readiness probes
152-
Liveliness and readiness endpoint is at `/actuator/health`.<br/>
153-
Info endpoint (build info) is located at `/actuator/info`.
149+
The liveness and readiness endpoint is at `/actuator/health`.<br/>
150+
The info endpoint (build info) is located at `/actuator/info`.
154151

155152
# Configuration options
156153

157-
All the environment variables/config properties could be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).
154+
All environment variables and configuration properties can be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).
158155

159156
# Contributing
160157

161-
Please refer to [contributing guide](https://ui.docs.kafbat.io/development/contributing), we'll guide you from there.
158+
Please refer to the [contributing guide](https://ui.docs.kafbat.io/development/contributing); we'll guide you from there.
162159

163160
# Support
164161

165162
As we're fully independent, team members contribute in their free time.
166-
Your support is crucial for us, if you wish to sponsor us, take a look [here](https://github.com/sponsors/kafbat)
163+
Your support is crucial for us, if you wish to sponsor us, take a look [here](https://github.com/sponsors/kafbat)
167164

168165
# Powered by
169166

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import io.kafbat.ui.connect.ApiClient;
88
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
99
import io.kafbat.ui.connect.model.Connector;
10+
import io.kafbat.ui.connect.model.ConnectorExpand;
1011
import io.kafbat.ui.connect.model.ConnectorPlugin;
1112
import io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse;
1213
import io.kafbat.ui.connect.model.ConnectorStatus;
1314
import io.kafbat.ui.connect.model.ConnectorTask;
1415
import io.kafbat.ui.connect.model.ConnectorTopics;
16+
import io.kafbat.ui.connect.model.ExpandedConnector;
1517
import io.kafbat.ui.connect.model.NewConnector;
1618
import io.kafbat.ui.connect.model.TaskStatus;
1719
import io.kafbat.ui.exception.KafkaConnectConflictResponseException;
@@ -221,13 +223,17 @@ public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWith
221223
}
222224

223225
@Override
224-
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
225-
return withRetryOnConflictOrRebalance(super.getConnectors(search));
226+
public Mono<Map<String, ExpandedConnector>> getConnectors(
227+
String search, List<ConnectorExpand> expand
228+
) throws WebClientResponseException {
229+
return withRetryOnConflictOrRebalance(super.getConnectors(search, expand));
226230
}
227231

228232
@Override
229-
public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
230-
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
233+
public Mono<ResponseEntity<Map<String, ExpandedConnector>>> getConnectorsWithHttpInfo(
234+
String search, List<ConnectorExpand> expand
235+
) throws WebClientResponseException {
236+
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search, expand));
231237
}
232238

233239
@Override

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ public enum LogLevel {
227227
@AllArgsConstructor
228228
public static class CacheProperties {
229229
boolean enabled = true;
230-
Duration connectCacheExpiry = Duration.ofMinutes(1);
231230
Duration connectClusterCacheExpiry = Duration.ofHours(24);
232231
}
233232

api/src/main/java/io/kafbat/ui/config/McpConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ public McpAsyncServer mcpServer(WebFluxSseServerTransportProvider transport) {
4444
// Configure server capabilities with resource support
4545
var capabilities = McpSchema.ServerCapabilities.builder()
4646
.resources(false, true)
47-
.tools(true) // Tool support with list changes notifications
47+
.tools(true) // Tools support with list changes notifications
4848
.prompts(false) // Prompt support with list changes notifications
4949
.logging() // Logging support
5050
.build();
5151

52-
// Create the server with both tool and resource capabilities
52+
// Create the server with both tools and resource capabilities
5353
return McpServer.async(transport)
5454
.serverInfo("Kafka UI MCP", "0.0.1")
5555
.capabilities(capabilities)

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,12 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
6666
.build();
6767

6868
return validateAccess(context)
69-
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
70-
.doOnEach(sig -> audit(context, sig));
69+
.thenReturn(
70+
ResponseEntity.ok(
71+
kafkaConnectService.getConnectors(getCluster(clusterName), connectName)
72+
.flatMapMany(m -> Flux.fromIterable(m.keySet()))
73+
)
74+
).doOnEach(sig -> audit(context, sig));
7175
}
7276

7377
@Override

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.kafbat.ui.api.TopicsApi;
1212
import io.kafbat.ui.config.ClustersProperties;
1313
import io.kafbat.ui.mapper.ClusterMapper;
14+
import io.kafbat.ui.model.FullConnectorInfoDTO;
1415
import io.kafbat.ui.model.InternalTopic;
1516
import io.kafbat.ui.model.InternalTopicConfig;
1617
import io.kafbat.ui.model.PartitionsIncreaseDTO;
@@ -28,6 +29,8 @@
2829
import io.kafbat.ui.model.TopicUpdateDTO;
2930
import io.kafbat.ui.model.TopicsResponseDTO;
3031
import io.kafbat.ui.model.rbac.AccessContext;
32+
import io.kafbat.ui.model.rbac.permission.ConnectAction;
33+
import io.kafbat.ui.service.KafkaConnectService;
3134
import io.kafbat.ui.service.TopicsService;
3235
import io.kafbat.ui.service.analyze.TopicAnalysisService;
3336
import io.kafbat.ui.service.mcp.McpTool;
@@ -55,6 +58,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
5558
private final TopicAnalysisService topicAnalysisService;
5659
private final ClusterMapper clusterMapper;
5760
private final ClustersProperties clustersProperties;
61+
private final KafkaConnectService kafkaConnectService;
5862

5963
@Override
6064
public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -370,4 +374,23 @@ private Comparator<InternalTopic> getComparatorForTopic(
370374
default -> defaultComparator;
371375
};
372376
}
377+
378+
@Override
379+
public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getTopicConnectors(String clusterName,
380+
String topicName,
381+
ServerWebExchange exchange) {
382+
var context = AccessContext.builder()
383+
.cluster(clusterName)
384+
.topicActions(topicName, VIEW)
385+
.operationName("getTopicConnectors")
386+
.operationParams(topicName)
387+
.build();
388+
389+
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getTopicConnectors(getCluster(clusterName), topicName)
390+
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));
391+
392+
return validateAccess(context)
393+
.then(Mono.just(ResponseEntity.ok(job)))
394+
.doOnEach(sig -> audit(context, sig));
395+
}
373396
}

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
import io.kafbat.ui.config.ClustersProperties;
44
import io.kafbat.ui.connect.model.ClusterInfo;
5+
import io.kafbat.ui.connect.model.Connector;
56
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
67
import io.kafbat.ui.connect.model.ConnectorTask;
8+
import io.kafbat.ui.connect.model.ConnectorTopics;
9+
import io.kafbat.ui.connect.model.ExpandedConnector;
710
import io.kafbat.ui.connect.model.NewConnector;
811
import io.kafbat.ui.model.ConnectDTO;
912
import io.kafbat.ui.model.ConnectorDTO;
@@ -14,21 +17,41 @@
1417
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
1518
import io.kafbat.ui.model.FullConnectorInfoDTO;
1619
import io.kafbat.ui.model.TaskDTO;
20+
import io.kafbat.ui.model.TaskIdDTO;
1721
import io.kafbat.ui.model.TaskStatusDTO;
1822
import io.kafbat.ui.model.connect.InternalConnectorInfo;
23+
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
1924
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Objects;
2027
import java.util.Optional;
28+
import java.util.stream.Collectors;
29+
import javax.annotation.Nullable;
2130
import org.mapstruct.Mapper;
2231
import org.mapstruct.Mapping;
2332

2433
@Mapper(componentModel = "spring")
2534
public interface KafkaConnectMapper {
2635
NewConnector toClient(io.kafbat.ui.model.NewConnectorDTO newConnector);
2736

37+
default ClusterInfo toClient(KafkaConnectState state) {
38+
ClusterInfo clusterInfo = new ClusterInfo();
39+
clusterInfo.setVersion(state.getVersion());
40+
return clusterInfo;
41+
}
42+
2843
@Mapping(target = "status", ignore = true)
2944
@Mapping(target = "connect", ignore = true)
3045
ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector);
3146

47+
default ConnectorDTO fromClient(Connector connector, ConnectorTopics topics) {
48+
ConnectorDTO connectorDto = this.fromClient(connector);
49+
if (topics != null) {
50+
return connectorDto.topics(topics.getTopics());
51+
}
52+
return connectorDto;
53+
}
54+
3255
ConnectorStatusDTO fromClient(ConnectorStatusConnector connectorStatus);
3356

3457
@Mapping(target = "status", ignore = true)
@@ -43,6 +66,39 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
4366
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
4467
connectorPluginConfigValidationResponse);
4568

69+
default InternalConnectorInfo fromClient(String connect, ExpandedConnector connector, @Nullable List<String> topics) {
70+
Objects.requireNonNull(connector.getInfo());
71+
Objects.requireNonNull(connector.getStatus());
72+
List<TaskDTO> tasks = List.of();
73+
74+
if (connector.getInfo().getTasks() != null
75+
&& connector.getStatus().getTasks() != null
76+
) {
77+
Map<Integer, TaskIdDTO> taskIds = connector.getInfo().getTasks()
78+
.stream().map(t -> new TaskIdDTO().task(t.getTask()).connector(t.getConnector()))
79+
.collect(Collectors.toMap(
80+
TaskIdDTO::getTask,
81+
t -> t
82+
));
83+
84+
tasks = connector.getStatus().getTasks().stream()
85+
.map(s ->
86+
new TaskDTO().status(fromClient(s)).id(taskIds.get(s.getId()))
87+
).toList();
88+
}
89+
90+
ConnectorDTO connectorDto = fromClient(connector.getInfo())
91+
.connect(connect)
92+
.status(fromClient(connector.getStatus().getConnector()));
93+
94+
return InternalConnectorInfo.builder()
95+
.connector(connectorDto)
96+
.config(connector.getInfo().getConfig())
97+
.tasks(tasks)
98+
.topics(topics)
99+
.build();
100+
}
101+
46102
default ConnectDTO toKafkaConnect(
47103
ClustersProperties.ConnectCluster connect,
48104
List<InternalConnectorInfo> connectors,
@@ -114,4 +170,21 @@ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo
114170
.tasksCount(tasks.size())
115171
.failedTasksCount(failedTasksCount);
116172
}
173+
174+
default KafkaConnectState toScrapeState(ConnectDTO connect, List<InternalConnectorInfo> connectors) {
175+
return KafkaConnectState.builder()
176+
.name(connect.getName())
177+
.version(connect.getVersion().orElse("Unknown"))
178+
.connectors(connectors.stream().map(this::toScrapeState).toList())
179+
.build();
180+
}
181+
182+
default KafkaConnectState.ConnectorState toScrapeState(InternalConnectorInfo connector) {
183+
return new KafkaConnectState.ConnectorState(
184+
connector.getConnector().getName(),
185+
connector.getConnector().getType(),
186+
connector.getConnector().getStatus(),
187+
connector.getTopics()
188+
);
189+
}
117190
}

api/src/main/java/io/kafbat/ui/model/Statistics.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package io.kafbat.ui.model;
22

33
import io.kafbat.ui.service.ReactiveAdminClient;
4+
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
45
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
56
import java.util.List;
7+
import java.util.Map;
68
import java.util.function.UnaryOperator;
79
import java.util.stream.Stream;
810
import lombok.Builder;
@@ -19,6 +21,7 @@ public class Statistics implements AutoCloseable {
1921
ReactiveAdminClient.ClusterDescription clusterDescription;
2022
Metrics metrics;
2123
ScrapedClusterState clusterState;
24+
Map<String, KafkaConnectState> connectStates;
2225

2326
public static Statistics empty() {
2427
return builder()
@@ -28,6 +31,7 @@ public static Statistics empty() {
2831
.clusterDescription(ReactiveAdminClient.ClusterDescription.empty())
2932
.metrics(Metrics.empty())
3033
.clusterState(ScrapedClusterState.empty())
34+
.connectStates(Map.of())
3135
.build();
3236
}
3337

api/src/main/java/io/kafbat/ui/service/KafkaConfigSanitizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.springframework.stereotype.Component;
2020

2121
@Component
22-
class KafkaConfigSanitizer {
22+
public class KafkaConfigSanitizer {
2323

2424
private static final String SANITIZED_VALUE = "******";
2525

0 commit comments

Comments
 (0)