2 changes: 2 additions & 0 deletions api/build.gradle
@@ -25,6 +25,7 @@ dependencies {
implementation libs.spring.starter.oauth2.client
implementation libs.spring.security.oauth2.resource.server
implementation libs.spring.boot.actuator

compileOnly libs.spring.boot.devtools

implementation libs.spring.security.ldap
@@ -48,6 +49,7 @@ dependencies {

implementation libs.jackson.databind.nullable
implementation libs.cel
implementation libs.caffeine
antlr libs.antlr
implementation libs.antlr.runtime

11 changes: 11 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
@@ -5,6 +5,7 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -36,6 +37,8 @@ public class ClustersProperties {

PollingProperties polling = new PollingProperties();

CacheProperties cache = new CacheProperties();

@Data
public static class Cluster {
@NotBlank(message = "field name for cluster cannot be blank")
@@ -183,6 +186,14 @@ public enum LogLevel {
}
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class CacheProperties {
boolean enabled = true;
Duration connectCacheExpiry = Duration.ofMinutes(1);
}

@PostConstruct
public void validateAndSetDefaults() {
if (clusters != null) {
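The new `CacheProperties` block is bound like any other `ClustersProperties` field. A minimal sketch of how it could be configured, assuming the properties bind under the project's existing `kafka` prefix via Spring's relaxed binding (property names inferred from the fields above, not confirmed by this diff):

```yaml
# Hypothetical application.yml snippet (assumes the existing `kafka` prefix).
kafka:
  cache:
    enabled: true
    connect-cache-expiry: 2m   # binds to java.time.Duration; e.g. 30s, 2m, 1h
```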
@@ -45,10 +45,12 @@ public class KafkaConnectController extends AbstractController implements KafkaC

@Override
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
Boolean withStats,
ServerWebExchange exchange) {

Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
.filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(
getCluster(clusterName), withStats != null ? withStats : false
).filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));

return Mono.just(ResponseEntity.ok(availableConnects));
}
4 changes: 0 additions & 4 deletions api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java
@@ -1,6 +1,5 @@
package io.kafbat.ui.mapper;

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.model.BrokerConfigDTO;
import io.kafbat.ui.model.BrokerDTO;
import io.kafbat.ui.model.BrokerDiskUsageDTO;
@@ -11,7 +10,6 @@
import io.kafbat.ui.model.ClusterStatsDTO;
import io.kafbat.ui.model.ConfigSourceDTO;
import io.kafbat.ui.model.ConfigSynonymDTO;
import io.kafbat.ui.model.ConnectDTO;
import io.kafbat.ui.model.InternalBroker;
import io.kafbat.ui.model.InternalBrokerConfig;
import io.kafbat.ui.model.InternalBrokerDiskUsage;
@@ -107,8 +105,6 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {

ReplicaDTO toReplica(InternalReplica replica);

ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect);

List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<ClusterFeature> features);

default List<PartitionDTO> map(Map<Integer, InternalPartition> map) {
52 changes: 50 additions & 2 deletions api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
@@ -1,18 +1,22 @@
package io.kafbat.ui.mapper;

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTask;
import io.kafbat.ui.connect.model.NewConnector;
import io.kafbat.ui.model.ConnectDTO;
import io.kafbat.ui.model.ConnectorDTO;
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
import io.kafbat.ui.model.ConnectorPluginDTO;
import io.kafbat.ui.model.ConnectorStateDTO;
import io.kafbat.ui.model.ConnectorStatusDTO;
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
import io.kafbat.ui.model.FullConnectorInfoDTO;
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.TaskStatusDTO;
import io.kafbat.ui.model.connect.InternalConnectInfo;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import java.util.List;
import java.util.Optional;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;

@@ -38,7 +42,51 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
connectorPluginConfigValidationResponse);

default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
default ConnectDTO toKafkaConnect(
ClustersProperties.ConnectCluster connect,
List<InternalConnectorInfo> connectors,
boolean withStats) {
Integer connectorCount = null;
Integer failedConnectors = null;
Integer tasksCount = null;
Integer failedTasksCount = null;

if (withStats) {
connectorCount = connectors.size();
failedConnectors = 0;
tasksCount = 0;
failedTasksCount = 0;

for (InternalConnectorInfo connector : connectors) {
Optional<ConnectorDTO> internalConnector = Optional.ofNullable(connector.getConnector());

failedConnectors += internalConnector
.map(ConnectorDTO::getStatus)
.map(ConnectorStatusDTO::getState)
.filter(ConnectorStateDTO.FAILED::equals)
.map(s -> 1).orElse(0);

tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);

for (TaskDTO task : connector.getTasks()) {
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
failedTasksCount += 1; // count each failed task once, not the running task total
}
}
}

}

return new ConnectDTO()
.address(connect.getAddress())
.name(connect.getName())
.connectorsCount(connectorCount)
.failedConnectorsCount(failedConnectors)
.tasksCount(tasksCount)
.failedTasksCount(failedTasksCount);
}

default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
ConnectorDTO connector = connectInfo.getConnector();
List<TaskDTO> tasks = connectInfo.getTasks();
int failedTasksCount = (int) tasks.stream()
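The counting loop in `toKafkaConnect` above accumulates `failedConnectors` and `failedTasksCount` per connector. For reference, a stream-based equivalent of the failed-task count; this is a sketch against the PR's DTO types, not code from the diff:

```java
// Sketch only: assumes the PR's InternalConnectorInfo, TaskDTO and
// ConnectorTaskStatusDTO types; counts each FAILED task exactly once.
private static int countFailedTasks(List<InternalConnectorInfo> connectors) {
  return (int) connectors.stream()
      .flatMap(c -> c.getTasks().stream())
      .filter(t -> t.getStatus() != null
          && ConnectorTaskStatusDTO.FAILED.equals(t.getStatus().getState()))
      .count();
}
```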
@@ -9,7 +9,7 @@

@Data
@Builder(toBuilder = true)
public class InternalConnectInfo {
public class InternalConnectorInfo {
private final ConnectorDTO connector;
private final Map<String, Object> config;
private final List<TaskDTO> tasks;
81 changes: 71 additions & 10 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -1,5 +1,9 @@
package io.kafbat.ui.service;


import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.ConnectorStatus;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
@@ -21,15 +25,15 @@
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.NewConnectorDTO;
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.connect.InternalConnectInfo;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import io.kafbat.ui.util.ReactiveFailover;
import jakarta.validation.Valid;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -39,23 +43,78 @@

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConnectService {
private final ClusterMapper clusterMapper;
private final KafkaConnectMapper kafkaConnectMapper;
private final KafkaConfigSanitizer kafkaConfigSanitizer;
private final ClustersProperties clustersProperties;

private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;

public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper,
KafkaConfigSanitizer kafkaConfigSanitizer,
ClustersProperties clustersProperties) {
this.clusterMapper = clusterMapper;
this.kafkaConnectMapper = kafkaConnectMapper;
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
this.clustersProperties = clustersProperties;
this.cachedConnectors = Caffeine.newBuilder()
.expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
.buildAsync();
}

public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
return Flux.fromIterable(
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
.map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
.orElse(List.of())
public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
if (withStats) {
return connectClusters.map(connects ->
Flux.fromIterable(connects).flatMap(connect -> (
getConnectConnectorsFromCache(new ConnectCacheKey(cluster, connect), withStats).map(
connectors -> kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats)
)
)
)
).orElse(Flux.fromIterable(List.of()));
} else {
return Flux.fromIterable(connectClusters.map(connects ->
connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList()
).orElse(List.of()));
}
}

private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key, boolean withStats) {
if (clustersProperties.getCache().isEnabled()) {
return Mono.fromFuture(
cachedConnectors.get(key, (t, e) ->
getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture()
)
);
} else {
return getConnectConnectors(key.cluster(), key.connect()).collectList();
}
}

private Flux<InternalConnectorInfo> getConnectConnectors(
KafkaCluster cluster,
ClustersProperties.ConnectCluster connect) {
return getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
Mono.zip(
getConnector(cluster, connect.getName(), connectorName),
getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
).map(tuple ->
InternalConnectorInfo.builder()
.connector(tuple.getT1())
.config(null)
.tasks(tuple.getT2())
.topics(null)
.build()
)
);
}

public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
@Nullable final String search) {
return getConnects(cluster)
return getConnects(cluster, false)
.flatMap(connect ->
getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
.flatMap(connectorName ->
@@ -65,7 +124,7 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
getConnectorTopics(cluster, connect.getName(), connectorName)
).map(tuple ->
InternalConnectInfo.builder()
InternalConnectorInfo.builder()
.connector(tuple.getT1())
.config(tuple.getT2())
.tasks(tuple.getT3())
@@ -289,4 +348,6 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
.formatted(connectorName, connectName));
});
}

record ConnectCacheKey(KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {}
}
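The service builds its cache with Caffeine's `AsyncCache`, so concurrent requests for the same `ConnectCacheKey` share one in-flight load, and entries are refetched after `connectCacheExpiry`. A self-contained sketch of that pattern; `Key` and `expensiveFetch` are illustrative stand-ins, not types from this PR:

```java
// Standalone sketch of the Caffeine AsyncCache pattern used above.
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ConnectCacheSketch {
  record Key(String cluster, String connect) {}

  private final AsyncCache<Key, List<String>> cache = Caffeine.newBuilder()
      .expireAfterWrite(Duration.ofMinutes(1)) // mirrors the connectCacheExpiry default
      .buildAsync();

  CompletableFuture<List<String>> connectors(Key key) {
    // The mapping function runs at most once per key until the entry expires;
    // concurrent callers receive the same in-flight CompletableFuture.
    return cache.get(key, (k, executor) ->
        CompletableFuture.supplyAsync(() -> expensiveFetch(k), executor));
  }

  private List<String> expensiveFetch(Key key) {
    return List.of(key.connect() + "-connector"); // placeholder for the Connect REST call
  }

  public static void main(String[] args) {
    var sketch = new ConnectCacheSketch();
    System.out.println(sketch.connectors(new Key("local", "connect-1")).join());
  }
}
```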
@@ -24,7 +24,7 @@ class ConnectorsExporter {
private final KafkaConnectService kafkaConnectService;

Flux<DataEntityList> export(KafkaCluster cluster) {
return kafkaConnectService.getConnects(cluster)
return kafkaConnectService.getConnects(cluster, false)
.flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
.flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
.flatMap(connectorDTO ->
@@ -41,7 +41,7 @@ Flux<DataEntityList> export(KafkaCluster cluster) {
}

Flux<DataSource> getConnectDataSources(KafkaCluster cluster) {
return kafkaConnectService.getConnects(cluster)
return kafkaConnectService.getConnects(cluster, false)
.map(ConnectorsExporter::toDataSource);
}

@@ -187,20 +187,7 @@ private void writeYamlToFile(String yaml, Path path) {
}

private String serializeToYaml(PropertiesStructure props) {
//representer, that skips fields with null values
Representer representer = new Representer(new DumperOptions()) {
@Override
protected NodeTuple representJavaBeanProperty(Object javaBean,
Property property,
Object propertyValue,
Tag customTag) {
if (propertyValue == null) {
return null; // if value of property is null, ignore it.
} else {
return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag);
}
}
};
Representer representer = new YamlNullSkipRepresenter(new DumperOptions());
var propertyUtils = new PropertyUtils();
propertyUtils.setBeanAccess(BeanAccess.FIELD);
representer.setPropertyUtils(propertyUtils);
@@ -0,0 +1,28 @@
package io.kafbat.ui.util;

import java.time.Duration;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.introspector.Property;
import org.yaml.snakeyaml.nodes.NodeTuple;
import org.yaml.snakeyaml.nodes.Tag;
import org.yaml.snakeyaml.representer.Representer;

// Representer that skips fields with null values
public class YamlNullSkipRepresenter extends Representer {
public YamlNullSkipRepresenter(DumperOptions options) {
super(options);
this.representers.put(Duration.class, data -> this.representScalar(Tag.STR, data.toString()));
}

@Override
protected NodeTuple representJavaBeanProperty(Object javaBean,
Property property,
Object propertyValue,
Tag customTag) {
if (propertyValue == null) {
return null; // if value of property is null, ignore it.
} else {
return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag);
}
}
}
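A minimal usage sketch for the new representer; the `Sample` bean and dump setup are hypothetical, but the SnakeYAML calls (`Yaml(Representer, DumperOptions)`, `addClassTag`) are standard API:

```java
// Hypothetical demo: null properties are omitted, Duration is emitted as a string.
import java.time.Duration;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.nodes.Tag;

public class YamlNullSkipDemo {
  public static class Sample {
    public String name = "connect";
    public String comment = null;                   // skipped by the representer
    public Duration expiry = Duration.ofMinutes(1); // emitted as PT1M
  }

  public static void main(String[] args) {
    DumperOptions options = new DumperOptions();
    YamlNullSkipRepresenter representer = new YamlNullSkipRepresenter(options);
    representer.addClassTag(Sample.class, Tag.MAP); // drop the !!class tag on the root
    Yaml yaml = new Yaml(representer, options);
    System.out.println(yaml.dump(new Sample()));
    // expiry: PT1M
    // name: connect
  }
}
```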
@@ -58,7 +58,7 @@ void exportsConnectorsAsDataTransformers() {
)
);

when(kafkaConnectService.getConnects(CLUSTER))
when(kafkaConnectService.getConnects(CLUSTER, false))
.thenReturn(Flux.just(connect));

when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))