Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
implementation libs.spring.starter.oauth2.client
implementation libs.spring.security.oauth2.resource.server
implementation libs.spring.boot.actuator

compileOnly libs.spring.boot.devtools

implementation libs.spring.security.ldap
Expand All @@ -48,6 +49,7 @@ dependencies {

implementation libs.jackson.databind.nullable
implementation libs.cel
implementation libs.caffeine
antlr libs.antlr
implementation libs.antlr.runtime

Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -36,6 +37,8 @@ public class ClustersProperties {

PollingProperties polling = new PollingProperties();

CacheProperties cache = new CacheProperties();

@Data
public static class Cluster {
@NotBlank(message = "field name for for cluster could not be blank")
Expand Down Expand Up @@ -183,6 +186,14 @@ public enum LogLevel {
}
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class CacheProperties {
boolean enabled = true;
Duration connectCacheExpiry = Duration.ofMinutes(1);
}

@PostConstruct
public void validateAndSetDefaults() {
if (clusters != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ public class KafkaConnectController extends AbstractController implements KafkaC

@Override
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
Boolean withStats,
ServerWebExchange exchange) {

Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
.filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(
getCluster(clusterName), withStats != null ? withStats : false
).filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));

return Mono.just(ResponseEntity.ok(availableConnects));
}
Expand Down
4 changes: 0 additions & 4 deletions api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kafbat.ui.mapper;

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.model.BrokerConfigDTO;
import io.kafbat.ui.model.BrokerDTO;
import io.kafbat.ui.model.BrokerDiskUsageDTO;
Expand All @@ -11,7 +10,6 @@
import io.kafbat.ui.model.ClusterStatsDTO;
import io.kafbat.ui.model.ConfigSourceDTO;
import io.kafbat.ui.model.ConfigSynonymDTO;
import io.kafbat.ui.model.ConnectDTO;
import io.kafbat.ui.model.InternalBroker;
import io.kafbat.ui.model.InternalBrokerConfig;
import io.kafbat.ui.model.InternalBrokerDiskUsage;
Expand Down Expand Up @@ -107,8 +105,6 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {

ReplicaDTO toReplica(InternalReplica replica);

ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect);

List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<ClusterFeature> features);

default List<PartitionDTO> map(Map<Integer, InternalPartition> map) {
Expand Down
52 changes: 50 additions & 2 deletions api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package io.kafbat.ui.mapper;

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTask;
import io.kafbat.ui.connect.model.NewConnector;
import io.kafbat.ui.model.ConnectDTO;
import io.kafbat.ui.model.ConnectorDTO;
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
import io.kafbat.ui.model.ConnectorPluginDTO;
import io.kafbat.ui.model.ConnectorStateDTO;
import io.kafbat.ui.model.ConnectorStatusDTO;
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
import io.kafbat.ui.model.FullConnectorInfoDTO;
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.TaskStatusDTO;
import io.kafbat.ui.model.connect.InternalConnectInfo;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import java.util.List;
import java.util.Optional;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;

Expand All @@ -38,7 +42,51 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
connectorPluginConfigValidationResponse);

default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
default ConnectDTO toKafkaConnect(
ClustersProperties.ConnectCluster connect,
List<InternalConnectorInfo> connectors,
boolean withStats) {
Integer connectorCount = null;
Integer failedConnectors = null;
Integer tasksCount = null;
Integer failedTasksCount = null;

if (withStats) {
connectorCount = connectors.size();
failedConnectors = 0;
tasksCount = 0;
failedTasksCount = 0;

for (InternalConnectorInfo connector : connectors) {
Optional<ConnectorDTO> internalConnector = Optional.ofNullable(connector.getConnector());

failedConnectors += internalConnector
.map(ConnectorDTO::getStatus)
.map(ConnectorStatusDTO::getState)
.filter(ConnectorStateDTO.FAILED::equals)
.map(s -> 1).orElse(0);

tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);

for (TaskDTO task : connector.getTasks()) {
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
failedTasksCount += tasksCount;
}
}
}

}

return new ConnectDTO()
.address(connect.getAddress())
.name(connect.getName())
.connectorsCount(connectorCount)
.failedConnectorsCount(failedConnectors)
.tasksCount(tasksCount)
.failedTasksCount(failedTasksCount);
}

default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
ConnectorDTO connector = connectInfo.getConnector();
List<TaskDTO> tasks = connectInfo.getTasks();
int failedTasksCount = (int) tasks.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

@Data
@Builder(toBuilder = true)
public class InternalConnectInfo {
public class InternalConnectorInfo {
private final ConnectorDTO connector;
private final Map<String, Object> config;
private final List<TaskDTO> tasks;
Expand Down
70 changes: 60 additions & 10 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package io.kafbat.ui.service;


import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.ConnectorStatus;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
Expand All @@ -21,15 +25,15 @@
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.NewConnectorDTO;
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.connect.InternalConnectInfo;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import io.kafbat.ui.util.ReactiveFailover;
import jakarta.validation.Valid;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
Expand All @@ -39,23 +43,67 @@

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConnectService {
private final ClusterMapper clusterMapper;
private final KafkaConnectMapper kafkaConnectMapper;
private final KafkaConfigSanitizer kafkaConfigSanitizer;
private final ClustersProperties clustersProperties;

private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;

public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper,
KafkaConfigSanitizer kafkaConfigSanitizer,
ClustersProperties clustersProperties) {
this.clusterMapper = clusterMapper;
this.kafkaConnectMapper = kafkaConnectMapper;
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
this.clustersProperties = clustersProperties;
this.cachedConnectors = Caffeine.newBuilder()
.expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
.buildAsync();
}

public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
return Flux.fromIterable(
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
.map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
.orElse(List.of())
public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
if (withStats) {
return connectClusters.map(connects -> Flux.fromIterable(connects).flatMap(connect -> (
clustersProperties.getCache().isEnabled() ? Mono.fromFuture(
cachedConnectors.get(new ConnectCacheKey(cluster, connect), (t, e) ->
getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture())
) : getConnectConnectors(cluster, connect).collectList()).map(connectors ->
kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats)
)
)
).orElse(Flux.fromIterable(List.of()));
} else {
return Flux.fromIterable(connectClusters.map(connects ->
connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList()
).orElse(List.of()));
}
}

private Flux<InternalConnectorInfo> getConnectConnectors(
KafkaCluster cluster,
ClustersProperties.ConnectCluster connect) {
return getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
Mono.zip(
getConnector(cluster, connect.getName(), connectorName),
getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
).map(tuple ->
InternalConnectorInfo.builder()
.connector(tuple.getT1())
.config(null)
.tasks(tuple.getT2())
.topics(null)
.build()
)
);
}

public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
@Nullable final String search) {
return getConnects(cluster)
return getConnects(cluster, false)
.flatMap(connect ->
getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
.flatMap(connectorName ->
Expand All @@ -65,7 +113,7 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
getConnectorTopics(cluster, connect.getName(), connectorName)
).map(tuple ->
InternalConnectInfo.builder()
InternalConnectorInfo.builder()
.connector(tuple.getT1())
.config(tuple.getT2())
.tasks(tuple.getT3())
Expand Down Expand Up @@ -289,4 +337,6 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
.formatted(connectorName, connectName));
});
}

record ConnectCacheKey(KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ConnectorsExporter {
private final KafkaConnectService kafkaConnectService;

Flux<DataEntityList> export(KafkaCluster cluster) {
return kafkaConnectService.getConnects(cluster)
return kafkaConnectService.getConnects(cluster, false)
.flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
.flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
.flatMap(connectorDTO ->
Expand All @@ -41,7 +41,7 @@ Flux<DataEntityList> export(KafkaCluster cluster) {
}

Flux<DataSource> getConnectDataSources(KafkaCluster cluster) {
return kafkaConnectService.getConnects(cluster)
return kafkaConnectService.getConnects(cluster, false)
.map(ConnectorsExporter::toDataSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,7 @@ private void writeYamlToFile(String yaml, Path path) {
}

private String serializeToYaml(PropertiesStructure props) {
//representer, that skips fields with null values
Representer representer = new Representer(new DumperOptions()) {
@Override
protected NodeTuple representJavaBeanProperty(Object javaBean,
Property property,
Object propertyValue,
Tag customTag) {
if (propertyValue == null) {
return null; // if value of property is null, ignore it.
} else {
return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag);
}
}
};
Representer representer = new YamlNullSkipRepresenter(new DumperOptions());
var propertyUtils = new PropertyUtils();
propertyUtils.setBeanAccess(BeanAccess.FIELD);
representer.setPropertyUtils(propertyUtils);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.kafbat.ui.util;

import java.time.Duration;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.introspector.Property;
import org.yaml.snakeyaml.nodes.NodeTuple;
import org.yaml.snakeyaml.nodes.Tag;
import org.yaml.snakeyaml.representer.Representer;

// representer, that skips fields with null values
public class YamlNullSkipRepresenter extends Representer {
public YamlNullSkipRepresenter(DumperOptions options) {
super(options);
this.representers.put(Duration.class, data -> this.representScalar(Tag.STR, data.toString()));
}

@Override
protected NodeTuple representJavaBeanProperty(Object javaBean,
Property property,
Object propertyValue,
Tag customTag) {
if (propertyValue == null) {
return null; // if value of property is null, ignore it.
} else {
return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void exportsConnectorsAsDataTransformers() {
)
);

when(kafkaConnectService.getConnects(CLUSTER))
when(kafkaConnectService.getConnects(CLUSTER, false))
.thenReturn(Flux.just(connect));

when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
Expand Down
Loading
Loading