
Commit 206f543

Added missed code

1 parent 643d432 · commit 206f543

File tree

6 files changed: +197 -16 lines changed

  api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
  api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
  api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
  api/src/test/java/io/kafbat/ui/mapper/KafkaConnectMapperTest.java
  contract/src/main/resources/swagger/kafbat-ui-api.yaml
  contract/src/main/resources/swagger/kafka-connect-api.yaml

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 1 addition & 0 deletions

@@ -192,6 +192,7 @@ public enum LogLevel {
   public static class CacheProperties {
     boolean enabled = true;
     Duration connectCacheExpiry = Duration.ofMinutes(1);
+    Duration connectClusterCacheExpiry = Duration.ofHours(24);
   }
 
   @PostConstruct
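The new CacheProperties field gives the Connect cluster-info cache its own TTL (24 hours by default), separate from the one-minute connector cache. A minimal sketch of how a Duration-typed expiry like this drives a Caffeine async cache; names here are illustrative, the real wiring is the KafkaConnectService change below:

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;

// Illustrative sketch only: a Duration-typed expiry such as
// connectClusterCacheExpiry bounds how long cached entries are reused.
public class ConnectClusterCacheSketch {
  public static void main(String[] args) {
    Duration connectClusterCacheExpiry = Duration.ofHours(24); // default from CacheProperties

    AsyncCache<String, String> clusterInfoCache = Caffeine.newBuilder()
        .expireAfterWrite(connectClusterCacheExpiry) // entries are reloaded once the TTL elapses
        .buildAsync();

    System.out.println(clusterInfoCache.synchronous().estimatedSize()); // 0 until the first lookup
  }
}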

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 17 additions & 7 deletions

@@ -1,6 +1,7 @@
 package io.kafbat.ui.mapper;
 
 import io.kafbat.ui.config.ClustersProperties;
+import io.kafbat.ui.connect.model.ClusterInfo;
 import io.kafbat.ui.connect.model.ConnectorStatusConnector;
 import io.kafbat.ui.connect.model.ConnectorTask;
 import io.kafbat.ui.connect.model.NewConnector;
@@ -45,6 +46,7 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
   default ConnectDTO toKafkaConnect(
       ClustersProperties.ConnectCluster connect,
       List<InternalConnectorInfo> connectors,
+      ClusterInfo clusterInfo,
       boolean withStats) {
     Integer connectorCount = null;
     Integer failedConnectors = null;
@@ -66,12 +68,17 @@ default ConnectDTO toKafkaConnect(
           .filter(ConnectorStateDTO.FAILED::equals)
           .map(s -> 1).orElse(0);
 
-      tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
-
-      for (TaskDTO task : connector.getTasks()) {
-        if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
-          failedTasksCount += tasksCount;
-        }
+      tasksCount += internalConnector.map(ConnectorDTO::getTasks).map(List::size).orElse(0);
+
+      if (connector.getTasks() != null) {
+        failedTasksCount += (int) connector.getTasks().stream()
+            .filter(t ->
+                Optional.ofNullable(t)
+                    .map(TaskDTO::getStatus)
+                    .map(TaskStatusDTO::getState)
+                    .map(ConnectorTaskStatusDTO.FAILED::equals)
+                    .orElse(false)
+            ).count();
       }
     }
 
@@ -83,7 +90,10 @@ default ConnectDTO toKafkaConnect(
         .connectorsCount(connectorCount)
         .failedConnectorsCount(failedConnectors)
         .tasksCount(tasksCount)
-        .failedTasksCount(failedTasksCount);
+        .failedTasksCount(failedTasksCount)
+        .version(clusterInfo.getVersion())
+        .commit(clusterInfo.getCommit())
+        .clusterId(clusterInfo.getKafkaClusterId());
   }
 
   default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
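The rewritten loop also fixes the failed-task arithmetic: the old code added the running tasksCount total to failedTasksCount for every failed task, inflating the count, while the new code counts only tasks whose status is FAILED and tolerates null tasks and null statuses. A standalone sketch of that counting logic, using illustrative types rather than the project's DTOs:

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Standalone sketch (illustrative types, not the project's DTOs): count only
// tasks whose status is FAILED, tolerating null tasks and null statuses.
public class FailedTaskCountSketch {
  enum State { RUNNING, FAILED }
  record Status(State state) {}
  record Task(Status status) {}

  static int failedTasks(List<Task> tasks) {
    if (tasks == null) {
      return 0;
    }
    return (int) tasks.stream()
        .filter(Objects::nonNull)
        .filter(t -> Optional.ofNullable(t.status())
            .map(Status::state)
            .map(State.FAILED::equals)
            .orElse(false))
        .count();
  }

  public static void main(String[] args) {
    List<Task> tasks = Arrays.asList(
        new Task(new Status(State.FAILED)),
        new Task(new Status(State.RUNNING)),
        new Task(null),
        null);
    System.out.println(failedTasks(tasks)); // prints 1
  }
}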

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 31 additions & 7 deletions

@@ -5,6 +5,7 @@
 import com.github.benmanes.caffeine.cache.Caffeine;
 import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.connect.api.KafkaConnectClientApi;
+import io.kafbat.ui.connect.model.ClusterInfo;
 import io.kafbat.ui.connect.model.ConnectorStatus;
 import io.kafbat.ui.connect.model.ConnectorStatusConnector;
 import io.kafbat.ui.connect.model.ConnectorTopics;
@@ -36,10 +37,12 @@
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.protocol.types.Field;
 import org.springframework.stereotype.Service;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuples;
 
 @Service
 @Slf4j
@@ -50,6 +53,7 @@ public class KafkaConnectService {
   private final ClustersProperties clustersProperties;
 
   private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
+  private final AsyncCache<String, ClusterInfo> cacheClusterInfo;
 
   public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper,
                              KafkaConfigSanitizer kafkaConfigSanitizer,
@@ -61,24 +65,34 @@ public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafka
     this.cachedConnectors = Caffeine.newBuilder()
         .expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
         .buildAsync();
+    this.cacheClusterInfo = Caffeine.newBuilder()
+        .expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
+        .buildAsync();
   }
 
   public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
     Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
         Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
+
     if (withStats) {
       return connectClusters.map(connects ->
-          Flux.fromIterable(connects).flatMap(connect -> (
-              getConnectConnectorsFromCache(new ConnectCacheKey(cluster, connect), withStats).map(
-                  connectors -> kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats)
-              )
+          Flux.fromIterable(connects).flatMap(c ->
+              getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci))
+          ).flatMap(tuple -> (
+              getConnectConnectorsFromCache(new ConnectCacheKey(cluster, tuple.getT1()), withStats)
+                  .map(connectors ->
+                      kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), withStats)
+                  )
              )
          )
      ).orElse(Flux.fromIterable(List.of()));
    } else {
-      return Flux.fromIterable(connectClusters.map(connects ->
-          connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList()
-      ).orElse(List.of()));
+      return Flux.fromIterable(connectClusters.orElse(List.of()))
+          .flatMap(c ->
+              getClusterInfo(cluster, c.getName()).map(info ->
+                  kafkaConnectMapper.toKafkaConnect(c, List.of(), info, withStats)
+              )
+          );
    }
  }
 
@@ -94,6 +108,16 @@ private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectC
    }
  }
 
+  private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
+    return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
+        api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
+            .onErrorResume(th -> {
+              log.error("Error on collecting cluster info" + th.getMessage(), th);
+              return Mono.just(new ClusterInfo());
+            }).toFuture()
+    ));
+  }
+
  private Flux<InternalConnectorInfo> getConnectConnectors(
      KafkaCluster cluster,
      ClustersProperties.ConnectCluster connect) {
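getClusterInfo follows the same cache-or-load pattern as the connector cache: AsyncCache.get computes a CompletableFuture on a miss, Mono.toFuture bridges the reactive Connect client call into that future, and Mono.fromFuture hands the cached value back to the reactive pipeline; errors degrade to an empty ClusterInfo so one unreachable Connect cluster does not break the whole listing. A simplified, self-contained sketch of the pattern, with a plain String payload and a stubbed remote call standing in for the generated KafkaConnectClientApi:

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import reactor.core.publisher.Mono;

// Sketch of the cache-or-load bridging between Caffeine's AsyncCache and Reactor.
public class ClusterInfoCacheSketch {

  private final AsyncCache<String, String> cache = Caffeine.newBuilder()
      .expireAfterWrite(Duration.ofHours(24)) // mirrors connectClusterCacheExpiry
      .buildAsync();

  Mono<String> fetchClusterInfo(String connectName) {
    // hypothetical remote call standing in for KafkaConnectClientApi::getClusterInfo
    return Mono.fromSupplier(() -> "version-for-" + connectName);
  }

  Mono<String> getClusterInfo(String connectName) {
    return Mono.fromFuture(cache.get(connectName, (key, executor) ->
        fetchClusterInfo(key)
            .onErrorResume(th -> Mono.just("")) // empty placeholder, like new ClusterInfo()
            .toFuture()));
  }

  public static void main(String[] args) {
    ClusterInfoCacheSketch sketch = new ClusterInfoCacheSketch();
    System.out.println(sketch.getClusterInfo("connect-1").block()); // loaded once, then served from cache
  }
}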
api/src/test/java/io/kafbat/ui/mapper/KafkaConnectMapperTest.java

Lines changed: 111 additions & 0 deletions

@@ -1,4 +1,115 @@
+package io.kafbat.ui.mapper;
+
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.*;
+
+import io.kafbat.ui.config.ClustersProperties;
+import io.kafbat.ui.connect.model.ClusterInfo;
+import io.kafbat.ui.model.ConnectDTO;
+import io.kafbat.ui.model.ConnectorDTO;
+import io.kafbat.ui.model.ConnectorStateDTO;
+import io.kafbat.ui.model.ConnectorStatusDTO;
+import io.kafbat.ui.model.ConnectorTaskStatusDTO;
+import io.kafbat.ui.model.TaskDTO;
+import io.kafbat.ui.model.TaskIdDTO;
+import io.kafbat.ui.model.TaskStatusDTO;
+import io.kafbat.ui.model.connect.InternalConnectorInfo;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.jupiter.api.Test;
+import org.openapitools.jackson.nullable.JsonNullable;
+
 class KafkaConnectMapperTest {
 
+  @Test
+  void toKafkaConnect() {
+    ThreadLocalRandom random = ThreadLocalRandom.current();
+    KafkaConnectMapper mapper = new KafkaConnectMapperImpl();
+    ClustersProperties.ConnectCluster connectCluster = ClustersProperties.ConnectCluster.builder()
+        .name(UUID.randomUUID().toString())
+        .address("http://localhost:" + random.nextInt(1000, 5000))
+        .username(UUID.randomUUID().toString())
+        .password(UUID.randomUUID().toString()).build();
+
+    List<InternalConnectorInfo> connectors = new ArrayList<>();
+    int failedConnectors = 0;
+    int failedTasks = 0;
+    int tasksPerConnector = random.nextInt(1, 10);
+
+    for (int i = 0; i < 10; i++) {
+      ConnectorStateDTO connectorStateDTO;
+      if (random.nextBoolean()) {
+        connectorStateDTO = ConnectorStateDTO.FAILED;
+        failedConnectors++;
+      } else {
+        connectorStateDTO = ConnectorStateDTO.RUNNING;
+      }
+
+      ConnectorDTO connectorDTO = new ConnectorDTO();
+      connectorDTO.setName(UUID.randomUUID().toString());
+      connectorDTO.setStatus(
+          new ConnectorStatusDTO(connectorStateDTO, UUID.randomUUID().toString())
+      );
+
+      List<TaskDTO> tasks = new ArrayList<>();
+      List<TaskIdDTO> taskIds = new ArrayList<>();
+
+      for (int j = 0; j < tasksPerConnector; j++) {
+        TaskDTO task = new TaskDTO();
+        TaskIdDTO taskId = new TaskIdDTO(UUID.randomUUID().toString(), j);
+        task.setId(taskId);
+
+        ConnectorTaskStatusDTO state;
+        if (random.nextBoolean()) {
+          state = ConnectorTaskStatusDTO.FAILED;
+          failedTasks++;
+        } else {
+          state = ConnectorTaskStatusDTO.RUNNING;
+        }
+
+        TaskStatusDTO status = new TaskStatusDTO();
+        status.setState(state);
+        task.setStatus(status);
+        tasks.add(task);
+        taskIds.add(taskId);
+      }
+
+      connectorDTO.setTasks(taskIds);
+      InternalConnectorInfo connector = InternalConnectorInfo.builder()
+          .connector(connectorDTO)
+          .tasks(tasks)
+          .build();
+
+      connectors.add(connector);
+    }
+
+    ClusterInfo clusterInfo = new ClusterInfo();
+    clusterInfo.setVersion(UUID.randomUUID().toString());
+    clusterInfo.setCommit(UUID.randomUUID().toString());
+    clusterInfo.setKafkaClusterId(UUID.randomUUID().toString());
+
+    ConnectDTO connectDTO = new ConnectDTO();
+    connectDTO.setName(connectCluster.getName());
+    connectDTO.setAddress(connectCluster.getAddress());
+    connectDTO.setVersion(JsonNullable.of(clusterInfo.getVersion()));
+    connectDTO.setCommit(JsonNullable.of(clusterInfo.getCommit()));
+    connectDTO.setClusterId(JsonNullable.of(clusterInfo.getKafkaClusterId()));
+    connectDTO.setConnectorsCount(JsonNullable.of(connectors.size()));
+    connectDTO.setFailedConnectorsCount(JsonNullable.of(failedConnectors));
+    connectDTO.setTasksCount(JsonNullable.of(connectors.size() * tasksPerConnector));
+    connectDTO.setFailedTasksCount(JsonNullable.of(failedTasks));
+
+    ConnectDTO kafkaConnect = mapper.toKafkaConnect(
+        connectCluster,
+        connectors,
+        clusterInfo,
+        true
+    );
+
+    assertThat(kafkaConnect).isNotNull();
+    assertThat(kafkaConnect).isEqualTo(connectDTO);
+  }
 }

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 12 additions & 2 deletions

@@ -3445,12 +3445,22 @@ components:
         failed_connectors_count:
           type: integer
           nullable: true
-        tasks_count:
+        tasksCount:
           type: integer
           nullable: true
-        failed_tasks_count:
+        failedTasksCount:
           type: integer
           nullable: true
+        version:
+          type: string
+          nullable: true
+        commit:
+          type: string
+          nullable: true
+        clusterId:
+          type: string
+          nullable: true
+
       required:
         - name

contract/src/main/resources/swagger/kafka-connect-api.yaml

Lines changed: 25 additions & 0 deletions

@@ -14,6 +14,20 @@ servers:
   - url: /localhost
 
 paths:
+  /:
+    get:
+      tags:
+        - KafkaConnectClient
+      summary: get kafka connect info
+      operationId: getClusterInfo
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ClusterInfo'
+
   /connectors:
     get:
       tags:
@@ -419,6 +433,17 @@ components:
       type: http
       scheme: basic
   schemas:
+
+    ClusterInfo:
+      type: object
+      properties:
+        version:
+          type: string
+        commit:
+          type: string
+        kafka_cluster_id:
+          type: string
+
     ConnectorConfig:
       type: object
       additionalProperties:
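The new GET / operation maps the Kafka Connect worker's root endpoint, which reports the worker version, the source commit it was built from, and the id of the Kafka cluster it is attached to; the generated KafkaConnectClientApi exposes it as getClusterInfo returning the ClusterInfo schema above. A rough sketch of querying that endpoint directly with Spring WebClient, with an illustrative address and port:

import java.util.Map;
import org.springframework.web.reactive.function.client.WebClient;

// Rough sketch (not the generated client): the Connect worker answers GET /
// with a small JSON document, e.g. {"version":"...","commit":"...","kafka_cluster_id":"..."},
// the fields the new ClusterInfo schema models. localhost:8083 is illustrative.
public class ConnectRootEndpointSketch {
  public static void main(String[] args) {
    Map<?, ?> info = WebClient.create("http://localhost:8083")
        .get()
        .uri("/")
        .retrieve()
        .bodyToMono(Map.class)
        .block();

    System.out.println(info.get("version"));
    System.out.println(info.get("commit"));
    System.out.println(info.get("kafka_cluster_id"));
  }
}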
