@@ -192,6 +192,7 @@ public enum LogLevel {
public static class CacheProperties {
boolean enabled = true;
Duration connectCacheExpiry = Duration.ofMinutes(1);
Duration connectClusterCacheExpiry = Duration.ofHours(24);
}

@PostConstruct
24 changes: 17 additions & 7 deletions api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
@@ -1,6 +1,7 @@
package io.kafbat.ui.mapper;

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.model.ClusterInfo;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTask;
import io.kafbat.ui.connect.model.NewConnector;
@@ -45,6 +46,7 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
default ConnectDTO toKafkaConnect(
ClustersProperties.ConnectCluster connect,
List<InternalConnectorInfo> connectors,
ClusterInfo clusterInfo,
boolean withStats) {
Integer connectorCount = null;
Integer failedConnectors = null;
@@ -66,12 +68,17 @@ default ConnectDTO toKafkaConnect(
.filter(ConnectorStateDTO.FAILED::equals)
.map(s -> 1).orElse(0);

tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);

for (TaskDTO task : connector.getTasks()) {
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
failedTasksCount += tasksCount;
}
tasksCount += internalConnector.map(ConnectorDTO::getTasks).map(List::size).orElse(0);

if (connector.getTasks() != null) {
failedTasksCount += (int) connector.getTasks().stream()
.filter(t ->
Optional.ofNullable(t)
.map(TaskDTO::getStatus)
.map(TaskStatusDTO::getState)
.map(ConnectorTaskStatusDTO.FAILED::equals)
.orElse(false)
).count();
}
}
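
Worth spelling out what this hunk fixes: the old loop added the running `tasksCount` total to `failedTasksCount` for every failed task, so the failure count was inflated whenever a connector had more than one task. The new code counts failed tasks directly with a null-safe stream. A minimal, self-contained sketch of the corrected counting (the nested `Task`/`Status` records are hypothetical stand-ins for the generated `TaskDTO`/`TaskStatusDTO`):

```java
import java.util.List;
import java.util.Optional;

class FailedTaskCount {
  // Hypothetical stand-ins for the generated TaskDTO / TaskStatusDTO types.
  record Status(String state) {}
  record Task(Status status) {}

  static long failedTasks(List<Task> tasks) {
    if (tasks == null) {
      return 0;
    }
    // Count only tasks whose status chain resolves to FAILED; null-safe at every step.
    return tasks.stream()
        .filter(t -> Optional.ofNullable(t)
            .map(Task::status)
            .map(Status::state)
            .map("FAILED"::equals)
            .orElse(false))
        .count();
  }

  public static void main(String[] args) {
    List<Task> tasks = List.of(
        new Task(new Status("RUNNING")),
        new Task(new Status("FAILED")),
        new Task(null)); // a task's status may legitimately be absent
    System.out.println(failedTasks(tasks)); // prints 1
  }
}
```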

@@ -83,7 +90,10 @@ default ConnectDTO toKafkaConnect(
.connectorsCount(connectorCount)
.failedConnectorsCount(failedConnectors)
.tasksCount(tasksCount)
.failedTasksCount(failedTasksCount);
.failedTasksCount(failedTasksCount)
.version(clusterInfo.getVersion())
.commit(clusterInfo.getCommit())
.clusterId(clusterInfo.getKafkaClusterId());
}

default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
53 changes: 37 additions & 16 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -5,14 +5,14 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.ClusterInfo;
import io.kafbat.ui.connect.model.ConnectorStatus;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
import io.kafbat.ui.connect.model.ConnectorTopics;
import io.kafbat.ui.connect.model.TaskStatus;
import io.kafbat.ui.exception.ConnectorOffsetsResetException;
import io.kafbat.ui.exception.NotFoundException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.mapper.ClusterMapper;
import io.kafbat.ui.mapper.KafkaConnectMapper;
import io.kafbat.ui.model.ConnectDTO;
import io.kafbat.ui.model.ConnectorActionDTO;
@@ -40,49 +40,59 @@
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

@Service
@Slf4j
public class KafkaConnectService {
private final ClusterMapper clusterMapper;
private final KafkaConnectMapper kafkaConnectMapper;
private final KafkaConfigSanitizer kafkaConfigSanitizer;
private final ClustersProperties clustersProperties;

private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
private final AsyncCache<String, ClusterInfo> cacheClusterInfo;

public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper,
public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
KafkaConfigSanitizer kafkaConfigSanitizer,
ClustersProperties clustersProperties) {
this.clusterMapper = clusterMapper;
this.kafkaConnectMapper = kafkaConnectMapper;
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
this.clustersProperties = clustersProperties;
this.cachedConnectors = Caffeine.newBuilder()
.expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
.buildAsync();
this.cacheClusterInfo = Caffeine.newBuilder()
.expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
.buildAsync();
}

public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());

if (withStats) {
return connectClusters.map(connects ->
Flux.fromIterable(connects).flatMap(connect -> (
getConnectConnectorsFromCache(new ConnectCacheKey(cluster, connect), withStats).map(
connectors -> kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats)
)
Flux.fromIterable(connects).flatMap(c ->
getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci))
).flatMap(tuple -> (
getConnectConnectorsFromCache(new ConnectCacheKey(cluster, tuple.getT1()))
.map(connectors ->
kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), withStats)
)
)
)
).orElse(Flux.fromIterable(List.of()));
} else {
return Flux.fromIterable(connectClusters.map(connects ->
connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList()
).orElse(List.of()));
return Flux.fromIterable(connectClusters.orElse(List.of()))
.flatMap(c ->
getClusterInfo(cluster, c.getName()).map(info ->
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, withStats)
)
);
}
}
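
The `withStats` branch pairs each configured Connect cluster with its cached `ClusterInfo` via Reactor's `Tuples`, so both values remain available to the mapper further down the pipeline. A small sketch of that pairing pattern, assuming only reactor-core on the classpath (names are illustrative):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

class PairingDemo {
  // Stand-in for an async per-cluster lookup such as getClusterInfo().
  static Mono<String> lookupInfo(String name) {
    return Mono.just("info-for-" + name);
  }

  public static void main(String[] args) {
    Flux.fromIterable(List.of("connect-1", "connect-2"))
        // Pair each element with its async lookup result; the tuple keeps
        // both values in scope for downstream operators.
        .flatMap(name -> lookupInfo(name).map(info -> Tuples.of(name, info)))
        .subscribe(t -> System.out.println(t.getT1() + " -> " + t.getT2()));
  }
}
```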

private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key, boolean withStats) {
private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key) {
if (clustersProperties.getCache().isEnabled()) {
return Mono.fromFuture(
cachedConnectors.get(key, (t, e) ->
@@ -94,6 +104,16 @@ private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectC
}
}

private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
.onErrorResume(th -> {
log.error("Error collecting cluster info: {}", th.getMessage(), th);
return Mono.just(new ClusterInfo());
}).toFuture()
));
}
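
`getClusterInfo` leans on Caffeine's `AsyncCache`: the two-argument `get` computes the value at most once per key until the entry expires (24 hours here, versus 1 minute for connector lists, since version, commit, and cluster id rarely change), and concurrent callers share the same future. Errors resolve to an empty `ClusterInfo`, so one unreachable Connect cluster does not fail the whole listing. A minimal sketch of the cache pattern in isolation (key and value types are illustrative):

```java
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

class AsyncCacheDemo {
  public static void main(String[] args) {
    AsyncCache<String, String> cache = Caffeine.newBuilder()
        .expireAfterWrite(Duration.ofHours(24))
        .buildAsync();

    // The mapping function runs only on a cache miss; until the entry
    // expires, every caller for this key gets the same future back.
    CompletableFuture<String> info = cache.get("connect-1",
        (key, executor) -> CompletableFuture.supplyAsync(
            () -> "cluster-info-for-" + key, executor));

    System.out.println(info.join()); // cluster-info-for-connect-1
  }
}
```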

private Flux<InternalConnectorInfo> getConnectConnectors(
KafkaCluster cluster,
ClustersProperties.ConnectCluster connect) {
@@ -177,12 +197,13 @@ public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectNa
.mono(client ->
connector
.flatMap(c -> connectorExists(cluster, connectName, c.getName())
.map(exists -> {
.flatMap(exists -> {
if (Boolean.TRUE.equals(exists)) {
throw new ValidationException(
String.format("Connector with name %s already exists", c.getName()));
return Mono.error(new ValidationException(
String.format("Connector with name %s already exists", c.getName())));
} else {
return Mono.just(c);
}
return c;
}))
.map(kafkaConnectMapper::toClient)
.flatMap(client::createConnector)
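The `createConnector` change above swaps `map` plus a bare `throw` for `flatMap` plus `Mono.error`, which keeps the failure inside the reactive pipeline rather than relying on Reactor translating an exception thrown mid-operator. A minimal sketch of the corrected shape, assuming reactor-core:

```java
import reactor.core.publisher.Mono;

class ReactiveErrorDemo {
  static Mono<String> create(boolean exists) {
    return Mono.just(exists)
        .flatMap(e -> {
          if (Boolean.TRUE.equals(e)) {
            // Signal the error through the pipeline instead of throwing.
            return Mono.error(new IllegalStateException("Connector already exists"));
          }
          return Mono.just("created");
        });
  }

  public static void main(String[] args) {
    create(false).subscribe(v -> System.out.println("next: " + v));
    create(true).subscribe(
        v -> System.out.println("next: " + v),
        err -> System.out.println("error: " + err.getMessage()));
  }
}
```
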
115 changes: 115 additions & 0 deletions api/src/test/java/io/kafbat/ui/mapper/KafkaConnectMapperTest.java
@@ -0,0 +1,115 @@
package io.kafbat.ui.mapper;

import static org.assertj.core.api.Assertions.assertThat;

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.model.ClusterInfo;
import io.kafbat.ui.model.ConnectDTO;
import io.kafbat.ui.model.ConnectorDTO;
import io.kafbat.ui.model.ConnectorStateDTO;
import io.kafbat.ui.model.ConnectorStatusDTO;
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.TaskIdDTO;
import io.kafbat.ui.model.TaskStatusDTO;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.Test;
import org.openapitools.jackson.nullable.JsonNullable;

class KafkaConnectMapperTest {

@Test
void toKafkaConnect() {
ThreadLocalRandom random = ThreadLocalRandom.current();

List<InternalConnectorInfo> connectors = new ArrayList<>();
int failedConnectors = 0;
int failedTasks = 0;
int tasksPerConnector = random.nextInt(1, 10);

for (int i = 0; i < 10; i++) {
ConnectorStateDTO connectorState;
if (random.nextBoolean()) {
connectorState = ConnectorStateDTO.FAILED;
failedConnectors++;
} else {
connectorState = ConnectorStateDTO.RUNNING;
}

ConnectorDTO connectorDto = new ConnectorDTO();
connectorDto.setName(UUID.randomUUID().toString());
connectorDto.setStatus(
new ConnectorStatusDTO(connectorState, UUID.randomUUID().toString())
);

List<TaskDTO> tasks = new ArrayList<>();
List<TaskIdDTO> taskIds = new ArrayList<>();

for (int j = 0; j < tasksPerConnector; j++) {
TaskDTO task = new TaskDTO();
TaskIdDTO taskId = new TaskIdDTO(UUID.randomUUID().toString(), j);
task.setId(taskId);

ConnectorTaskStatusDTO state;
if (random.nextBoolean()) {
state = ConnectorTaskStatusDTO.FAILED;
failedTasks++;
} else {
state = ConnectorTaskStatusDTO.RUNNING;
}

TaskStatusDTO status = new TaskStatusDTO();
status.setState(state);
task.setStatus(status);
tasks.add(task);
taskIds.add(taskId);
}

connectorDto.setTasks(taskIds);
InternalConnectorInfo connector = InternalConnectorInfo.builder()
.connector(connectorDto)
.tasks(tasks)
.build();

connectors.add(connector);
}

ClusterInfo clusterInfo = new ClusterInfo();
clusterInfo.setVersion(UUID.randomUUID().toString());
clusterInfo.setCommit(UUID.randomUUID().toString());
clusterInfo.setKafkaClusterId(UUID.randomUUID().toString());

ClustersProperties.ConnectCluster connectCluster = ClustersProperties.ConnectCluster.builder()
.name(UUID.randomUUID().toString())
.address("http://localhost:" + random.nextInt(1000, 5000))
.username(UUID.randomUUID().toString())
.password(UUID.randomUUID().toString()).build();

ConnectDTO connectDto = new ConnectDTO();
connectDto.setName(connectCluster.getName());
connectDto.setAddress(connectCluster.getAddress());
connectDto.setVersion(JsonNullable.of(clusterInfo.getVersion()));
connectDto.setCommit(JsonNullable.of(clusterInfo.getCommit()));
connectDto.setClusterId(JsonNullable.of(clusterInfo.getKafkaClusterId()));
connectDto.setConnectorsCount(JsonNullable.of(connectors.size()));
connectDto.setFailedConnectorsCount(JsonNullable.of(failedConnectors));
connectDto.setTasksCount(JsonNullable.of(connectors.size() * tasksPerConnector));
connectDto.setFailedTasksCount(JsonNullable.of(failedTasks));

KafkaConnectMapper mapper = new KafkaConnectMapperImpl();
ConnectDTO kafkaConnect = mapper.toKafkaConnect(
connectCluster,
connectors,
clusterInfo,
true
);

assertThat(kafkaConnect).isNotNull();
assertThat(kafkaConnect).isEqualTo(connectDto);

}
}
14 changes: 12 additions & 2 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
@@ -3445,12 +3445,22 @@ components:
failed_connectors_count:
type: integer
nullable: true
tasks_count:
tasksCount:
type: integer
nullable: true
failed_tasks_count:
failedTasksCount:
type: integer
nullable: true
version:
type: string
nullable: true
commit:
type: string
nullable: true
clusterId:
type: string
nullable: true

required:
- name

25 changes: 25 additions & 0 deletions contract/src/main/resources/swagger/kafka-connect-api.yaml
@@ -14,6 +14,20 @@ servers:
- url: /localhost

paths:
/:
get:
tags:
- KafkaConnectClient
summary: get kafka connect info
operationId: getClusterInfo
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/ClusterInfo'

/connectors:
get:
tags:
@@ -419,6 +433,17 @@ components:
type: http
scheme: basic
schemas:

ClusterInfo:
type: object
properties:
version:
type: string
commit:
type: string
kafka_cluster_id:
type: string

ConnectorConfig:
type: object
additionalProperties:
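The new `ClusterInfo` schema and `getClusterInfo` operation model the root endpoint of the Kafka Connect REST API, which reports the worker's version, commit, and `kafka_cluster_id`. A quick way to inspect the raw payload with the plain JDK HTTP client, assuming a Connect worker on its default REST port 8083:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class ConnectRootProbe {
  public static void main(String[] args) throws Exception {
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8083/")) // default Connect REST port
        .GET()
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    // Typically something like:
    // {"version":"3.8.0","commit":"...","kafka_cluster_id":"..."}
    System.out.println(response.body());
  }
}
```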