diff --git a/api/build.gradle b/api/build.gradle index 563da52ab..a2ea1c568 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -62,6 +62,8 @@ dependencies { implementation libs.netty.common implementation libs.netty.handler + implementation libs.modelcontextprotocol.spring.webflux + implementation libs.victools.jsonschema.generator // Google Managed Service for Kafka IAM support implementation (libs.google.managed.kafka.login.handler) { diff --git a/api/src/main/java/io/kafbat/ui/config/CustomWebFilter.java b/api/src/main/java/io/kafbat/ui/config/CustomWebFilter.java index 3db292452..f5f3cd7c5 100644 --- a/api/src/main/java/io/kafbat/ui/config/CustomWebFilter.java +++ b/api/src/main/java/io/kafbat/ui/config/CustomWebFilter.java @@ -17,17 +17,17 @@ public class CustomWebFilter implements WebFilter { final String path = exchange.getRequest().getPath().pathWithinApplication().value(); + ServerWebExchange filterExchange = exchange; + if (path.startsWith("/ui") || path.isEmpty() || path.equals("/")) { - return chain.filter( - exchange.mutate().request( - exchange.getRequest().mutate() - .path(basePath + "/index.html") - .contextPath(basePath) - .build() - ).build() - ); + filterExchange = exchange.mutate().request( + exchange.getRequest().mutate() + .path(basePath + "/index.html") + .contextPath(basePath) + .build() + ).build(); } - return chain.filter(exchange); + return chain.filter(filterExchange).contextWrite(ctx -> ctx.put(ServerWebExchange.class, exchange)); } } diff --git a/api/src/main/java/io/kafbat/ui/config/JsonSchemaConfig.java b/api/src/main/java/io/kafbat/ui/config/JsonSchemaConfig.java new file mode 100644 index 000000000..0ab0cd1bc --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/config/JsonSchemaConfig.java @@ -0,0 +1,18 @@ +package io.kafbat.ui.config; + +import com.github.victools.jsonschema.generator.OptionPreset; +import com.github.victools.jsonschema.generator.SchemaGenerator; +import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder; +import com.github.victools.jsonschema.generator.SchemaVersion; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class JsonSchemaConfig { + @Bean + public SchemaGenerator schemaGenerator() { + SchemaGeneratorConfigBuilder configBuilder = + new SchemaGeneratorConfigBuilder(SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON); + return new SchemaGenerator(configBuilder.build()); + } +} diff --git a/api/src/main/java/io/kafbat/ui/config/McpConfig.java b/api/src/main/java/io/kafbat/ui/config/McpConfig.java new file mode 100644 index 000000000..cf4181f73 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/config/McpConfig.java @@ -0,0 +1,68 @@ +package io.kafbat.ui.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.kafbat.ui.service.mcp.McpSpecificationGenerator; +import io.kafbat.ui.service.mcp.McpTool; +import io.modelcontextprotocol.server.McpAsyncServer; +import io.modelcontextprotocol.server.McpServer; +import io.modelcontextprotocol.server.McpServerFeatures.AsyncPromptSpecification; +import io.modelcontextprotocol.server.McpServerFeatures.AsyncToolSpecification; +import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider; +import io.modelcontextprotocol.spec.McpSchema; +import java.util.ArrayList; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.function.server.RouterFunction; + +@Configuration +@RequiredArgsConstructor +@ConditionalOnProperty(value = "mcp.enabled", havingValue = "true") +public class McpConfig { + + private final List mcpTools; + private final McpSpecificationGenerator mcpSpecificationGenerator; + + // SSE transport + @Bean + public WebFluxSseServerTransportProvider sseServerTransport(ObjectMapper mapper) { + return new WebFluxSseServerTransportProvider(mapper, "/mcp/message", "/mcp/sse"); + } + + // Router function for SSE transport used by Spring WebFlux to start an HTTP + // server. + + @Bean + public RouterFunction mcpRouterFunction(WebFluxSseServerTransportProvider transport) { + return transport.getRouterFunction(); + } + + @Bean + public McpAsyncServer mcpServer(WebFluxSseServerTransportProvider transport) { + + // Configure server capabilities with resource support + var capabilities = McpSchema.ServerCapabilities.builder() + .resources(false, true) + .tools(true) // Tool support with list changes notifications + .prompts(false) // Prompt support with list changes notifications + .logging() // Logging support + .build(); + + // Create the server with both tool and resource capabilities + return McpServer.async(transport) + .serverInfo("Kafka UI MCP", "0.0.1") + .capabilities(capabilities) + .tools(tools()) + .build(); + } + + private List tools() { + List tools = new ArrayList<>(); + for (McpTool mcpTool : mcpTools) { + tools.addAll(mcpSpecificationGenerator.convertTool(mcpTool)); + } + return tools; + } +} diff --git a/api/src/main/java/io/kafbat/ui/controller/AclsController.java b/api/src/main/java/io/kafbat/ui/controller/AclsController.java index bbb30b5c6..ae30f8bd0 100644 --- a/api/src/main/java/io/kafbat/ui/controller/AclsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/AclsController.java @@ -11,6 +11,7 @@ import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.permission.AclAction; import io.kafbat.ui.service.acl.AclsService; +import io.kafbat.ui.service.mcp.McpTool; import java.util.Optional; import lombok.RequiredArgsConstructor; import org.apache.kafka.common.resource.PatternType; @@ -24,7 +25,7 @@ @RestController @RequiredArgsConstructor -public class AclsController extends AbstractController implements AclsApi { +public class AclsController extends AbstractController implements AclsApi, McpTool { private final AclsService aclsService; diff --git a/api/src/main/java/io/kafbat/ui/controller/BrokersController.java b/api/src/main/java/io/kafbat/ui/controller/BrokersController.java index cd81f7a5f..9c76fdc5e 100644 --- a/api/src/main/java/io/kafbat/ui/controller/BrokersController.java +++ b/api/src/main/java/io/kafbat/ui/controller/BrokersController.java @@ -11,6 +11,7 @@ import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.permission.ClusterConfigAction; import io.kafbat.ui.service.BrokerService; +import io.kafbat.ui.service.mcp.McpTool; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -25,7 +26,7 @@ @RestController @RequiredArgsConstructor @Slf4j -public class BrokersController extends AbstractController implements BrokersApi { +public class BrokersController extends AbstractController implements BrokersApi, McpTool { private static final String BROKER_ID = "brokerId"; private final BrokerService brokerService; diff --git a/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java b/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java index 9741fd216..e853e5403 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java @@ -6,6 +6,7 @@ import io.kafbat.ui.model.ClientQuotasDTO; import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.permission.ClientQuotaAction; +import io.kafbat.ui.service.mcp.McpTool; import io.kafbat.ui.service.quota.ClientQuotaRecord; import io.kafbat.ui.service.quota.ClientQuotaService; import java.math.BigDecimal; @@ -22,7 +23,7 @@ @RestController @RequiredArgsConstructor -public class ClientQuotasController extends AbstractController implements ClientQuotasApi { +public class ClientQuotasController extends AbstractController implements ClientQuotasApi, McpTool { private static final Comparator QUOTA_RECORDS_COMPARATOR = Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user)) diff --git a/api/src/main/java/io/kafbat/ui/controller/ClustersController.java b/api/src/main/java/io/kafbat/ui/controller/ClustersController.java index 4d83df1a4..463c9e965 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ClustersController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ClustersController.java @@ -6,6 +6,7 @@ import io.kafbat.ui.model.ClusterStatsDTO; import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.service.ClusterService; +import io.kafbat.ui.service.mcp.McpTool; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; @@ -17,7 +18,7 @@ @RestController @RequiredArgsConstructor @Slf4j -public class ClustersController extends AbstractController implements ClustersApi { +public class ClustersController extends AbstractController implements ClustersApi, McpTool { private final ClusterService clusterService; @Override diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index 32337bbcf..916ed185f 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -19,6 +19,7 @@ import io.kafbat.ui.model.rbac.permission.TopicAction; import io.kafbat.ui.service.ConsumerGroupService; import io.kafbat.ui.service.OffsetsResetService; +import io.kafbat.ui.service.mcp.McpTool; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; @@ -35,7 +36,7 @@ @RestController @RequiredArgsConstructor @Slf4j -public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi { +public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi, McpTool { private final ConsumerGroupService consumerGroupService; private final OffsetsResetService offsetsResetService; diff --git a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java index 328a7353e..4c5dc4d84 100644 --- a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java +++ b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java @@ -20,6 +20,7 @@ import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.permission.ConnectAction; import io.kafbat.ui.service.KafkaConnectService; +import io.kafbat.ui.service.mcp.McpTool; import java.util.Comparator; import java.util.Map; import java.util.Set; @@ -35,7 +36,7 @@ @RestController @RequiredArgsConstructor @Slf4j -public class KafkaConnectController extends AbstractController implements KafkaConnectApi { +public class KafkaConnectController extends AbstractController implements KafkaConnectApi, McpTool { private static final Set RESTART_ACTIONS = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS); private static final String CONNECTOR_NAME = "connectorName"; diff --git a/api/src/main/java/io/kafbat/ui/controller/KsqlController.java b/api/src/main/java/io/kafbat/ui/controller/KsqlController.java index d8b3203a9..218421e66 100644 --- a/api/src/main/java/io/kafbat/ui/controller/KsqlController.java +++ b/api/src/main/java/io/kafbat/ui/controller/KsqlController.java @@ -10,6 +10,7 @@ import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.permission.KsqlAction; import io.kafbat.ui.service.ksql.KsqlServiceV2; +import io.kafbat.ui.service.mcp.McpTool; import java.util.List; import java.util.Map; import java.util.Optional; @@ -24,7 +25,7 @@ @RestController @RequiredArgsConstructor @Slf4j -public class KsqlController extends AbstractController implements KsqlApi { +public class KsqlController extends AbstractController implements KsqlApi, McpTool { private final KsqlServiceV2 ksqlServiceV2; diff --git a/api/src/main/java/io/kafbat/ui/controller/MessagesController.java b/api/src/main/java/io/kafbat/ui/controller/MessagesController.java index b88e2d566..b99fa2aba 100644 --- a/api/src/main/java/io/kafbat/ui/controller/MessagesController.java +++ b/api/src/main/java/io/kafbat/ui/controller/MessagesController.java @@ -26,11 +26,13 @@ import io.kafbat.ui.serde.api.Serde; import io.kafbat.ui.service.DeserializationService; import io.kafbat.ui.service.MessagesService; +import io.kafbat.ui.service.mcp.McpTool; import java.util.List; import java.util.Optional; import javax.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; @@ -41,7 +43,7 @@ @RestController @RequiredArgsConstructor @Slf4j -public class MessagesController extends AbstractController implements MessagesApi { +public class MessagesController extends AbstractController implements MessagesApi, McpTool { private final MessagesService messagesService; private final DeserializationService deserializationService; @@ -148,8 +150,8 @@ public Mono> sendTopicMessages( return validateAccess(context).then( createTopicMessage.flatMap(msg -> - messagesService.sendMessage(getCluster(clusterName), topicName, msg).then() - ).map(ResponseEntity::ok) + messagesService.sendMessage(getCluster(clusterName), topicName, msg) + ).map(m -> new ResponseEntity(HttpStatus.OK)) ).doOnEach(sig -> audit(context, sig)); } diff --git a/api/src/main/java/io/kafbat/ui/controller/SchemasController.java b/api/src/main/java/io/kafbat/ui/controller/SchemasController.java index 079ac674c..6f73d3525 100644 --- a/api/src/main/java/io/kafbat/ui/controller/SchemasController.java +++ b/api/src/main/java/io/kafbat/ui/controller/SchemasController.java @@ -13,6 +13,7 @@ import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.model.rbac.permission.SchemaAction; import io.kafbat.ui.service.SchemaRegistryService; +import io.kafbat.ui.service.mcp.McpTool; import java.util.List; import java.util.Map; import javax.validation.Valid; @@ -28,7 +29,7 @@ @RestController @RequiredArgsConstructor @Slf4j -public class SchemasController extends AbstractController implements SchemasApi { +public class SchemasController extends AbstractController implements SchemasApi, McpTool { private static final Integer DEFAULT_PAGE_SIZE = 25; diff --git a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java index c230f4751..208ca59cb 100644 --- a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java @@ -29,6 +29,7 @@ import io.kafbat.ui.model.rbac.AccessContext; import io.kafbat.ui.service.TopicsService; import io.kafbat.ui.service.analyze.TopicAnalysisService; +import io.kafbat.ui.service.mcp.McpTool; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -46,7 +47,7 @@ @RestController @RequiredArgsConstructor @Slf4j -public class TopicsController extends AbstractController implements TopicsApi { +public class TopicsController extends AbstractController implements TopicsApi, McpTool { private static final Integer DEFAULT_PAGE_SIZE = 25; diff --git a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java index 7472c452f..fbbc84ab3 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java +++ b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java @@ -56,7 +56,7 @@ public static Predicate noop() { public static Predicate containsStringFilter(String string) { return msg -> StringUtils.contains(msg.getKey(), string) - || StringUtils.contains(msg.getContent(), string) || headersContains(msg, string); + || StringUtils.contains(msg.getValue(), string) || headersContains(msg, string); } private static boolean headersContains(TopicMessageDTO msg, String searchString) { @@ -126,9 +126,9 @@ private static Map> recordToArgs(TopicMessageDTO top args.put("keyAsText", topicMessage.getKey()); } - if (topicMessage.getContent() != null) { - args.put("value", parseToJsonOrReturnAsIs(topicMessage.getContent())); - args.put("valueAsText", topicMessage.getContent()); + if (topicMessage.getValue() != null) { + args.put("value", parseToJsonOrReturnAsIs(topicMessage.getValue())); + args.put("valueAsText", topicMessage.getValue()); } args.put("headers", Objects.requireNonNullElse(topicMessage.getHeaders(), emptyMap())); diff --git a/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java b/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java index 854f57b9e..034c6fcf6 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java @@ -99,7 +99,7 @@ private void fillValue(TopicMessageDTO message, ConsumerRecord rec try { var deserResult = valueDeserializer.deserialize( new RecordHeadersImpl(rec.headers()), rec.value().get()); - message.setContent(deserResult.getResult()); + message.setValue(deserResult.getResult()); message.setValueSerde(valueSerdeName); message.setValueDeserializeProperties(deserResult.getAdditionalProperties()); } catch (Exception e) { @@ -107,7 +107,7 @@ private void fillValue(TopicMessageDTO message, ConsumerRecord rec rec.topic(), rec.partition(), rec.offset(), valueSerdeName, e); var deserResult = fallbackValueDeserializer.deserialize( new RecordHeadersImpl(rec.headers()), rec.value().get()); - message.setContent(deserResult.getResult()); + message.setValue(deserResult.getResult()); message.setValueSerde(fallbackSerdeName); } } diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index f1256cf00..00ea5179a 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -143,15 +143,15 @@ private Mono> loadSortedDescriptions(ReactiveAdmi case STATE -> { ToIntFunction statesPriorities = cg -> switch (cg.state().orElse(ConsumerGroupState.UNKNOWN)) { - case STABLE -> 0; - case COMPLETING_REBALANCE -> 1; - case PREPARING_REBALANCE -> 2; - case EMPTY -> 3; - case DEAD -> 4; - case UNKNOWN -> 5; - case ASSIGNING -> 6; - case RECONCILING -> 7; - }; + case STABLE -> 0; + case COMPLETING_REBALANCE -> 1; + case PREPARING_REBALANCE -> 2; + case EMPTY -> 3; + case DEAD -> 4; + case UNKNOWN -> 5; + case ASSIGNING -> 6; + case RECONCILING -> 7; + }; var comparator = Comparator.comparingInt(statesPriorities); yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto); } diff --git a/api/src/main/java/io/kafbat/ui/service/DeserializationService.java b/api/src/main/java/io/kafbat/ui/service/DeserializationService.java index 87e1b202d..ccf82b987 100644 --- a/api/src/main/java/io/kafbat/ui/service/DeserializationService.java +++ b/api/src/main/java/io/kafbat/ui/service/DeserializationService.java @@ -41,15 +41,19 @@ public DeserializationService(Environment env, } } + public ClusterSerdes getSerdesFor(String clusterName) { + return clusterSerdes.get(clusterName); + } + private ClusterSerdes getSerdesFor(KafkaCluster cluster) { - return clusterSerdes.get(cluster.getName()); + return getSerdesFor(cluster.getName()); } private Serde.Serializer getSerializer(KafkaCluster cluster, String topic, Serde.Target type, String serdeName) { - var serdes = getSerdesFor(cluster); + var serdes = getSerdesFor(cluster.getName()); var serde = serdes.serdeForName(serdeName) .orElseThrow(() -> new ValidationException( String.format("Serde %s not found", serdeName))); diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index 92bfc260b..31f552885 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -207,14 +207,15 @@ public Mono updateConnectorState(KafkaCluster cluster, String connectName, String connectorName, ConnectorActionDTO action) { return api(cluster, connectName) .mono(client -> switch (action) { - case RESTART -> client.restartConnector(connectorName, false, false); - case RESTART_ALL_TASKS -> restartTasks(cluster, connectName, connectorName, task -> true); - case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName, - t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED); - case PAUSE -> client.pauseConnector(connectorName); - case STOP -> client.stopConnector(connectorName); - case RESUME -> client.resumeConnector(connectorName); - }); + case RESTART -> client.restartConnector(connectorName, false, false); + case RESTART_ALL_TASKS -> restartTasks(cluster, connectName, connectorName, task -> true); + case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName, + t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED); + case PAUSE -> client.pauseConnector(connectorName); + case STOP -> client.stopConnector(connectorName); + case RESUME -> client.resumeConnector(connectorName); + } + ); } private Mono restartTasks(KafkaCluster cluster, String connectName, diff --git a/api/src/main/java/io/kafbat/ui/service/MessagesService.java b/api/src/main/java/io/kafbat/ui/service/MessagesService.java index b33be8b76..ad9244b40 100644 --- a/api/src/main/java/io/kafbat/ui/service/MessagesService.java +++ b/api/src/main/java/io/kafbat/ui/service/MessagesService.java @@ -110,7 +110,7 @@ public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterT var result = predicate.test( new TopicMessageDTO() .key(execData.getKey()) - .content(execData.getValue()) + .value(execData.getValue()) .headers(execData.getHeaders()) .offset(execData.getOffset()) .partition(execData.getPartition()) @@ -179,7 +179,7 @@ private Mono sendMessageImpl(KafkaCluster cluster, topicDescription.name(), msg.getPartition(), msg.getKey().orElse(null), - msg.getContent().orElse(null), + msg.getValue().orElse(null), msg.getHeaders() ); CompletableFuture cf = new CompletableFuture<>(); diff --git a/api/src/main/java/io/kafbat/ui/service/OffsetsResetService.java b/api/src/main/java/io/kafbat/ui/service/OffsetsResetService.java index ad783a92d..d2e376cf3 100644 --- a/api/src/main/java/io/kafbat/ui/service/OffsetsResetService.java +++ b/api/src/main/java/io/kafbat/ui/service/OffsetsResetService.java @@ -23,7 +23,7 @@ import reactor.core.publisher.Mono; /** - * Implementation follows https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling + * Implementation follows https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling . * to works like "kafka-consumer-groups --reset-offsets" console command * (see kafka.admin.ConsumerGroupCommand) */ diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 2503711c6..34ca4f688 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -427,12 +427,12 @@ private static Mono describeClusterImpl(AdminClient client, result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations()); return toMono(allOfFuture).then( Mono.fromCallable(() -> - new ClusterDescription( - result.controller().get(), - result.clusterId().get(), - result.nodes().get(), - result.authorizedOperations().get() - ) + new ClusterDescription( + result.controller().get(), + result.clusterId().get(), + result.nodes().get(), + result.authorizedOperations().get() + ) ) ); } diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java index 95ad7bc5a..0cdae7891 100644 --- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java +++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java @@ -240,7 +240,7 @@ private Mono changeReplicationFactor( } /** - * Change topic replication factor, works on brokers versions 5.4.x and higher + * Change topic replication factor. Works on brokers versions 5.4.x and higher. */ public Mono changeReplicationFactor( KafkaCluster cluster, diff --git a/api/src/main/java/io/kafbat/ui/service/masking/DataMasking.java b/api/src/main/java/io/kafbat/ui/service/masking/DataMasking.java index 13cc6ed0f..fd7a17b22 100644 --- a/api/src/main/java/io/kafbat/ui/service/masking/DataMasking.java +++ b/api/src/main/java/io/kafbat/ui/service/masking/DataMasking.java @@ -67,7 +67,7 @@ public UnaryOperator getMaskerForTopic(String topic) { var valMasker = getMaskingFunction(topic, Serde.Target.VALUE); return msg -> msg .key(keyMasker.apply(msg.getKey())) - .content(valMasker.apply(msg.getContent())); + .value(valMasker.apply(msg.getValue())); } @VisibleForTesting diff --git a/api/src/main/java/io/kafbat/ui/service/mcp/McpSpecificationGenerator.java b/api/src/main/java/io/kafbat/ui/service/mcp/McpSpecificationGenerator.java new file mode 100644 index 000000000..997922fc4 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/mcp/McpSpecificationGenerator.java @@ -0,0 +1,254 @@ +package io.kafbat.ui.service.mcp; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.victools.jsonschema.generator.SchemaGenerator; +import io.modelcontextprotocol.server.McpAsyncServerExchange; +import io.modelcontextprotocol.server.McpServerFeatures.AsyncToolSpecification; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.CallToolResult; +import io.modelcontextprotocol.spec.McpSchema.JsonSchema; +import io.swagger.v3.oas.annotations.Operation; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Parameter; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.aop.support.AopUtils; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Slf4j +@Component +@RequiredArgsConstructor +public class McpSpecificationGenerator { + private final SchemaGenerator schemaGenerator; + private final ObjectMapper objectMapper = new ObjectMapper(); + + public List convertTool(McpTool controller) { + List result = new ArrayList<>(); + Class targetClass = AopUtils.getTargetClass(controller); + for (Method method : targetClass.getMethods()) { + Deprecated deprecated = AnnotationUtils.findAnnotation(method, Deprecated.class); + if (deprecated == null) { + Operation annotation = AnnotationUtils.findAnnotation(method, Operation.class); + if (annotation != null) { + result.add(this.convertOperation(method, annotation, controller)); + } + } + } + return result; + } + + private AsyncToolSpecification convertOperation(Method method, Operation annotation, McpTool instance) { + String name = annotation.operationId(); + String description = annotation.description().isEmpty() ? name : annotation.description(); + return new AsyncToolSpecification( + new McpSchema.Tool(name, description, operationSchema(method, instance)), + methodCall(method, instance) + ); + } + + @SuppressWarnings("unchecked") + private BiFunction, Mono> + methodCall(Method method, Object instance) { + + return (ex, args) -> Mono.deferContextual(ctx -> { + try { + ServerWebExchange serverWebExchange = ctx.get(ServerWebExchange.class); + Mono result = (Mono) method.invoke( + instance, + toParams(args, method.getParameters(), ex, serverWebExchange) + ); + return result.flatMap(this::toCallResult) + .onErrorResume((e) -> Mono.just(this.toErrorResult(e))); + } catch (IllegalAccessException | InvocationTargetException e) { + log.warn("Error invoking method {}: {}", method.getName(), e.getMessage(), e); + return Mono.just(this.toErrorResult(e)); + } + }); + } + + private Mono toCallResult(Object result) { + return switch (result) { + case Mono mono -> mono.map(this::callToolResult); + case Flux flux -> flux.collectList().map(this::callToolResult); + case ResponseEntity response -> reponseToCallResult(response); + case null, default -> Mono.just(this.callToolResult(result)); + }; + } + + private Mono reponseToCallResult(ResponseEntity response) { + HttpStatusCode statusCode = response.getStatusCode(); + if (statusCode.is2xxSuccessful() || statusCode.is1xxInformational()) { + return Mono.just(this.callToolResult(response.getBody())); + } else { + try { + return Mono.just(toErrorResult(objectMapper.writeValueAsString(response.getBody()))); + } catch (JsonProcessingException e) { + return Mono.just(toErrorResult(e)); + } + } + } + + private CallToolResult callToolResult(Object result) { + try { + return new CallToolResult( + List.of(new McpSchema.TextContent(objectMapper.writeValueAsString(result))), + false + ); + } catch (Exception e) { + return toErrorResult(e); + } + } + + protected CallToolResult toErrorResult(String body) { + return new CallToolResult( + List.of(new McpSchema.TextContent(body)), + true + ); + } + + protected CallToolResult toErrorResult(Throwable e) { + log.warn("Error responded to MCP Client: {}", e.getMessage(), e); + return new CallToolResult( + List.of(new McpSchema.TextContent(e.getMessage())), + true + ); + } + + private Object[] toParams( + Map mcpArgs, + Parameter[] parameters, + McpAsyncServerExchange ex, + ServerWebExchange serverWebExchange + ) { + Object[] values = new Object[parameters.length]; + for (int i = 0; i < parameters.length; i++) { + Parameter parameter = parameters[i]; + if (parameter.getType().equals(ServerWebExchange.class)) { + values[i] = serverWebExchange; + } else if (parameter.getType().equals(McpAsyncServerExchange.class)) { + values[i] = ex; + } else { + Object arg = mcpArgs.get(parameter.getName()); + if (arg != null) { + Class parameterType = parameter.getType(); + boolean mono = false; + + if (parameterType.isAssignableFrom(Mono.class)) { + ParameterizedType parameterizedType = (ParameterizedType) parameter.getParameterizedType(); + Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); + parameterType = (Class) actualTypeArguments[0]; + mono = true; + } + + if (parameterType.isAssignableFrom(arg.getClass())) { + values[i] = mono ? Mono.just(arg) : arg; + } else if (Map.class.isAssignableFrom(arg.getClass())) { + try { + Object obj = objectMapper.convertValue(arg, parameterType); + values[i] = mono ? Mono.just(obj) : obj; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + } + } + + return values; + } + + private JsonSchema operationSchema(Method method, McpTool instance) { + Method annotatedMethod = findAnnotatedMethod(method, instance); + + Map parametersSchemas = new HashMap<>(); + List required = new ArrayList<>(); + Parameter[] annotatedParameters = annotatedMethod.getParameters(); + Parameter[] methodParameters = method.getParameters(); + for (int i = 0; i < methodParameters.length; i++) { + Parameter methodParameter = methodParameters[i]; + Parameter annotatedParameter = annotatedParameters[i]; + + io.swagger.v3.oas.annotations.Parameter parameterAnnotation = + annotatedParameter.getAnnotation(io.swagger.v3.oas.annotations.Parameter.class); + if (!parameterAnnotation.hidden()) { + if (parameterAnnotation.required()) { + required.add(methodParameter.getName()); + } + parametersSchemas.put( + methodParameter.getName(), + getTypeSchema(methodParameter) + ); + } + } + return new JsonSchema( + "object", parametersSchemas, required, + false, + null, null + ); + } + + + private Method findAnnotatedMethod(Method method, McpTool instance) { + Class declaringClass = AopUtils.getTargetClass(instance); + for (Class iface : declaringClass.getInterfaces()) { + try { + Method interfaceMethod = iface.getMethod(method.getName(), method.getParameterTypes()); + if (interfaceMethod.isAnnotationPresent(Operation.class)) { + return interfaceMethod; + } + } catch (NoSuchMethodException ignored) { + // Skip if no method in interface + } + } + throw new RuntimeException(new NoSuchMethodException(method.getName())); + } + + private Object getTypeSchema(Parameter parameter) { + Class type = parameter.getType(); + if (type.isAssignableFrom(Mono.class)) { + ParameterizedType paramType = (ParameterizedType) parameter.getParameterizedType(); + Type[] actualTypeArguments = paramType.getActualTypeArguments(); + Type actualTypeArgument = actualTypeArguments[0]; + if (actualTypeArgument instanceof Class clz) { + return getTypeSchema(clz); + } else if (actualTypeArgument instanceof ParameterizedType prm) { + return getTypeSchema((Class) prm.getRawType()); + } else { + throw new UnsupportedOperationException( + "TypeVariable, WildcardType, and GenericArrayType do not supported now" + ); + } + } else { + return getTypeSchema(type); + } + } + + private Object getTypeSchema(Class type) { + return switch (type) { + case Class clz when clz.isAssignableFrom(String.class) -> Map.of("type", "string"); + case Class clz when clz.isAssignableFrom(Integer.class) -> Map.of("type", "integer"); + case Class clz when clz.isAssignableFrom(Long.class) -> Map.of("type", "integer"); + case Class clz when clz.isAssignableFrom(Double.class) -> Map.of("type", "number"); + case Class clz when clz.isAssignableFrom(Float.class) -> Map.of("type", "number"); + case Class clz when clz.isAssignableFrom(Boolean.class) -> Map.of("type", "boolean"); + default -> schemaGenerator.generateSchema(type); + }; + } + + +} diff --git a/api/src/main/java/io/kafbat/ui/service/mcp/McpTool.java b/api/src/main/java/io/kafbat/ui/service/mcp/McpTool.java new file mode 100644 index 000000000..fc60418b5 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/service/mcp/McpTool.java @@ -0,0 +1,4 @@ +package io.kafbat.ui.service.mcp; + +public interface McpTool { +} diff --git a/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java b/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java index 896bbf875..88a966013 100644 --- a/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java +++ b/api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java @@ -75,13 +75,13 @@ public void init() { .map(Subject::getProvider) .distinct() .map(provider -> switch (provider) { - case OAUTH_COGNITO -> new CognitoAuthorityExtractor(); - case OAUTH_GOOGLE -> new GoogleAuthorityExtractor(); - case OAUTH_GITHUB -> new GithubAuthorityExtractor(); - case OAUTH -> new OauthAuthorityExtractor(); - default -> null; - }) - .filter(Objects::nonNull) + case OAUTH_COGNITO -> new CognitoAuthorityExtractor(); + case OAUTH_GOOGLE -> new GoogleAuthorityExtractor(); + case OAUTH_GITHUB -> new GithubAuthorityExtractor(); + case OAUTH -> new OauthAuthorityExtractor(); + default -> null; + } + ).filter(Objects::nonNull) .collect(Collectors.toSet())) .flatMap(Set::stream) .collect(Collectors.toSet()); diff --git a/api/src/main/java/io/kafbat/ui/service/rbac/extractor/RbacActiveDirectoryAuthoritiesExtractor.java b/api/src/main/java/io/kafbat/ui/service/rbac/extractor/RbacActiveDirectoryAuthoritiesExtractor.java index 76feff063..f7f5ec1d9 100644 --- a/api/src/main/java/io/kafbat/ui/service/rbac/extractor/RbacActiveDirectoryAuthoritiesExtractor.java +++ b/api/src/main/java/io/kafbat/ui/service/rbac/extractor/RbacActiveDirectoryAuthoritiesExtractor.java @@ -37,10 +37,10 @@ public Collection getGrantedAuthorities(DirContextOp .stream() .filter(subject -> subject.getProvider().equals(Provider.LDAP_AD)) .anyMatch(subject -> switch (subject.getType()) { - case "user" -> subject.matches(username); - case "group" -> adGroups.stream().anyMatch(subject::matches); - default -> false; - }) + case "user" -> subject.matches(username); + case "group" -> adGroups.stream().anyMatch(subject::matches); + default -> false; + }) ) .map(Role::getName) .peek(role -> log.trace("Mapped role [{}] for user [{}]", role, username)) diff --git a/api/src/main/java/io/kafbat/ui/service/rbac/extractor/RbacLdapAuthoritiesExtractor.java b/api/src/main/java/io/kafbat/ui/service/rbac/extractor/RbacLdapAuthoritiesExtractor.java index 78ec4ba19..fd168bc5a 100644 --- a/api/src/main/java/io/kafbat/ui/service/rbac/extractor/RbacLdapAuthoritiesExtractor.java +++ b/api/src/main/java/io/kafbat/ui/service/rbac/extractor/RbacLdapAuthoritiesExtractor.java @@ -39,10 +39,10 @@ protected Set getAdditionalRoles(DirContextOperations user, St .stream() .filter(subject -> subject.getProvider().equals(Provider.LDAP)) .anyMatch(subject -> switch (subject.getType()) { - case "user" -> subject.matches(username); - case "group" -> ldapGroups.stream().anyMatch(subject::matches); - default -> false; - }) + case "user" -> subject.matches(username); + case "group" -> ldapGroups.stream().anyMatch(subject::matches); + default -> false; + }) ) .map(Role::getName) .peek(role -> log.trace("Mapped role [{}] for user [{}]", role, username)) diff --git a/api/src/main/java/io/kafbat/ui/util/ContentUtils.java b/api/src/main/java/io/kafbat/ui/util/ContentUtils.java index a3eafc2db..e5d6cce00 100644 --- a/api/src/main/java/io/kafbat/ui/util/ContentUtils.java +++ b/api/src/main/java/io/kafbat/ui/util/ContentUtils.java @@ -8,6 +8,7 @@ import java.util.regex.Pattern; /** + * Provides utility methods converting byte data to string representations. * Inspired by: https://github.com/tchiotludo/akhq/blob/dev/src/main/java/org/akhq/utils/ContentUtils.java */ public class ContentUtils { diff --git a/api/src/main/resources/application-localtest.yaml b/api/src/main/resources/application-localtest.yaml new file mode 100644 index 000000000..91b266390 --- /dev/null +++ b/api/src/main/resources/application-localtest.yaml @@ -0,0 +1,15 @@ +logging: + level: + root: INFO + io.kafbat.ui: DEBUG + reactor.netty.http.server.AccessLog: INFO + org.springframework.security: DEBUG + +kafka: + clusters: + - name: local + bootstrapServers: localhost:9092 + schemaRegistry: http://localhost:8085 + +dynamic.config.enabled: true + diff --git a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java index fdb02c2e7..af9b83418 100644 --- a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java +++ b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java @@ -30,42 +30,42 @@ class StringContainsFilter { @Test void returnsTrueWhenStringContainedInKeyOrContentOrHeadersOrInAllThree() { assertTrue( - filter.test(msg().key("contains abCd").content("some str")) + filter.test(msg().key("contains abCd").value("some str")) ); assertTrue( - filter.test(msg().key("some str").content("contains abCd")) + filter.test(msg().key("some str").value("contains abCd")) ); assertTrue( - filter.test(msg().key("contains abCd").content("contains abCd")) + filter.test(msg().key("contains abCd").value("contains abCd")) ); assertTrue( - filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("abC", "value"))) + filter.test(msg().key("dfg").value("does-not-contain").headers(Map.of("abC", "value"))) ); assertTrue( - filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("x1", "some abC"))) + filter.test(msg().key("dfg").value("does-not-contain").headers(Map.of("x1", "some abC"))) ); } @Test void returnsFalseOtherwise() { assertFalse( - filter.test(msg().key("some str").content("some str")) + filter.test(msg().key("some str").value("some str")) ); assertFalse( - filter.test(msg().key(null).content(null)) + filter.test(msg().key(null).value(null)) ); assertFalse( - filter.test(msg().key("aBc").content("AbC")) + filter.test(msg().key("aBc").value("AbC")) ); assertFalse( - filter.test(msg().key("aBc").content("AbC").headers(Map.of("abc", "value"))) + filter.test(msg().key("aBc").value("AbC").headers(Map.of("abc", "value"))) ); } @@ -120,8 +120,8 @@ void canCheckTimestampMs() { @Test void canCheckValueAsText() { var f = celScriptFilter("record.valueAsText == 'some text'"); - assertTrue(f.test(msg().content("some text"))); - assertFalse(f.test(msg().content("some other text"))); + assertTrue(f.test(msg().value("some text"))); + assertFalse(f.test(msg().value("some other text"))); } @Test @@ -157,24 +157,24 @@ void keyAndKeyAsTextSetToNullIfRecordsKeyIsNull() { @Test void canCheckValueAsJsonObjectIfItCanBeParsedToJson() { var f = celScriptFilter("has(record.value.name.first) && record.value.name.first == 'user1'"); - assertTrue(f.test(msg().content("{ \"name\" : { \"first\" : \"user1\" } }"))); - assertFalse(f.test(msg().content("{ \"name\" : { \"first\" : \"user2\" } }"))); - assertFalse(f.test(msg().content("{ \"name\" : { \"second\" : \"user2\" } }"))); + assertTrue(f.test(msg().value("{ \"name\" : { \"first\" : \"user1\" } }"))); + assertFalse(f.test(msg().value("{ \"name\" : { \"first\" : \"user2\" } }"))); + assertFalse(f.test(msg().value("{ \"name\" : { \"second\" : \"user2\" } }"))); } @Test void valueSetToContentStringIfCantBeParsedToJson() { var f = celScriptFilter("record.value == \"not json\""); - assertTrue(f.test(msg().content("not json"))); + assertTrue(f.test(msg().value("not json"))); } @Test void valueAndValueAsTextSetToNullIfRecordsContentIsNull() { var f = celScriptFilter("!has(record.value)"); - assertTrue(f.test(msg().content(null))); + assertTrue(f.test(msg().value(null))); f = celScriptFilter("!has(record.valueAsText)"); - assertTrue(f.test(msg().content(null))); + assertTrue(f.test(msg().value(null))); } @Test @@ -188,7 +188,7 @@ void filterSpeedIsAtLeast5kPerSec() { String jsonContent = String.format( "{ \"name\" : { \"randomStr\": \"%s\", \"first\" : \"%s\"} }", randString, name); - toFilter.add(msg().content(jsonContent).key(randString)); + toFilter.add(msg().value(jsonContent).key(randString)); } // first iteration for warmup // noinspection ResultOfMethodCallIgnored @@ -207,10 +207,10 @@ void nullFiltering() { String msg = "{ \"field\": { \"inner\": null } }"; var f = celScriptFilter("record.value.field.inner == null"); - assertTrue(f.test(msg().content(msg))); + assertTrue(f.test(msg().value(msg))); f = celScriptFilter("record.value.field.inner != null"); - assertFalse(f.test(msg().content(msg))); + assertFalse(f.test(msg().value(msg))); } } @@ -220,7 +220,7 @@ void testBase64DecodingWorks() { var uuid = UUID.randomUUID().toString(); var msg = "test." + Base64.getEncoder().encodeToString(uuid.getBytes()); var f = celScriptFilter("string(base64.decode(record.value.split('.')[1])).contains('" + uuid + "')"); - assertTrue(f.test(msg().content(msg))); + assertTrue(f.test(msg().value(msg))); } private TopicMessageDTO msg() { diff --git a/api/src/test/java/io/kafbat/ui/emitter/TailingEmitterTest.java b/api/src/test/java/io/kafbat/ui/emitter/TailingEmitterTest.java index 592bee935..c1130c9d2 100644 --- a/api/src/test/java/io/kafbat/ui/emitter/TailingEmitterTest.java +++ b/api/src/test/java/io/kafbat/ui/emitter/TailingEmitterTest.java @@ -71,7 +71,7 @@ void allNewMessagesShouldBeEmitted() throws Exception { .untilAsserted(() -> assertThat(fluxOutput) .filteredOn(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) - .extracting(msg -> msg.getMessage().getContent()) + .extracting(msg -> msg.getMessage().getValue()) .hasSameElementsAs(expectedValues) ); } @@ -96,7 +96,7 @@ void allNewMessageThatFitFilterConditionShouldBeEmitted() throws Exception { .untilAsserted(() -> assertThat(fluxOutput) .filteredOn(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) - .extracting(msg -> msg.getMessage().getContent()) + .extracting(msg -> msg.getMessage().getValue()) .hasSameElementsAs(expectedValues) ); } diff --git a/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java b/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java index 1fecae247..c0d02f39a 100644 --- a/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java @@ -104,8 +104,8 @@ void maskingAppliedOnConfiguredClusters() throws Exception { // both messages should be masked StepVerifier.create(msgsFlux) - .expectNextMatches(msg -> msg.getContent().equals("***")) - .expectNextMatches(msg -> msg.getContent().equals("***")) + .expectNextMatches(msg -> msg.getValue().equals("***")) + .expectNextMatches(msg -> msg.getValue().equals("***")) .verifyComplete(); } @@ -136,7 +136,7 @@ void cursorIsRegisteredAfterPollingIsDoneAndCanBeUsedForNextPagePolling(PollingM } }) .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) - .map(evt -> evt.getMessage().getContent()); + .map(evt -> evt.getMessage().getValue()); StepVerifier.create(msgsFlux) .expectNextCount(pageSize) @@ -151,7 +151,7 @@ void cursorIsRegisteredAfterPollingIsDoneAndCanBeUsedForNextPagePolling(PollingM } }) .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) - .map(evt -> evt.getMessage().getContent()); + .map(evt -> evt.getMessage().getValue()); StepVerifier.create(remainingMsgs) .expectNextCount(msgsToGenerate - pageSize) @@ -231,7 +231,7 @@ void sendMessageWithProtobufAnyType() { .key(null) .partition(0) .keySerde(StringSerde.name()) - .content(jsonContent) + .value(jsonContent) .valueSerde(ProtobufFileSerde.name()); String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID(); diff --git a/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java b/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java index 65bf1f49d..cc395ed41 100644 --- a/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java +++ b/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java @@ -345,7 +345,7 @@ private void expectEmitter( Flux.create(emitter) .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) .take(take) - .map(m -> m.getMessage().getContent()) + .map(m -> m.getMessage().getValue()) ); StepVerifier.Step step = stepConsumer.apply(firstStep); diff --git a/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java b/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java index 9e0164540..efed56ded 100644 --- a/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java +++ b/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java @@ -143,12 +143,12 @@ void noSchemaStringKeyStringValue() { new CreateTopicMessageDTO() .key("testKey") .keySerde(StringSerde.name()) - .content("testValue") + .value("testValue") .valueSerde(StringSerde.name()) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("testKey"); - assertThat(polled.getContent()).isEqualTo("testValue"); + assertThat(polled.getValue()).isEqualTo("testValue"); }); } @@ -159,12 +159,12 @@ void keyIsIntValueIsLong() { new CreateTopicMessageDTO() .key("123") .keySerde(Int32Serde.name()) - .content("21474836470") + .value("21474836470") .valueSerde(Int64Serde.name()) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("123"); - assertThat(polled.getContent()).isEqualTo("21474836470"); + assertThat(polled.getValue()).isEqualTo("21474836470"); }); } @@ -175,12 +175,12 @@ void keyIsNull() { new CreateTopicMessageDTO() .key(null) .keySerde(StringSerde.name()) - .content("testValue") + .value("testValue") .valueSerde(StringSerde.name()) ) .doAssert(polled -> { assertThat(polled.getKey()).isNull(); - assertThat(polled.getContent()).isEqualTo("testValue"); + assertThat(polled.getValue()).isEqualTo("testValue"); }); } @@ -191,12 +191,12 @@ void valueIsNull() { new CreateTopicMessageDTO() .key("testKey") .keySerde(StringSerde.name()) - .content(null) + .value(null) .valueSerde(StringSerde.name()) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("testKey"); - assertThat(polled.getContent()).isNull(); + assertThat(polled.getValue()).isNull(); }); } @@ -209,12 +209,12 @@ void primitiveAvroSchemas() { new CreateTopicMessageDTO() .key("\"some string\"") .keySerde(SchemaRegistrySerde.name()) - .content("123") + .value("123") .valueSerde(SchemaRegistrySerde.name()) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("\"some string\""); - assertThat(polled.getContent()).isEqualTo("123"); + assertThat(polled.getValue()).isEqualTo("123"); }); } @@ -227,12 +227,12 @@ void recordAvroSchema() { new CreateTopicMessageDTO() .key(AVRO_SCHEMA_1_JSON_RECORD) .keySerde(SchemaRegistrySerde.name()) - .content(AVRO_SCHEMA_2_JSON_RECORD) + .value(AVRO_SCHEMA_2_JSON_RECORD) .valueSerde(SchemaRegistrySerde.name()) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD); - assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD); + assertJsonEqual(polled.getValue(), AVRO_SCHEMA_2_JSON_RECORD); }); } @@ -244,12 +244,12 @@ void keyWithNoSchemaValueWithProtoSchema() { new CreateTopicMessageDTO() .key("testKey") .keySerde(StringSerde.name()) - .content(PROTOBUF_SCHEMA_JSON_RECORD) + .value(PROTOBUF_SCHEMA_JSON_RECORD) .valueSerde(SchemaRegistrySerde.name()) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("testKey"); - assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD); + assertJsonEqual(polled.getValue(), PROTOBUF_SCHEMA_JSON_RECORD); }); } @@ -262,13 +262,13 @@ void keyWithAvroSchemaValueWithAvroSchemaKeyIsNull() { new CreateTopicMessageDTO() .key(null) .keySerde(SchemaRegistrySerde.name()) - .content(AVRO_SCHEMA_2_JSON_RECORD) + .value(AVRO_SCHEMA_2_JSON_RECORD) .valueSerde(SchemaRegistrySerde.name()) ) .doAssert(polled -> { assertThat(polled.getKey()).isNull(); - assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD); + assertJsonEqual(polled.getValue(), AVRO_SCHEMA_2_JSON_RECORD); }); } @@ -280,7 +280,7 @@ void valueWithAvroSchemaShouldThrowExceptionIfArgIsNotValidJsonObject() { new CreateTopicMessageDTO() .keySerde(StringSerde.name()) // f2 has type int instead of string - .content("{ \"f1\": 111, \"f2\": 123 }") + .value("{ \"f1\": 111, \"f2\": 123 }") .valueSerde(SchemaRegistrySerde.name()) ) .assertSendThrowsException(); @@ -295,12 +295,12 @@ void keyWithAvroSchemaValueWithProtoSchema() { new CreateTopicMessageDTO() .key(AVRO_SCHEMA_1_JSON_RECORD) .keySerde(SchemaRegistrySerde.name()) - .content(PROTOBUF_SCHEMA_JSON_RECORD) + .value(PROTOBUF_SCHEMA_JSON_RECORD) .valueSerde(SchemaRegistrySerde.name()) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD); - assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD); + assertJsonEqual(polled.getValue(), PROTOBUF_SCHEMA_JSON_RECORD); }); } @@ -313,7 +313,7 @@ void valueWithProtoSchemaShouldThrowExceptionArgIsNotValidJsonObject() { .key(null) .keySerde(StringSerde.name()) // f2 field has type object instead of int - .content("{ \"f1\" : \"test str\", \"f2\" : {} }") + .value("{ \"f1\" : \"test str\", \"f2\" : {} }") .valueSerde(SchemaRegistrySerde.name()) ) .assertSendThrowsException(); @@ -328,12 +328,12 @@ void keyWithProtoSchemaValueWithJsonSchema() { new CreateTopicMessageDTO() .key(PROTOBUF_SCHEMA_JSON_RECORD) .keySerde(SchemaRegistrySerde.name()) - .content(JSON_SCHEMA_RECORD) + .value(JSON_SCHEMA_RECORD) .valueSerde(SchemaRegistrySerde.name()) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD); - assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD); + assertJsonEqual(polled.getValue(), JSON_SCHEMA_RECORD); }); } @@ -346,7 +346,7 @@ void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() { .key(null) .keySerde(StringSerde.name()) // 'f2' field has type object instead of string - .content("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }") + .value("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }") .valueSerde(SchemaRegistrySerde.name()) ) .assertSendThrowsException(); @@ -361,12 +361,12 @@ void topicMessageMetadataAvro() { new CreateTopicMessageDTO() .key(AVRO_SCHEMA_1_JSON_RECORD) .keySerde(SchemaRegistrySerde.name()) - .content(AVRO_SCHEMA_2_JSON_RECORD) + .value(AVRO_SCHEMA_2_JSON_RECORD) .valueSerde(SchemaRegistrySerde.name()) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD); - assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD); + assertJsonEqual(polled.getValue(), AVRO_SCHEMA_2_JSON_RECORD); assertThat(polled.getKeySize()).isEqualTo(15L); assertThat(polled.getValueSize()).isEqualTo(15L); assertThat(polled.getKeyDeserializeProperties().get("schemaId")).isNotNull(); @@ -386,12 +386,12 @@ void topicMessageMetadataProtobuf() { new CreateTopicMessageDTO() .key(PROTOBUF_SCHEMA_JSON_RECORD) .keySerde(SchemaRegistrySerde.name()) - .content(PROTOBUF_SCHEMA_JSON_RECORD) + .value(PROTOBUF_SCHEMA_JSON_RECORD) .valueSerde(SchemaRegistrySerde.name()) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD); - assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD); + assertJsonEqual(polled.getValue(), PROTOBUF_SCHEMA_JSON_RECORD); assertThat(polled.getKeySize()).isEqualTo(18L); assertThat(polled.getValueSize()).isEqualTo(18L); assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull(); @@ -410,13 +410,13 @@ void topicMessageMetadataJson() { new CreateTopicMessageDTO() .key(JSON_SCHEMA_RECORD) .keySerde(SchemaRegistrySerde.name()) - .content(JSON_SCHEMA_RECORD) + .value(JSON_SCHEMA_RECORD) .valueSerde(SchemaRegistrySerde.name()) .headers(Map.of("header1", "value1")) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD); - assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD); + assertJsonEqual(polled.getValue(), JSON_SCHEMA_RECORD); assertThat(polled.getKeySize()).isEqualTo(57L); assertThat(polled.getValueSize()).isEqualTo(57L); assertThat(polled.getHeadersSize()).isEqualTo(13L); @@ -436,7 +436,7 @@ void headerValueNullPresentTest() { new CreateTopicMessageDTO() .key(JSON_SCHEMA_RECORD) .keySerde(SchemaRegistrySerde.name()) - .content(JSON_SCHEMA_RECORD) + .value(JSON_SCHEMA_RECORD) .valueSerde(SchemaRegistrySerde.name()) .headers(Collections.singletonMap("header123", null)) ) @@ -451,12 +451,12 @@ void noKeyAndNoContentPresentTest() { new CreateTopicMessageDTO() .key(null) .keySerde(StringSerde.name()) // any serde - .content(null) + .value(null) .valueSerde(StringSerde.name()) // any serde ) .doAssert(polled -> { assertThat(polled.getKey()).isNull(); - assertThat(polled.getContent()).isNull(); + assertThat(polled.getValue()).isNull(); }); } diff --git a/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java b/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java new file mode 100644 index 000000000..bbcde09d8 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java @@ -0,0 +1,97 @@ +package io.kafbat.ui.service.mcp; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.victools.jsonschema.generator.OptionPreset; +import com.github.victools.jsonschema.generator.SchemaGenerator; +import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder; +import com.github.victools.jsonschema.generator.SchemaVersion; +import io.kafbat.ui.controller.TopicsController; +import io.kafbat.ui.mapper.ClusterMapper; +import io.kafbat.ui.model.SortOrderDTO; +import io.kafbat.ui.model.TopicColumnsToSortDTO; +import io.kafbat.ui.model.TopicUpdateDTO; +import io.kafbat.ui.service.TopicsService; +import io.kafbat.ui.service.analyze.TopicAnalysisService; +import io.modelcontextprotocol.server.McpServerFeatures.AsyncToolSpecification; +import io.modelcontextprotocol.spec.McpSchema; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class McpSpecificationGeneratorTest { + private static final SchemaGenerator SCHEMA_GENERATOR = schemaGenerator(); + private static final McpSpecificationGenerator MCP_SPECIFICATION_GENERATOR = + new McpSpecificationGenerator(SCHEMA_GENERATOR); + + private static SchemaGenerator schemaGenerator() { + SchemaGeneratorConfigBuilder configBuilder = + new SchemaGeneratorConfigBuilder(SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON); + return new SchemaGenerator(configBuilder.build()); + } + + @Test + void testConvertController() { + TopicsController topicsController = new TopicsController( + mock(TopicsService.class), mock(TopicAnalysisService.class), mock(ClusterMapper.class) + ); + List specifications = + MCP_SPECIFICATION_GENERATOR.convertTool(topicsController); + + assertThat(specifications).hasSize(14); + List tools = List.of( + new McpSchema.Tool( + "recreateTopic", + "recreateTopic", + new McpSchema.JsonSchema("object", Map.of( + "clusterName", Map.of("type", "string"), + "topicName", Map.of("type", "string") + ), List.of("clusterName", "topicName"), false, null, null) + ), + new McpSchema.Tool( + "getTopicConfigs", + "getTopicConfigs", + new McpSchema.JsonSchema("object", Map.of( + "clusterName", Map.of("type", "string"), + "topicName", Map.of("type", "string") + ), List.of("clusterName", "topicName"), false, null, null) + ), + new McpSchema.Tool( + "cloneTopic", + "cloneTopic", + new McpSchema.JsonSchema("object", Map.of( + "clusterName", Map.of("type", "string"), + "topicName", Map.of("type", "string"), + "newTopicName", Map.of("type", "string") + ), List.of("clusterName", "topicName", "newTopicName"), false, null, null) + ), + new McpSchema.Tool( + "getTopics", + "getTopics", + new McpSchema.JsonSchema("object", Map.of( + "clusterName", Map.of("type", "string"), + "page", Map.of("type", "integer"), + "perPage", Map.of("type", "integer"), + "showInternal", Map.of("type", "boolean"), + "search", Map.of("type", "string"), + "orderBy", SCHEMA_GENERATOR.generateSchema(TopicColumnsToSortDTO.class), + "sortOrder", SCHEMA_GENERATOR.generateSchema(SortOrderDTO.class) + ), List.of("clusterName"), false, null, null) + ), + new McpSchema.Tool( + "updateTopic", + "updateTopic", + new McpSchema.JsonSchema("object", Map.of( + "clusterName", Map.of("type", "string"), + "topicName", Map.of("type", "string"), + "topicUpdate", SCHEMA_GENERATOR.generateSchema(TopicUpdateDTO.class) + ), List.of("clusterName", "topicName"), false, null, null) + ) + ); + assertThat(tools).allMatch(tool -> + specifications.stream().anyMatch(s -> s.tool().equals(tool)) + ); + } +} diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 454d78c7c..3c9aa703c 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -3034,7 +3034,7 @@ components: type: object additionalProperties: type: string - content: + value: type: string nullable: true keySerde: @@ -3124,7 +3124,7 @@ components: type: object additionalProperties: type: string - content: + value: type: string keyFormat: #deprecated - wont be filled - use 'keySerde' field instead diff --git a/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java b/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java index 4a4e8e1a2..2f480b8ca 100644 --- a/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java +++ b/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java @@ -187,7 +187,7 @@ private void sendMessage(String clusterName, Topic topic) { createMessage.setKeySerde("String"); createMessage.setValueSerde("String"); createMessage.setKey(topic.getMessageKey()); - createMessage.setContent(topic.getMessageValue()); + createMessage.setValue(topic.getMessageValue()); try { messageApi().sendTopicMessages(clusterName, topic.getName(), createMessage).block(); } catch (WebClientResponseException ex) { diff --git a/frontend/src/components/Topics/Topic/Messages/Message.tsx b/frontend/src/components/Topics/Topic/Messages/Message.tsx index af76db673..e5624624c 100644 --- a/frontend/src/components/Topics/Topic/Messages/Message.tsx +++ b/frontend/src/components/Topics/Topic/Messages/Message.tsx @@ -32,7 +32,7 @@ const Message: React.FC = ({ key, keySize, partition, - content, + value, valueSize, headers, valueSerde, @@ -43,7 +43,7 @@ const Message: React.FC = ({ }) => { const [isOpen, setIsOpen] = React.useState(false); const savedMessageJson = { - Value: content, + Value: value, Offset: offset, Key: key, Partition: partition, @@ -120,10 +120,10 @@ const Message: React.FC = ({ )} - + - + {valueSerde === 'Fallback' && ( } @@ -149,7 +149,7 @@ const Message: React.FC = ({ {isOpen && ( { offset: 0, key: 'test-key', partition: 6, - content: '{"data": "test"}', + value: '{"data": "test"}', headers: { header: 'test' }, }; const mockKeyFilters: PreviewFilter = { @@ -62,7 +62,7 @@ describe('Message component', () => { it('shows the data in the table row', () => { renderComponent(); - expect(screen.getByText(mockMessage.content as string)).toBeInTheDocument(); + expect(screen.getByText(mockMessage.value as string)).toBeInTheDocument(); expect(screen.getByText(mockMessage.key as string)).toBeInTheDocument(); expect( screen.getByText(formatTimestamp(mockMessage.timestamp)) @@ -75,10 +75,10 @@ describe('Message component', () => { it('check the useDataSaver functionality', () => { const props = { message: { ...mockMessage } }; - delete props.message.content; + delete props.message.value; renderComponent(props); expect( - screen.queryByText(mockMessage.content as string) + screen.queryByText(mockMessage.value as string) ).not.toBeInTheDocument(); }); @@ -115,7 +115,7 @@ describe('Message component', () => { it('should check if Preview filter showing for Value', () => { const props = { - message: { ...mockMessage, content: contentTest as string }, + message: { ...mockMessage, value: contentTest as string }, contentFilters: [mockContentFilters], }; renderComponent(props); diff --git a/frontend/src/components/Topics/Topic/Messages/__test__/MessagesTable.spec.tsx b/frontend/src/components/Topics/Topic/Messages/__test__/MessagesTable.spec.tsx index 77f31cebc..7c91be5bd 100644 --- a/frontend/src/components/Topics/Topic/Messages/__test__/MessagesTable.spec.tsx +++ b/frontend/src/components/Topics/Topic/Messages/__test__/MessagesTable.spec.tsx @@ -17,7 +17,7 @@ export const topicMessagePayload: TopicMessage = { timestampType: TopicMessageTimestampTypeEnum.CREATE_TIME, key: 'schema-registry', headers: {}, - content: + value: '{"host":"schemaregistry1","port":8085,"master_eligibility":true,"scheme":"http","version":1}', }; @@ -102,9 +102,9 @@ describe('MessagesTable', () => { it('should check the rendering of the messages', () => { expect(screen.queryByText(/No messages found/i)).not.toBeInTheDocument(); - if (mockTopicsMessages[0].content) { + if (mockTopicsMessages[0].value) { expect( - screen.getByText(mockTopicsMessages[0].content) + screen.getByText(mockTopicsMessages[0].value) ).toBeInTheDocument(); } }); diff --git a/frontend/src/components/Topics/Topic/SendMessage/SendMessage.tsx b/frontend/src/components/Topics/Topic/SendMessage/SendMessage.tsx index 4bdb981f8..b820ebed3 100644 --- a/frontend/src/components/Topics/Topic/SendMessage/SendMessage.tsx +++ b/frontend/src/components/Topics/Topic/SendMessage/SendMessage.tsx @@ -111,7 +111,7 @@ const SendMessage: React.FC<{ closeSidebar: () => void }> = ({ try { await sendMessage.mutateAsync({ key: key || null, - content: content || null, + value: content || null, headers: parsedHeaders, partition: partition || 0, keySerde, diff --git a/frontend/src/lib/hooks/api/__tests__/topics.spec.ts b/frontend/src/lib/hooks/api/__tests__/topics.spec.ts index 9085014fd..62c2fb0d7 100644 --- a/frontend/src/lib/hooks/api/__tests__/topics.spec.ts +++ b/frontend/src/lib/hooks/api/__tests__/topics.spec.ts @@ -186,7 +186,7 @@ describe('Topics hooks', () => { }); const message: CreateTopicMessage = { partition: 0, - content: 'Hello World', + value: 'Hello World', }; await act(() => { result.current.mutateAsync(message); diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ccda81869..cd33031f8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -30,7 +30,7 @@ jakarta-annotation-api = '2.1.1' jackson-databind-nullable = '0.2.6' antlr = '4.12.0' json-schema-validator = '2.2.14' -checkstyle = '10.3.1' +checkstyle = '10.24.0' allure = '2.29.1' selenide = '7.2.3' @@ -137,3 +137,6 @@ bouncycastle-bcpkix = { module = 'org.bouncycastle:bcpkix-jdk18on', version = '1 # Google Managed Service for Apache Kafka support google-managed-kafka-login-handler = {module = 'com.google.cloud.hosted.kafka:managed-kafka-auth-login-handler', version = '1.0.5'} google-oauth-client = { module = 'com.google.oauth-client:google-oauth-client', version = '1.39.0' } + +modelcontextprotocol-spring-webflux = {module = 'io.modelcontextprotocol.sdk:mcp-spring-webflux', version = '0.10.0'} +victools-jsonschema-generator = {module = 'com.github.victools:jsonschema-generator', version = '4.38.0'}