diff --git a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java
index a5036011a..51af387c6 100644
--- a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java
@@ -11,6 +11,7 @@
 import io.kafbat.ui.api.TopicsApi;
 import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.mapper.ClusterMapper;
+import io.kafbat.ui.model.FullConnectorInfoDTO;
 import io.kafbat.ui.model.InternalTopic;
 import io.kafbat.ui.model.InternalTopicConfig;
 import io.kafbat.ui.model.PartitionsIncreaseDTO;
@@ -28,6 +29,8 @@
 import io.kafbat.ui.model.TopicUpdateDTO;
 import io.kafbat.ui.model.TopicsResponseDTO;
 import io.kafbat.ui.model.rbac.AccessContext;
+import io.kafbat.ui.model.rbac.permission.ConnectAction;
+import io.kafbat.ui.service.KafkaConnectService;
 import io.kafbat.ui.service.TopicsService;
 import io.kafbat.ui.service.analyze.TopicAnalysisService;
 import io.kafbat.ui.service.mcp.McpTool;
@@ -55,6 +58,7 @@ public class TopicsController extends AbstractController implements TopicsApi, McpTool {
   private final TopicAnalysisService topicAnalysisService;
   private final ClusterMapper clusterMapper;
   private final ClustersProperties clustersProperties;
+  private final KafkaConnectService kafkaConnectService;
 
   @Override
   public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -370,4 +374,23 @@ private Comparator<InternalTopic> getComparatorForTopic(
       default -> defaultComparator;
     };
   }
+
+  @Override
+  public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getTopicConnectors(String clusterName,
+                                                                             String topicName,
+                                                                             ServerWebExchange exchange) {
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        .topicActions(topicName, VIEW)
+        .operationName("getTopicConnectors")
+        .operationParams(topicName)
+        .build();
+
+    Flux<FullConnectorInfoDTO> job = kafkaConnectService.getTopicConnectors(getCluster(clusterName), topicName)
+        .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));
+
+    return validateAccess(context)
+        .then(Mono.just(ResponseEntity.ok(job)))
+        .doOnEach(sig -> audit(context, sig));
+  }
 }
diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
index 4057087a1..9aac5b74e 100644
--- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
+++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
@@ -18,6 +18,7 @@
 import io.kafbat.ui.model.TaskIdDTO;
 import io.kafbat.ui.model.TaskStatusDTO;
 import io.kafbat.ui.model.connect.InternalConnectorInfo;
+import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -31,6 +32,12 @@ public interface KafkaConnectMapper {
 
   NewConnector toClient(io.kafbat.ui.model.NewConnectorDTO newConnector);
 
+  default ClusterInfo toClient(KafkaConnectState state) {
+    ClusterInfo clusterInfo = new ClusterInfo();
+    clusterInfo.setVersion(state.getVersion());
+    return clusterInfo;
+  }
+
   @Mapping(target = "status", ignore = true)
   @Mapping(target = "connect", ignore = true)
   ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector);
@@ -153,4 +160,23 @@ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo
         .tasksCount(tasks.size())
         .failedTasksCount(failedTasksCount);
   }
+
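+  // Maps a polled Connect cluster and its connectors into the immutable
+  // KafkaConnectState snapshot that is stored in the statistics cache.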
+  default KafkaConnectState toScrapeState(ConnectDTO connect,
+                                          List<InternalConnectorInfo> connectors) {
+    return KafkaConnectState.builder()
+        .name(connect.getName())
+        .version(connect.getVersion().orElse("Unknown"))
+        .connectors(connectors.stream().map(this::toScrapeState).toList())
+        .build();
+  }
+
+  default KafkaConnectState.ConnectorState toScrapeState(InternalConnectorInfo connector) {
+    return new KafkaConnectState.ConnectorState(
+        connector.getConnector().getName(),
+        connector.getConnector().getType(),
+        connector.getConnector().getStatus(),
+        connector.getTopics()
+    );
+  }
+
 }
diff --git a/api/src/main/java/io/kafbat/ui/model/Statistics.java b/api/src/main/java/io/kafbat/ui/model/Statistics.java
index d5b6ebd1b..23e28f515 100644
--- a/api/src/main/java/io/kafbat/ui/model/Statistics.java
+++ b/api/src/main/java/io/kafbat/ui/model/Statistics.java
@@ -1,8 +1,10 @@
 package io.kafbat.ui.model;
 
 import io.kafbat.ui.service.ReactiveAdminClient;
+import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
 import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
 import java.util.List;
+import java.util.Map;
 import java.util.function.UnaryOperator;
 import java.util.stream.Stream;
 import lombok.Builder;
@@ -19,6 +21,7 @@ public class Statistics implements AutoCloseable {
   ReactiveAdminClient.ClusterDescription clusterDescription;
   Metrics metrics;
   ScrapedClusterState clusterState;
+  Map<String, KafkaConnectState> connectStates;
 
   public static Statistics empty() {
     return builder()
@@ -28,6 +31,7 @@ public static Statistics empty() {
         .clusterDescription(ReactiveAdminClient.ClusterDescription.empty())
         .metrics(Metrics.empty())
         .clusterState(ScrapedClusterState.empty())
+        .connectStates(Map.of())
         .build();
   }
diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
index adb656b5b..e58da714c 100644
--- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
+++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -1,7 +1,5 @@
 package io.kafbat.ui.service;
 
-import com.github.benmanes.caffeine.cache.AsyncCache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.connect.api.KafkaConnectClientApi;
 import io.kafbat.ui.connect.model.ClusterInfo;
@@ -25,12 +23,15 @@
 import io.kafbat.ui.model.FullConnectorInfoDTO;
 import io.kafbat.ui.model.KafkaCluster;
 import io.kafbat.ui.model.NewConnectorDTO;
+import io.kafbat.ui.model.Statistics;
 import io.kafbat.ui.model.TaskDTO;
 import io.kafbat.ui.model.TaskIdDTO;
 import io.kafbat.ui.model.connect.InternalConnectorInfo;
 import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
+import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
 import io.kafbat.ui.util.ReactiveFailover;
 import jakarta.validation.Valid;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -49,17 +50,16 @@ public class KafkaConnectService {
   private final KafkaConnectMapper kafkaConnectMapper;
   private final KafkaConfigSanitizer kafkaConfigSanitizer;
   private final ClustersProperties clustersProperties;
-  private final AsyncCache<String, ClusterInfo> cacheClusterInfo;
+  private final StatisticsCache statisticsCache;
 
   public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
                              KafkaConfigSanitizer kafkaConfigSanitizer,
-                             ClustersProperties clustersProperties) {
+                             ClustersProperties clustersProperties,
+                             StatisticsCache statisticsCache) {
     this.kafkaConnectMapper = kafkaConnectMapper;
    this.kafkaConfigSanitizer = kafkaConfigSanitizer;
    this.clustersProperties = clustersProperties;
-    this.cacheClusterInfo = Caffeine.newBuilder()
-        .expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
-        .buildAsync();
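+    // Connect cluster metadata is now served from the shared StatisticsCache,
+    // refreshed by the periodic statistics scrape, instead of a local Caffeine cache.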
+    this.statisticsCache = statisticsCache;
   }
 
   public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
@@ -89,14 +89,17 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
     }
   }
 
-  private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
-    return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
-        api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
-            .onErrorResume(th -> {
-              log.error("Error on collecting cluster info", th);
-              return Mono.just(new ClusterInfo());
-            }).toFuture()
-    ));
+  public Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
+    KafkaConnectState state = statisticsCache.get(cluster).getConnectStates().get(connectName);
+    if (state != null) {
+      return Mono.just(kafkaConnectMapper.toClient(state));
+    } else {
+      return api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
+          .onErrorResume(th -> {
+            log.error("Error on collecting cluster info", th);
+            return Mono.just(new ClusterInfo());
+          });
+    }
   }
 
   private Flux<InternalConnectorInfo> getConnectConnectors(
@@ -134,6 +137,33 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
         .flatMapMany(Flux::fromIterable);
   }
 
+  public Flux<KafkaConnectState> scrapeAllConnects(KafkaCluster cluster) {
+
+    Optional<List<ClustersProperties.ConnectCluster>> connectClusters =
+        Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
+
+    return Flux.fromIterable(connectClusters.orElse(List.of())).flatMap(c ->
+        getClusterInfo(cluster, c.getName()).map(info ->
+            kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
+        ).onErrorResume((t) -> Mono.just(new ConnectDTO().name(c.getName())))
+    ).flatMap(connect ->
+        getConnectorsWithErrorsSuppress(cluster, connect.getName())
+            .onErrorResume(t -> Mono.just(Map.of()))
+            .flatMapMany(connectors ->
+                Flux.fromIterable(connectors.entrySet())
+                    .flatMap(e ->
+                        getConnectorTopics(
+                            cluster,
+                            connect.getName(),
+                            e.getKey()
+                        ).map(topics ->
+                            kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics())
+                        )
+                    )
+            ).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors))
+    );
+  }
+
   private List<FullConnectorInfoDTO> filterConnectors(
       List<FullConnectorInfoDTO> connectors,
       String search,
@@ -349,4 +379,30 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
                 .formatted(connectorName, connectName));
       });
   }
+
+  public Flux<FullConnectorInfoDTO> getTopicConnectors(KafkaCluster cluster, String topicName) {
+    Map<String, KafkaConnectState> connectStates = this.statisticsCache.get(cluster).getConnectStates();
+    Map<String, List<String>> filteredConnects = new HashMap<>();
+    for (Map.Entry<String, KafkaConnectState> entry : connectStates.entrySet()) {
+      List<KafkaConnectState.ConnectorState> connectors =
+          entry.getValue().getConnectors().stream().filter(c -> c.topics().contains(topicName)).toList();
+      if (!connectors.isEmpty()) {
+        filteredConnects.put(entry.getKey(), connectors.stream().map(KafkaConnectState.ConnectorState::name).toList());
+      }
+    }
+
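+    // The cached state only yields connector names; fetch current details for
+    // the matching connectors and map them to FullConnectorInfoDTO.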
+    return Flux.fromIterable(filteredConnects.entrySet())
+        .flatMap(entry ->
+            getConnectorsWithErrorsSuppress(cluster, entry.getKey())
+                .map(connectors ->
+                    connectors.entrySet()
+                        .stream()
+                        .filter(c -> entry.getValue().contains(c.getKey()))
+                        .map(c -> kafkaConnectMapper.fromClient(entry.getKey(), c.getValue(), null))
+                        .map(kafkaConnectMapper::fullConnectorInfo)
+                        .toList()
+                )
+        ).flatMap(Flux::fromIterable);
+
+  }
 }
diff --git a/api/src/main/java/io/kafbat/ui/service/StatisticsService.java b/api/src/main/java/io/kafbat/ui/service/StatisticsService.java
index 9185838dd..985c95784 100644
--- a/api/src/main/java/io/kafbat/ui/service/StatisticsService.java
+++ b/api/src/main/java/io/kafbat/ui/service/StatisticsService.java
@@ -8,8 +8,10 @@
 import io.kafbat.ui.model.Metrics;
 import io.kafbat.ui.model.ServerStatusDTO;
 import io.kafbat.ui.model.Statistics;
+import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
 import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
 import java.util.List;
+import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -21,6 +23,7 @@ public class StatisticsService {
 
   private final AdminClientService adminClientService;
+  private final KafkaConnectService kafkaConnectService;
   private final FeatureService featureService;
   private final StatisticsCache cache;
   private final ClustersProperties clustersProperties;
@@ -38,11 +41,14 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
         .then(
             Mono.zip(
                 featureService.getAvailableFeatures(ac, cluster, description),
-                loadClusterState(description, ac)
+                loadClusterState(description, ac),
+                loadKafkaConnects(cluster)
             ).flatMap(t ->
                 scrapeMetrics(cluster, t.getT2(), description)
-                    .map(metrics -> createStats(description, t.getT1(), t.getT2(), metrics, ac)))))
-        .doOnError(e ->
+                    .map(metrics -> createStats(description, t.getT1(), t.getT2(), t.getT3(), metrics, ac))
+            )
+          )
+        ).doOnError(e ->
             log.error("Failed to collect cluster {} info", cluster.getName(), e))
         .onErrorResume(t -> Mono.just(Statistics.statsUpdateError(t))));
   }
@@ -50,7 +56,7 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
   private Statistics createStats(ClusterDescription description,
                                  List<ClusterFeature> features,
                                  ScrapedClusterState scrapedClusterState,
-                                 Metrics metrics,
+                                 List<KafkaConnectState> connects, Metrics metrics,
                                  ReactiveAdminClient ac) {
     return Statistics.builder()
         .status(ServerStatusDTO.ONLINE)
@@ -59,6 +65,11 @@ private Statistics createStats(ClusterDescription description,
         .metrics(metrics)
         .features(features)
         .clusterState(scrapedClusterState)
+        .connectStates(
+            connects.stream().collect(
+                Collectors.toMap(KafkaConnectState::getName, c -> c)
+            )
+        )
         .build();
   }
@@ -74,4 +85,8 @@ private Mono<Metrics> scrapeMetrics(KafkaCluster cluster,
         .scrape(clusterState, clusterDescription.getNodes());
   }
 
+  private Mono<List<KafkaConnectState>> loadKafkaConnects(KafkaCluster cluster) {
+    return kafkaConnectService.scrapeAllConnects(cluster).collectList();
+  }
+
 }
diff --git a/api/src/main/java/io/kafbat/ui/service/metrics/scrape/KafkaConnectState.java b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/KafkaConnectState.java
new file mode 100644
index 000000000..00a0fdd54
--- /dev/null
+++ b/api/src/main/java/io/kafbat/ui/service/metrics/scrape/KafkaConnectState.java
@@ -0,0 +1,24 @@
+package io.kafbat.ui.service.metrics.scrape;
+
+import io.kafbat.ui.model.ConnectorStatusDTO;
+import io.kafbat.ui.model.ConnectorTypeDTO;
+import java.time.Instant;
+import java.util.List;
+import lombok.Builder;
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+
+@Builder(toBuilder = true)
+@RequiredArgsConstructor
+@Value
+public class KafkaConnectState {
+  Instant scrapeFinishedAt;
+  String name;
+  String version;
+  List<ConnectorState> connectors;
+
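+  // Per-connector snapshot: name, type, status and the topics the connector
+  // reported at scrape time; the topics list backs the topic -> connectors lookup.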
+  public record ConnectorState(String name,
+                               ConnectorTypeDTO connectorType,
+                               ConnectorStatusDTO status,
+                               List<String> topics) {}
+}
diff --git a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
index 82d7a3041..02d579548 100644
--- a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
+++ b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
@@ -13,8 +13,15 @@
 import io.kafbat.ui.model.ConnectorStateDTO;
 import io.kafbat.ui.model.ConnectorStatusDTO;
 import io.kafbat.ui.model.ConnectorTypeDTO;
+import io.kafbat.ui.model.FullConnectorInfoDTO;
+import io.kafbat.ui.model.InternalTopic;
+import io.kafbat.ui.model.KafkaCluster;
 import io.kafbat.ui.model.NewConnectorDTO;
 import io.kafbat.ui.model.TaskIdDTO;
+import io.kafbat.ui.model.TopicCreationDTO;
+import io.kafbat.ui.service.ClustersStorage;
+import io.kafbat.ui.service.StatisticsService;
+import io.kafbat.ui.service.TopicsService;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -25,26 +32,45 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.test.web.reactive.server.WebTestClient;
+import reactor.core.publisher.Mono;
 
 @Slf4j
 class KafkaConnectServiceTests extends AbstractIntegrationTest {
   private final String connectName = "kafka-connect";
   private final String connectorName = UUID.randomUUID().toString();
+  private final String topicName = "test-topic";
   private final Map<String, Object> config = Map.of(
       "name", connectorName,
       "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
       "tasks.max", "1",
-      "topics", "output-topic",
+      "topics", topicName,
       "file", "/tmp/test",
       "test.password", "******"
   );
 
   @Autowired
   private WebTestClient webTestClient;
-
+  @Autowired
+  private StatisticsService statisticsService;
+  @Autowired
+  private ClustersStorage clustersStorage;
+  @Autowired
+  private TopicsService topicsService;
 
   @BeforeEach
   void setUp() {
+    KafkaCluster kafkaCluster = clustersStorage.getClusterByName(LOCAL).get();
+
+    InternalTopic block = topicsService.getTopicDetails(kafkaCluster, topicName)
+        .onErrorResume(t -> Mono.empty()).block();
+    if (block == null) {
+      topicsService.createTopic(kafkaCluster,
+          new TopicCreationDTO()
+              .name(topicName)
+              .partitions(1)
+              .configs(Map.of())
+      ).block();
+    }
 
     webTestClient.post()
         .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
@@ -53,12 +79,13 @@ void setUp() {
         .config(Map.of(
             "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
             "tasks.max", "1",
-            "topics", "output-topic",
+            "topics", topicName,
            "file", "/tmp/test",
            "test.password", "test-credentials")))
         .exchange()
         .expectStatus().isOk();
-
+    // Force cache refresh
+    statisticsService.updateCache(kafkaCluster).block();
   }
 
   @AfterEach
@@ -195,7 +222,7 @@ void shouldUpdateConfig() {
         .bodyValue(Map.of(
             "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
             "tasks.max", "1",
-            "topics", "another-topic",
+            "topics", topicName,
             "file", "/tmp/new"
             )
         )
@@ -212,7 +239,7 @@ void shouldUpdateConfig() {
         .isEqualTo(Map.of(
             "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
             "tasks.max", "1",
-            "topics", "another-topic",
+            "topics", topicName,
             "file", "/tmp/new",
             "name", connectorName
         ));
@@ -302,7 +329,7 @@ void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
         .isEqualTo(Map.of(
             "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
             "tasks.max", "1",
-            "topics", "output-topic",
+            "topics", topicName,
             "file", "/tmp/test",
             "name", connectorName,
             "test.password", "******"
         ));
@@ -331,7 +358,7 @@ void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() {
         .isEqualTo(Map.of(
             "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
             "tasks.max", "1",
-            "topics", "output-topic",
+            "topics", topicName,
             "file", "/tmp/test",
             "test.password", "******",
             "name", connectorName
@@ -470,4 +497,13 @@ void shouldReturn400WhenResettingConnectorInRunningState() {
         .expectStatus().isBadRequest();
   }
+
+  @Test
+  void shouldReturnConnectorsByTopic() {
+    var path = "/api/clusters/{clusterName}/topics/{topicName}/connectors";
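+    // setUp() created the topic, registered a file-sink connector on it and
+    // refreshed the statistics cache, so the lookup is served from cache.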
"/tmp/test", "test.password", "******", "name", connectorName @@ -470,4 +497,13 @@ void shouldReturn400WhenResettingConnectorInRunningState() { .expectStatus().isBadRequest(); } + + @Test + void shouldReturnConnectorsByTopic() { + var path = "/api/clusters/{clusterName}/topics/{topicName}/connectors"; + webTestClient.get() + .uri(path, LOCAL, topicName) + .exchange() + .expectStatus().isOk(); + } } diff --git a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java index ec0a77264..0ffa5633f 100644 --- a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java +++ b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java @@ -60,12 +60,19 @@ class TopicsServicePaginationTest { ); private final TopicsService mockTopicsService = Mockito.mock(TopicsService.class); + private final KafkaConnectService kafkaConnectService = Mockito.mock(KafkaConnectService.class); private final ClusterMapper clusterMapper = new ClusterMapperImpl(); private final AccessControlService accessControlService = new AccessControlServiceMock().getMock(); private final TopicsController topicsController = - new TopicsController(mockTopicsService, mock(TopicAnalysisService.class), clusterMapper, clustersProperties); + new TopicsController( + mockTopicsService, + mock(TopicAnalysisService.class), + clusterMapper, + clustersProperties, + kafkaConnectService + ); private void init(Map topicsInCache) { KafkaCluster kafkaCluster = buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME); diff --git a/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java b/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java index db10fb1b0..3c8334da5 100644 --- a/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java +++ b/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java @@ -14,6 +14,7 @@ import io.kafbat.ui.model.SortOrderDTO; import io.kafbat.ui.model.TopicColumnsToSortDTO; import io.kafbat.ui.model.TopicUpdateDTO; +import io.kafbat.ui.service.KafkaConnectService; import io.kafbat.ui.service.TopicsService; import io.kafbat.ui.service.analyze.TopicAnalysisService; import io.modelcontextprotocol.server.McpServerFeatures.AsyncToolSpecification; @@ -37,12 +38,12 @@ private static SchemaGenerator schemaGenerator() { void testConvertController() { TopicsController topicsController = new TopicsController( mock(TopicsService.class), mock(TopicAnalysisService.class), mock(ClusterMapper.class), - mock(ClustersProperties.class) + mock(ClustersProperties.class), mock(KafkaConnectService.class) ); List specifications = MCP_SPECIFICATION_GENERATOR.convertTool(topicsController); - assertThat(specifications).hasSize(14); + assertThat(specifications).hasSize(15); List tools = List.of( new McpSchema.Tool( "recreateTopic", diff --git a/contract-typespec/api/topics.tsp b/contract-typespec/api/topics.tsp index 7ae8a9682..27b13c57a 100644 --- a/contract-typespec/api/topics.tsp +++ b/contract-typespec/api/topics.tsp @@ -144,6 +144,15 @@ interface TopicsApi { @path topicName: string, @body partitionsIncrease: PartitionsIncrease, ): PartitionsIncreaseResponse | ApiNotFoundResponse; + + @get + @route("/{topicName}/connectors") + @operationId("getTopicConnectors") + @summary("getTopicConnectors") + connectors( + @path clusterName: string, + @path topicName: string + ) : FullConnectorInfo[] } model TopicsResponse { diff --git 
+  @get
+  @route("/{topicName}/connectors")
+  @operationId("getTopicConnectors")
+  @summary("getTopicConnectors")
+  connectors(
+    @path clusterName: string,
+    @path topicName: string
+  ) : FullConnectorInfo[]
 }
 
 model TopicsResponse {
diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml
index a50a755a2..de8f08b8a 100644
--- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml
+++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml
@@ -708,6 +708,33 @@ paths:
               schema:
                 $ref: '#/components/schemas/TopicSerdeSuggestion'
 
+  /api/clusters/{clusterName}/topics/{topicName}/connectors:
+    get:
+      operationId: getTopicConnectors
+      summary: getTopicConnectors
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        '200':
+          description: The request has succeeded.
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/FullConnectorInfo'
+      tags:
+        - Topics
+
   /api/smartfilters/testexecutions:
     put:
       tags: