Merged
20 changes: 20 additions & 0 deletions api/src/main/java/io/kafbat/ui/controller/TopicsController.java
@@ -11,6 +11,7 @@
import io.kafbat.ui.api.TopicsApi;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.mapper.ClusterMapper;
import io.kafbat.ui.model.FullConnectorInfoDTO;
import io.kafbat.ui.model.InternalTopic;
import io.kafbat.ui.model.InternalTopicConfig;
import io.kafbat.ui.model.PartitionsIncreaseDTO;
@@ -28,6 +29,8 @@
import io.kafbat.ui.model.TopicUpdateDTO;
import io.kafbat.ui.model.TopicsResponseDTO;
import io.kafbat.ui.model.rbac.AccessContext;
import io.kafbat.ui.model.rbac.permission.ConnectAction;
import io.kafbat.ui.service.KafkaConnectService;
import io.kafbat.ui.service.TopicsService;
import io.kafbat.ui.service.analyze.TopicAnalysisService;
import io.kafbat.ui.service.mcp.McpTool;
@@ -55,6 +58,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
private final TopicAnalysisService topicAnalysisService;
private final ClusterMapper clusterMapper;
private final ClustersProperties clustersProperties;
private final KafkaConnectService kafkaConnectService;

@Override
public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -370,4 +374,20 @@ private Comparator<InternalTopic> getComparatorForTopic(
default -> defaultComparator;
};
}

@Override
public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getTopicConnectors(String clusterName,
String topicName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("getAllConnectors")
.build();

Flux<FullConnectorInfoDTO> job = kafkaConnectService.getTopicConnectors(getCluster(clusterName), topicName)
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));

return Mono.just(ResponseEntity.ok(job))
.doOnEach(sig -> audit(context, sig));
}
}
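
Note on the new endpoint: it audits under the existing "getAllConnectors" operation name and applies per-connect RBAC filtering via accessControlService before returning. A minimal smoke-test sketch, assuming the route is /api/clusters/{clusterName}/topics/{topicName}/connectors; the actual path is defined by the generated TopicsApi contract, which is not shown in this diff.

// Hypothetical smoke test, not part of the PR. The URI template below is an
// assumption; verify it against the generated TopicsApi before reusing.
import io.kafbat.ui.model.FullConnectorInfoDTO;
import org.springframework.test.web.reactive.server.WebTestClient;

class TopicConnectorsSmokeTest {
  void fetchTopicConnectors(WebTestClient client) {
    client.get()
        .uri("/api/clusters/{cluster}/topics/{topic}/connectors", "local", "orders")
        .exchange()
        .expectStatus().isOk()
        .expectBodyList(FullConnectorInfoDTO.class); // connectors touching the topic
  }
}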
26 changes: 26 additions & 0 deletions api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
@@ -18,6 +18,7 @@
import io.kafbat.ui.model.TaskIdDTO;
import io.kafbat.ui.model.TaskStatusDTO;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -31,6 +32,12 @@
public interface KafkaConnectMapper {
NewConnector toClient(io.kafbat.ui.model.NewConnectorDTO newConnector);

default ClusterInfo toClient(KafkaConnectState state) {
ClusterInfo clusterInfo = new ClusterInfo();
clusterInfo.setVersion(state.getVersion());
return clusterInfo;
}

@Mapping(target = "status", ignore = true)
@Mapping(target = "connect", ignore = true)
ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector);
@@ -153,4 +160,23 @@ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo
.tasksCount(tasks.size())
.failedTasksCount(failedTasksCount);
}

default KafkaConnectState toScrapeState(ConnectDTO connect, List<InternalConnectorInfo> connectors) {
return KafkaConnectState.builder()
.name(connect.getName())
.version(connect.getVersion().orElse("Unknown"))
.connectors(connectors.stream().map(this::toScrapeState).toList())
.build();
}

default KafkaConnectState.ConnectorState toScrapeState(InternalConnectorInfo connector) {
return new KafkaConnectState.ConnectorState(
connector.getConnector().getName(),
connector.getConnector().getType(),
connector.getConnector().getStatus(),
connector.getTopics()
);
}


}
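
The two toScrapeState overloads and toClient(KafkaConnectState) form a round trip: scrape results are condensed into KafkaConnectState, and cached state is projected back into a client ClusterInfo carrying only the version. An illustrative fragment, assuming mapper is the MapStruct-generated KafkaConnectMapper implementation:

// Illustration only: cached scrape state carries just enough to answer
// getClusterInfo without a live Connect REST call.
KafkaConnectState state = KafkaConnectState.builder()
    .name("connect-1")          // made-up connect name
    .version("3.7.0")
    .connectors(List.of())
    .build();
ClusterInfo info = mapper.toClient(state); // only the version survives the projection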
4 changes: 4 additions & 0 deletions api/src/main/java/io/kafbat/ui/model/Statistics.java
@@ -1,8 +1,10 @@
package io.kafbat.ui.model;

import io.kafbat.ui.service.ReactiveAdminClient;
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import lombok.Builder;
@@ -19,6 +21,7 @@ public class Statistics implements AutoCloseable {
ReactiveAdminClient.ClusterDescription clusterDescription;
Metrics metrics;
ScrapedClusterState clusterState;
Map<String, KafkaConnectState> connectStates;

public static Statistics empty() {
return builder()
@@ -28,6 +31,7 @@ public static Statistics empty() {
.clusterDescription(ReactiveAdminClient.ClusterDescription.empty())
.metrics(Metrics.empty())
.clusterState(ScrapedClusterState.empty())
.connectStates(Map.of())
.build();
}

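
Statistics now carries the scraped Connect state keyed by connect name, with Map.of() as the empty default, so callers need no null check on the map itself. A lookup sketch mirroring how KafkaConnectService reads it, assuming statistics is an instance obtained from StatisticsCache:

// Sketch: individual entries may still be absent, so guard the lookup.
KafkaConnectState state = statistics.getConnectStates().get("connect-1");
String version = state != null ? state.getVersion() : "Unknown";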
86 changes: 71 additions & 15 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -1,7 +1,5 @@
package io.kafbat.ui.service;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.ClusterInfo;
@@ -25,12 +23,15 @@
import io.kafbat.ui.model.FullConnectorInfoDTO;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.NewConnectorDTO;
import io.kafbat.ui.model.Statistics;
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.TaskIdDTO;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
import io.kafbat.ui.util.ReactiveFailover;
import jakarta.validation.Valid;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -49,17 +50,16 @@ public class KafkaConnectService {
private final KafkaConnectMapper kafkaConnectMapper;
private final KafkaConfigSanitizer kafkaConfigSanitizer;
private final ClustersProperties clustersProperties;
private final AsyncCache<String, ClusterInfo> cacheClusterInfo;
private final StatisticsCache statisticsCache;

public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
KafkaConfigSanitizer kafkaConfigSanitizer,
ClustersProperties clustersProperties) {
ClustersProperties clustersProperties,
StatisticsCache statisticsCache) {
this.kafkaConnectMapper = kafkaConnectMapper;
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
this.clustersProperties = clustersProperties;
this.cacheClusterInfo = Caffeine.newBuilder()
.expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
.buildAsync();
this.statisticsCache = statisticsCache;
}

public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
@@ -89,14 +89,17 @@
}
}

private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
.onErrorResume(th -> {
log.error("Error on collecting cluster info", th);
return Mono.just(new ClusterInfo());
}).toFuture()
));
public Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
KafkaConnectState state = statisticsCache.get(cluster).getConnectStates().get(connectName);
if (state != null) {
return Mono.just(kafkaConnectMapper.toClient(state));
} else {
return api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
.onErrorResume(th -> {
log.error("Error on collecting cluster info", th);
return Mono.just(new ClusterInfo());
});
}
}

private Flux<InternalConnectorInfo> getConnectConnectors(
@@ -134,6 +137,33 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
.flatMapMany(Flux::fromIterable);
}

public Flux<KafkaConnectState> scrapeAllConnects(KafkaCluster cluster) {

Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());

return Flux.fromIterable(connectClusters.orElse(List.of())).flatMap(c ->
getClusterInfo(cluster, c.getName()).map(info ->
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
).onErrorResume((t) -> Mono.just(new ConnectDTO().name(c.getName())))
).flatMap(connect ->
getConnectorsWithErrorsSuppress(cluster, connect.getName())
.onErrorResume(t -> Mono.just(Map.of()))
.flatMapMany(connectors ->
Flux.fromIterable(connectors.entrySet())
.flatMap(e ->
getConnectorTopics(
cluster,
connect.getName(),
e.getKey()
).map(topics ->
kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics())
)
)
).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors))
);
}

private List<FullConnectorInfoDTO> filterConnectors(
List<FullConnectorInfoDTO> connectors,
String search,
@@ -349,4 +379,30 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
.formatted(connectorName, connectName));
});
}

public Flux<FullConnectorInfoDTO> getTopicConnectors(KafkaCluster cluster, String topicName) {
Map<String, KafkaConnectState> connectStates = this.statisticsCache.get(cluster).getConnectStates();
Map<String, List<String>> filteredConnects = new HashMap<>();
for (Map.Entry<String, KafkaConnectState> entry : connectStates.entrySet()) {
List<KafkaConnectState.ConnectorState> connectors =
entry.getValue().getConnectors().stream().filter(c -> c.topics().contains(topicName)).toList();
if (!connectors.isEmpty()) {
filteredConnects.put(entry.getKey(), connectors.stream().map(KafkaConnectState.ConnectorState::name).toList());
}
}

return Flux.fromIterable(filteredConnects.entrySet())
.flatMap(entry ->
getConnectorsWithErrorsSuppress(cluster, entry.getKey())
.map(connectors ->
connectors.entrySet()
.stream()
.filter(c -> entry.getValue().contains(c.getKey()))
.map(c -> kafkaConnectMapper.fromClient(entry.getKey(), c.getValue(), null))
.map(kafkaConnectMapper::fullConnectorInfo)
.toList()
)
).flatMap(Flux::fromIterable);

}
}
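
The per-service Caffeine cache is gone: getClusterInfo now serves from StatisticsCache and only falls back to the Connect REST API on a miss, and getTopicConnectors answers topic-to-connector lookups from the same scraped state before fetching live details for the matches. The filtering step, restated as a standalone pure function for illustration; the null guard on topics() is an addition here, not present in the PR:

// Illustrative extraction, not part of the PR: per connect, keep the names of
// connectors whose cached topic list contains the target topic.
static Map<String, List<String>> connectorsByConnectForTopic(
    Map<String, KafkaConnectState> states, String topicName) {
  Map<String, List<String>> filtered = new HashMap<>();
  states.forEach((connectName, state) -> {
    List<String> names = state.getConnectors().stream()
        .filter(c -> c.topics() != null && c.topics().contains(topicName)) // defensive null check, added here
        .map(KafkaConnectState.ConnectorState::name)
        .toList();
    if (!names.isEmpty()) {
      filtered.put(connectName, names);
    }
  });
  return filtered;
}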
23 changes: 19 additions & 4 deletions api/src/main/java/io/kafbat/ui/service/StatisticsService.java
@@ -8,8 +8,10 @@
import io.kafbat.ui.model.Metrics;
import io.kafbat.ui.model.ServerStatusDTO;
import io.kafbat.ui.model.Statistics;
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
import java.util.List;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -21,6 +23,7 @@
public class StatisticsService {

private final AdminClientService adminClientService;
private final KafkaConnectService kafkaConnectService;
private final FeatureService featureService;
private final StatisticsCache cache;
private final ClustersProperties clustersProperties;
@@ -38,19 +41,22 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
.then(
Mono.zip(
featureService.getAvailableFeatures(ac, cluster, description),
loadClusterState(description, ac)
loadClusterState(description, ac),
loadKafkaConnects(cluster)
).flatMap(t ->
scrapeMetrics(cluster, t.getT2(), description)
.map(metrics -> createStats(description, t.getT1(), t.getT2(), metrics, ac)))))
.doOnError(e ->
.map(metrics -> createStats(description, t.getT1(), t.getT2(), t.getT3(), metrics, ac))
)
)
).doOnError(e ->
log.error("Failed to collect cluster {} info", cluster.getName(), e))
.onErrorResume(t -> Mono.just(Statistics.statsUpdateError(t))));
}

private Statistics createStats(ClusterDescription description,
List<ClusterFeature> features,
ScrapedClusterState scrapedClusterState,
Metrics metrics,
List<KafkaConnectState> connects, Metrics metrics,
ReactiveAdminClient ac) {
return Statistics.builder()
.status(ServerStatusDTO.ONLINE)
@@ -59,6 +65,11 @@ private Statistics createStats(ClusterDescription description,
.metrics(metrics)
.features(features)
.clusterState(scrapedClusterState)
.connectStates(
connects.stream().collect(
Collectors.toMap(KafkaConnectState::getName, c -> c)
)
)
.build();
}

@@ -74,4 +85,8 @@ private Mono<Metrics> scrapeMetrics(KafkaCluster cluster,
.scrape(clusterState, clusterDescription.getNodes());
}

private Mono<List<KafkaConnectState>> loadKafkaConnects(KafkaCluster cluster) {
return kafkaConnectService.scrapeAllConnects(cluster).collectList();
}

}
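
createStats folds the scraped connect list into the map with Collectors.toMap, which throws IllegalStateException on duplicate keys; this is safe only while connect names are unique per cluster, which the config implies but this diff does not enforce. The aggregation in isolation:

// Equivalent standalone aggregation; duplicate connect names would throw here.
Map<String, KafkaConnectState> byName = connects.stream()
    .collect(Collectors.toMap(KafkaConnectState::getName, c -> c));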
24 changes: 24 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/metrics/scrape/KafkaConnectState.java
@@ -0,0 +1,24 @@
package io.kafbat.ui.service.metrics.scrape;

import io.kafbat.ui.model.ConnectorStatusDTO;
import io.kafbat.ui.model.ConnectorTypeDTO;
import java.time.Instant;
import java.util.List;
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import lombok.Value;

@Builder(toBuilder = true)
@RequiredArgsConstructor
@Value
public class KafkaConnectState {
Instant scrapeFinishedAt;
String name;
String version;
List<ConnectorState> connectors;

public record ConnectorState(String name,
ConnectorTypeDTO connectorType,
ConnectorStatusDTO status,
List<String> topics) {}
}
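
One observation on the new value type: toScrapeState in KafkaConnectMapper never sets scrapeFinishedAt, so it stays null in cached state as of this diff. A hypothetical construction, with the DTO-typed fields taken as parameters because their shapes come from generated OpenAPI models not shown here:

// Illustration only; "connect-1", "s3-sink" and "orders" are made-up names.
static KafkaConnectState sampleState(ConnectorTypeDTO type, ConnectorStatusDTO status) {
  return KafkaConnectState.builder()
      .scrapeFinishedAt(Instant.now())
      .name("connect-1")
      .version("3.7.0")
      .connectors(List.of(
          new KafkaConnectState.ConnectorState("s3-sink", type, status, List.of("orders"))))
      .build();
}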