Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ dependencies {
implementation libs.netty.common
implementation libs.netty.handler

implementation libs.modelcontextprotocol.spring.webflux
implementation libs.victools.jsonschema.generator

// Google Managed Service for Kafka IAM support
implementation (libs.google.managed.kafka.login.handler) {
Expand Down
18 changes: 9 additions & 9 deletions api/src/main/java/io/kafbat/ui/config/CustomWebFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ public class CustomWebFilter implements WebFilter {

final String path = exchange.getRequest().getPath().pathWithinApplication().value();

ServerWebExchange filterExchange = exchange;

if (path.startsWith("/ui") || path.isEmpty() || path.equals("/")) {
return chain.filter(
exchange.mutate().request(
exchange.getRequest().mutate()
.path(basePath + "/index.html")
.contextPath(basePath)
.build()
).build()
);
filterExchange = exchange.mutate().request(
exchange.getRequest().mutate()
.path(basePath + "/index.html")
.contextPath(basePath)
.build()
).build();
}

return chain.filter(exchange);
return chain.filter(filterExchange).contextWrite(ctx -> ctx.put(ServerWebExchange.class, exchange));
}
}
18 changes: 18 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/JsonSchemaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.kafbat.ui.config;

import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaGenerator;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaVersion;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JsonSchemaConfig {
@Bean
public SchemaGenerator schemaGenerator() {
SchemaGeneratorConfigBuilder configBuilder =
new SchemaGeneratorConfigBuilder(SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON);
return new SchemaGenerator(configBuilder.build());
}
}
72 changes: 72 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/McpConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.kafbat.ui.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kafbat.ui.service.mcp.McpSpecificationGenerator;
import io.kafbat.ui.service.mcp.McpTool;
import io.modelcontextprotocol.server.McpAsyncServer;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpServerFeatures.AsyncPromptSpecification;
import io.modelcontextprotocol.server.McpServerFeatures.AsyncToolSpecification;
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
import io.modelcontextprotocol.spec.McpSchema;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;

@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(value = "mcp.enabled", havingValue = "true")
public class McpConfig {

private final List<McpTool> mcpTools;
private final McpSpecificationGenerator mcpSpecificationGenerator;

// SSE transport
@Bean
public WebFluxSseServerTransportProvider sseServerTransport(ObjectMapper mapper) {
return new WebFluxSseServerTransportProvider(mapper, "/mcp/message", "/mcp/sse");
}

// Router function for SSE transport used by Spring WebFlux to start an HTTP
// server.

@Bean
public RouterFunction<?> mcpRouterFunction(WebFluxSseServerTransportProvider transport) {
return transport.getRouterFunction();
}

@Bean
public McpAsyncServer mcpServer(WebFluxSseServerTransportProvider transport) {

// Configure server capabilities with resource support
var capabilities = McpSchema.ServerCapabilities.builder()
.resources(false, true)
.tools(true) // Tool support with list changes notifications
.prompts(false) // Prompt support with list changes notifications
.logging() // Logging support
.build();

// Create the server with both tool and resource capabilities
return McpServer.async(transport)
.serverInfo("Kafka UI MCP", "0.0.1")
.capabilities(capabilities)
.tools(tools())
.build();
}

private List<AsyncPromptSpecification> prompts() {
return List.of();
}

private List<AsyncToolSpecification> tools() {
List<AsyncToolSpecification> tools = new ArrayList<>();
for (McpTool mcpTool : mcpTools) {
tools.addAll(mcpSpecificationGenerator.convertTool(mcpTool));
}
return tools;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.AclAction;
import io.kafbat.ui.service.acl.AclsService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;
Expand All @@ -24,7 +25,7 @@

@RestController
@RequiredArgsConstructor
public class AclsController extends AbstractController implements AclsApi {
public class AclsController extends AbstractController implements AclsApi, McpTool {

private final AclsService aclsService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
import io.kafbat.ui.service.BrokerService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand All @@ -25,7 +26,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class BrokersController extends AbstractController implements BrokersApi {
public class BrokersController extends AbstractController implements BrokersApi, McpTool {
private static final String BROKER_ID = "brokerId";

private final BrokerService brokerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.model.ClientQuotasDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
import io.kafbat.ui.service.mcp.McpTool;
import io.kafbat.ui.service.quota.ClientQuotaRecord;
import io.kafbat.ui.service.quota.ClientQuotaService;
import java.math.BigDecimal;
Expand All @@ -22,7 +23,7 @@

@RestController
@RequiredArgsConstructor
public class ClientQuotasController extends AbstractController implements ClientQuotasApi {
public class ClientQuotasController extends AbstractController implements ClientQuotasApi, McpTool {

private static final Comparator<ClientQuotaRecord> QUOTA_RECORDS_COMPARATOR =
Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.model.ClusterStatsDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.service.ClusterService;
import io.kafbat.ui.service.mcp.McpTool;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
Expand All @@ -17,7 +18,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class ClustersController extends AbstractController implements ClustersApi {
public class ClustersController extends AbstractController implements ClustersApi, McpTool {
private final ClusterService clusterService;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.kafbat.ui.model.rbac.permission.TopicAction;
import io.kafbat.ui.service.ConsumerGroupService;
import io.kafbat.ui.service.OffsetsResetService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
Expand All @@ -35,7 +36,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi {
public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi, McpTool {

private final ConsumerGroupService consumerGroupService;
private final OffsetsResetService offsetsResetService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.service.KafkaConnectService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
Expand All @@ -35,7 +36,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
public class KafkaConnectController extends AbstractController implements KafkaConnectApi, McpTool {
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
private static final String CONNECTOR_NAME = "connectorName";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.KsqlAction;
import io.kafbat.ui.service.ksql.KsqlServiceV2;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -24,7 +25,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class KsqlController extends AbstractController implements KsqlApi {
public class KsqlController extends AbstractController implements KsqlApi, McpTool {

private final KsqlServiceV2 ksqlServiceV2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import io.kafbat.ui.serde.api.Serde;
import io.kafbat.ui.service.DeserializationService;
import io.kafbat.ui.service.MessagesService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.List;
import java.util.Optional;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand All @@ -41,7 +43,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class MessagesController extends AbstractController implements MessagesApi {
public class MessagesController extends AbstractController implements MessagesApi, McpTool {

private final MessagesService messagesService;
private final DeserializationService deserializationService;
Expand Down Expand Up @@ -148,8 +150,8 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(

return validateAccess(context).then(
createTopicMessage.flatMap(msg ->
messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
).map(ResponseEntity::ok)
messagesService.sendMessage(getCluster(clusterName), topicName, msg)
).map(m -> new ResponseEntity<Void>(HttpStatus.OK))
).doOnEach(sig -> audit(context, sig));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.SchemaAction;
import io.kafbat.ui.service.SchemaRegistryService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.List;
import java.util.Map;
import javax.validation.Valid;
Expand All @@ -28,7 +29,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class SchemasController extends AbstractController implements SchemasApi {
public class SchemasController extends AbstractController implements SchemasApi, McpTool {

private static final Integer DEFAULT_PAGE_SIZE = 25;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.service.TopicsService;
import io.kafbat.ui.service.analyze.TopicAnalysisService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand All @@ -46,7 +47,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class TopicsController extends AbstractController implements TopicsApi {
public class TopicsController extends AbstractController implements TopicsApi, McpTool {

private static final Integer DEFAULT_PAGE_SIZE = 25;

Expand Down
8 changes: 4 additions & 4 deletions api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static Predicate<TopicMessageDTO> noop() {

public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
return msg -> StringUtils.contains(msg.getKey(), string)
|| StringUtils.contains(msg.getContent(), string) || headersContains(msg, string);
|| StringUtils.contains(msg.getValue(), string) || headersContains(msg, string);
}

private static boolean headersContains(TopicMessageDTO msg, String searchString) {
Expand Down Expand Up @@ -126,9 +126,9 @@ private static Map<String, Map<String, Object>> recordToArgs(TopicMessageDTO top
args.put("keyAsText", topicMessage.getKey());
}

if (topicMessage.getContent() != null) {
args.put("value", parseToJsonOrReturnAsIs(topicMessage.getContent()));
args.put("valueAsText", topicMessage.getContent());
if (topicMessage.getValue() != null) {
args.put("value", parseToJsonOrReturnAsIs(topicMessage.getValue()));
args.put("valueAsText", topicMessage.getValue());
}

args.put("headers", Objects.requireNonNullElse(topicMessage.getHeaders(), emptyMap()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ private void fillValue(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec
try {
var deserResult = valueDeserializer.deserialize(
new RecordHeadersImpl(rec.headers()), rec.value().get());
message.setContent(deserResult.getResult());
message.setValue(deserResult.getResult());
message.setValueSerde(valueSerdeName);
message.setValueDeserializeProperties(deserResult.getAdditionalProperties());
} catch (Exception e) {
log.trace("Error deserializing key for value topic: {}, partition {}, offset {}, with serde {}",
rec.topic(), rec.partition(), rec.offset(), valueSerdeName, e);
var deserResult = fallbackValueDeserializer.deserialize(
new RecordHeadersImpl(rec.headers()), rec.value().get());
message.setContent(deserResult.getResult());
message.setValue(deserResult.getResult());
message.setValueSerde(fallbackSerdeName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdmi
case STATE -> {
ToIntFunction<ConsumerGroupListing> statesPriorities =
cg -> switch (cg.state().orElse(ConsumerGroupState.UNKNOWN)) {
case STABLE -> 0;
case COMPLETING_REBALANCE -> 1;
case PREPARING_REBALANCE -> 2;
case EMPTY -> 3;
case DEAD -> 4;
case UNKNOWN -> 5;
case ASSIGNING -> 6;
case RECONCILING -> 7;
};
case STABLE -> 0;
case COMPLETING_REBALANCE -> 1;
case PREPARING_REBALANCE -> 2;
case EMPTY -> 3;
case DEAD -> 4;
case UNKNOWN -> 5;
case ASSIGNING -> 6;
case RECONCILING -> 7;
};
var comparator = Comparator.comparingInt(statesPriorities);
yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
}
Expand Down
Loading
Loading