Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ dependencies {
implementation libs.netty.common
implementation libs.netty.handler

implementation libs.modelcontextprotocol.spring.webflux
implementation libs.victools.jsonschema.generator

// Google Managed Service for Kafka IAM support
implementation (libs.google.managed.kafka.login.handler) {
Expand Down
18 changes: 9 additions & 9 deletions api/src/main/java/io/kafbat/ui/config/CustomWebFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ public class CustomWebFilter implements WebFilter {

final String path = exchange.getRequest().getPath().pathWithinApplication().value();

ServerWebExchange filterExchange = exchange;

if (path.startsWith("/ui") || path.isEmpty() || path.equals("/")) {
return chain.filter(
exchange.mutate().request(
exchange.getRequest().mutate()
.path(basePath + "/index.html")
.contextPath(basePath)
.build()
).build()
);
filterExchange = exchange.mutate().request(
exchange.getRequest().mutate()
.path(basePath + "/index.html")
.contextPath(basePath)
.build()
).build();
}

return chain.filter(exchange);
return chain.filter(filterExchange).contextWrite(ctx -> ctx.put(ServerWebExchange.class, exchange));
}
}
18 changes: 18 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/JsonSchemaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.kafbat.ui.config;

import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaGenerator;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaVersion;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JsonSchemaConfig {
@Bean
public SchemaGenerator schemaGenerator() {
SchemaGeneratorConfigBuilder configBuilder =
new SchemaGeneratorConfigBuilder(SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON);
return new SchemaGenerator(configBuilder.build());
}
}
68 changes: 68 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/McpConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kafbat.ui.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kafbat.ui.service.mcp.McpSpecificationGenerator;
import io.kafbat.ui.service.mcp.McpTool;
import io.modelcontextprotocol.server.McpAsyncServer;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpServerFeatures.AsyncPromptSpecification;
import io.modelcontextprotocol.server.McpServerFeatures.AsyncToolSpecification;
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
import io.modelcontextprotocol.spec.McpSchema;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;

@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(value = "mcp.enabled", havingValue = "true")
public class McpConfig {

private final List<McpTool> mcpTools;
private final McpSpecificationGenerator mcpSpecificationGenerator;

// SSE transport
@Bean
public WebFluxSseServerTransportProvider sseServerTransport(ObjectMapper mapper) {
return new WebFluxSseServerTransportProvider(mapper, "/mcp/message", "/mcp/sse");
}

// Router function for SSE transport used by Spring WebFlux to start an HTTP
// server.

@Bean
public RouterFunction<?> mcpRouterFunction(WebFluxSseServerTransportProvider transport) {
return transport.getRouterFunction();
}

@Bean
public McpAsyncServer mcpServer(WebFluxSseServerTransportProvider transport) {

// Configure server capabilities with resource support
var capabilities = McpSchema.ServerCapabilities.builder()
.resources(false, true)
.tools(true) // Tool support with list changes notifications
.prompts(false) // Prompt support with list changes notifications
.logging() // Logging support
.build();

// Create the server with both tool and resource capabilities
return McpServer.async(transport)
.serverInfo("Kafka UI MCP", "0.0.1")
.capabilities(capabilities)
.tools(tools())
.build();
}

private List<AsyncToolSpecification> tools() {
List<AsyncToolSpecification> tools = new ArrayList<>();
for (McpTool mcpTool : mcpTools) {
tools.addAll(mcpSpecificationGenerator.convertTool(mcpTool));
}
return tools;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.AclAction;
import io.kafbat.ui.service.acl.AclsService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;
Expand All @@ -24,7 +25,7 @@

@RestController
@RequiredArgsConstructor
public class AclsController extends AbstractController implements AclsApi {
public class AclsController extends AbstractController implements AclsApi, McpTool {

private final AclsService aclsService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
import io.kafbat.ui.service.BrokerService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand All @@ -25,7 +26,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class BrokersController extends AbstractController implements BrokersApi {
public class BrokersController extends AbstractController implements BrokersApi, McpTool {
private static final String BROKER_ID = "brokerId";

private final BrokerService brokerService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.model.ClientQuotasDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
import io.kafbat.ui.service.mcp.McpTool;
import io.kafbat.ui.service.quota.ClientQuotaRecord;
import io.kafbat.ui.service.quota.ClientQuotaService;
import java.math.BigDecimal;
Expand All @@ -22,7 +23,7 @@

@RestController
@RequiredArgsConstructor
public class ClientQuotasController extends AbstractController implements ClientQuotasApi {
public class ClientQuotasController extends AbstractController implements ClientQuotasApi, McpTool {

private static final Comparator<ClientQuotaRecord> QUOTA_RECORDS_COMPARATOR =
Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.model.ClusterStatsDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.service.ClusterService;
import io.kafbat.ui.service.mcp.McpTool;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
Expand All @@ -17,7 +18,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class ClustersController extends AbstractController implements ClustersApi {
public class ClustersController extends AbstractController implements ClustersApi, McpTool {
private final ClusterService clusterService;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.kafbat.ui.model.rbac.permission.TopicAction;
import io.kafbat.ui.service.ConsumerGroupService;
import io.kafbat.ui.service.OffsetsResetService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
Expand All @@ -35,7 +36,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi {
public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi, McpTool {

private final ConsumerGroupService consumerGroupService;
private final OffsetsResetService offsetsResetService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.service.KafkaConnectService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
Expand All @@ -35,7 +36,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
public class KafkaConnectController extends AbstractController implements KafkaConnectApi, McpTool {
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
private static final String CONNECTOR_NAME = "connectorName";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.KsqlAction;
import io.kafbat.ui.service.ksql.KsqlServiceV2;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -24,7 +25,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class KsqlController extends AbstractController implements KsqlApi {
public class KsqlController extends AbstractController implements KsqlApi, McpTool {

private final KsqlServiceV2 ksqlServiceV2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import io.kafbat.ui.serde.api.Serde;
import io.kafbat.ui.service.DeserializationService;
import io.kafbat.ui.service.MessagesService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.List;
import java.util.Optional;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand All @@ -41,7 +43,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class MessagesController extends AbstractController implements MessagesApi {
public class MessagesController extends AbstractController implements MessagesApi, McpTool {

private final MessagesService messagesService;
private final DeserializationService deserializationService;
Expand Down Expand Up @@ -148,8 +150,8 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(

return validateAccess(context).then(
createTopicMessage.flatMap(msg ->
messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
).map(ResponseEntity::ok)
messagesService.sendMessage(getCluster(clusterName), topicName, msg)
).map(m -> new ResponseEntity<Void>(HttpStatus.OK))
).doOnEach(sig -> audit(context, sig));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.SchemaAction;
import io.kafbat.ui.service.SchemaRegistryService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.List;
import java.util.Map;
import javax.validation.Valid;
Expand All @@ -28,7 +29,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class SchemasController extends AbstractController implements SchemasApi {
public class SchemasController extends AbstractController implements SchemasApi, McpTool {

private static final Integer DEFAULT_PAGE_SIZE = 25;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.service.TopicsService;
import io.kafbat.ui.service.analyze.TopicAnalysisService;
import io.kafbat.ui.service.mcp.McpTool;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand All @@ -46,7 +47,7 @@
@RestController
@RequiredArgsConstructor
@Slf4j
public class TopicsController extends AbstractController implements TopicsApi {
public class TopicsController extends AbstractController implements TopicsApi, McpTool {

private static final Integer DEFAULT_PAGE_SIZE = 25;

Expand Down
8 changes: 4 additions & 4 deletions api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static Predicate<TopicMessageDTO> noop() {

public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
return msg -> StringUtils.contains(msg.getKey(), string)
|| StringUtils.contains(msg.getContent(), string) || headersContains(msg, string);
|| StringUtils.contains(msg.getValue(), string) || headersContains(msg, string);
}

private static boolean headersContains(TopicMessageDTO msg, String searchString) {
Expand Down Expand Up @@ -126,9 +126,9 @@ private static Map<String, Map<String, Object>> recordToArgs(TopicMessageDTO top
args.put("keyAsText", topicMessage.getKey());
}

if (topicMessage.getContent() != null) {
args.put("value", parseToJsonOrReturnAsIs(topicMessage.getContent()));
args.put("valueAsText", topicMessage.getContent());
if (topicMessage.getValue() != null) {
args.put("value", parseToJsonOrReturnAsIs(topicMessage.getValue()));
args.put("valueAsText", topicMessage.getValue());
}

args.put("headers", Objects.requireNonNullElse(topicMessage.getHeaders(), emptyMap()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ private void fillValue(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec
try {
var deserResult = valueDeserializer.deserialize(
new RecordHeadersImpl(rec.headers()), rec.value().get());
message.setContent(deserResult.getResult());
message.setValue(deserResult.getResult());
message.setValueSerde(valueSerdeName);
message.setValueDeserializeProperties(deserResult.getAdditionalProperties());
} catch (Exception e) {
log.trace("Error deserializing key for value topic: {}, partition {}, offset {}, with serde {}",
rec.topic(), rec.partition(), rec.offset(), valueSerdeName, e);
var deserResult = fallbackValueDeserializer.deserialize(
new RecordHeadersImpl(rec.headers()), rec.value().get());
message.setContent(deserResult.getResult());
message.setValue(deserResult.getResult());
message.setValueSerde(fallbackSerdeName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdmi
case STATE -> {
ToIntFunction<ConsumerGroupListing> statesPriorities =
cg -> switch (cg.state().orElse(ConsumerGroupState.UNKNOWN)) {
case STABLE -> 0;
case COMPLETING_REBALANCE -> 1;
case PREPARING_REBALANCE -> 2;
case EMPTY -> 3;
case DEAD -> 4;
case UNKNOWN -> 5;
case ASSIGNING -> 6;
case RECONCILING -> 7;
};
case STABLE -> 0;
case COMPLETING_REBALANCE -> 1;
case PREPARING_REBALANCE -> 2;
case EMPTY -> 3;
case DEAD -> 4;
case UNKNOWN -> 5;
case ASSIGNING -> 6;
case RECONCILING -> 7;
};
var comparator = Comparator.comparingInt(statesPriorities);
yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ public DeserializationService(Environment env,
}
}

public ClusterSerdes getSerdesFor(String clusterName) {
return clusterSerdes.get(clusterName);
}

private ClusterSerdes getSerdesFor(KafkaCluster cluster) {
return clusterSerdes.get(cluster.getName());
return getSerdesFor(cluster.getName());
}

private Serde.Serializer getSerializer(KafkaCluster cluster,
String topic,
Serde.Target type,
String serdeName) {
var serdes = getSerdesFor(cluster);
var serdes = getSerdesFor(cluster.getName());
var serde = serdes.serdeForName(serdeName)
.orElseThrow(() -> new ValidationException(
String.format("Serde %s not found", serdeName)));
Expand Down
Loading
Loading