Skip to content

Commit e398596

Browse files
committed
BE: Fixes #445 Added connect stats
1 parent d3f2f3a commit e398596

File tree

5 files changed

+70
-11
lines changed

5 files changed

+70
-11
lines changed

api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
import io.kafbat.ui.model.ConfigSourceDTO;
1313
import io.kafbat.ui.model.ConfigSynonymDTO;
1414
import io.kafbat.ui.model.ConnectDTO;
15+
import io.kafbat.ui.model.ConnectorDTO;
16+
import io.kafbat.ui.model.ConnectorStateDTO;
17+
import io.kafbat.ui.model.ConnectorStatusDTO;
18+
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
1519
import io.kafbat.ui.model.InternalBroker;
1620
import io.kafbat.ui.model.InternalBrokerConfig;
1721
import io.kafbat.ui.model.InternalBrokerDiskUsage;
@@ -27,13 +31,16 @@
2731
import io.kafbat.ui.model.Metrics;
2832
import io.kafbat.ui.model.PartitionDTO;
2933
import io.kafbat.ui.model.ReplicaDTO;
34+
import io.kafbat.ui.model.TaskDTO;
3035
import io.kafbat.ui.model.TopicConfigDTO;
3136
import io.kafbat.ui.model.TopicDTO;
3237
import io.kafbat.ui.model.TopicDetailsDTO;
3338
import io.kafbat.ui.model.TopicProducerStateDTO;
39+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
3440
import io.kafbat.ui.service.metrics.RawMetric;
3541
import java.util.List;
3642
import java.util.Map;
43+
import java.util.Optional;
3744
import org.apache.kafka.clients.admin.ConfigEntry;
3845
import org.apache.kafka.clients.admin.ProducerState;
3946
import org.apache.kafka.common.acl.AccessControlEntry;
@@ -107,7 +114,38 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {
107114

108115
ReplicaDTO toReplica(InternalReplica replica);
109116

110-
ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect);
117+
default ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect, List<InternalConnectorInfo> connectors) {
118+
int connectorCount = connectors.size();
119+
int failedConnectors = 0;
120+
int tasksCount = 0;
121+
int failedTasksCount = 0;
122+
123+
for (InternalConnectorInfo connector : connectors) {
124+
Optional<ConnectorDTO> internalConnector = Optional.ofNullable(connector.getConnector());
125+
126+
failedConnectors += internalConnector
127+
.map(ConnectorDTO::getStatus)
128+
.map(ConnectorStatusDTO::getState)
129+
.filter(ConnectorStateDTO.FAILED::equals)
130+
.map(s -> 1).orElse(0);
131+
132+
tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
133+
134+
for (TaskDTO task : connector.getTasks()) {
135+
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
136+
failedTasksCount += tasksCount;
137+
}
138+
}
139+
}
140+
141+
return new ConnectDTO()
142+
.address(connect.getAddress())
143+
.name(connect.getName())
144+
.connectorsCount(connectorCount)
145+
.failedConnectorsCount(failedConnectors)
146+
.tasksCount(tasksCount)
147+
.failedTasksCount(failedTasksCount);
148+
}
111149

112150
List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<ClusterFeature> features);
113151

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import io.kafbat.ui.model.FullConnectorInfoDTO;
1212
import io.kafbat.ui.model.TaskDTO;
1313
import io.kafbat.ui.model.TaskStatusDTO;
14-
import io.kafbat.ui.model.connect.InternalConnectInfo;
14+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
1515
import java.util.List;
1616
import org.mapstruct.Mapper;
1717
import org.mapstruct.Mapping;
@@ -38,7 +38,7 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
3838
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
3939
connectorPluginConfigValidationResponse);
4040

41-
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
41+
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
4242
ConnectorDTO connector = connectInfo.getConnector();
4343
List<TaskDTO> tasks = connectInfo.getTasks();
4444
int failedTasksCount = (int) tasks.stream()

api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
@Data
1111
@Builder(toBuilder = true)
12-
public class InternalConnectInfo {
12+
public class InternalConnectorInfo {
1313
private final ConnectorDTO connector;
1414
private final Map<String, Object> config;
1515
private final List<TaskDTO> tasks;

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.kafbat.ui.model.KafkaCluster;
2222
import io.kafbat.ui.model.NewConnectorDTO;
2323
import io.kafbat.ui.model.TaskDTO;
24-
import io.kafbat.ui.model.connect.InternalConnectInfo;
24+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
2525
import io.kafbat.ui.util.ReactiveFailover;
2626
import java.util.List;
2727
import java.util.Map;
@@ -46,11 +46,24 @@ public class KafkaConnectService {
4646
private final KafkaConfigSanitizer kafkaConfigSanitizer;
4747

4848
public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
49-
return Flux.fromIterable(
50-
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
51-
.map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
52-
.orElse(List.of())
53-
);
49+
return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
50+
.map(connects -> Flux.fromIterable(connects).flatMap(connect ->
51+
getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
52+
Mono.zip(
53+
getConnector(cluster, connect.getName(), connectorName),
54+
getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
55+
).map(tuple ->
56+
InternalConnectorInfo.builder()
57+
.connector(tuple.getT1())
58+
.config(null)
59+
.tasks(tuple.getT2())
60+
.topics(null)
61+
.build()
62+
)
63+
).collectList().map(connectors ->
64+
clusterMapper.toKafkaConnect(connect, connectors)
65+
)
66+
)).orElse(Flux.fromIterable(List.of()));
5467
}
5568

5669
public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
@@ -65,7 +78,7 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
6578
getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
6679
getConnectorTopics(cluster, connect.getName(), connectorName)
6780
).map(tuple ->
68-
InternalConnectInfo.builder()
81+
InternalConnectorInfo.builder()
6982
.connector(tuple.getT1())
7083
.config(tuple.getT2())
7184
.tasks(tuple.getT3())

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3434,6 +3434,14 @@ components:
34343434
type: string
34353435
address:
34363436
type: string
3437+
connectors_count:
3438+
type: integer
3439+
failed_connectors_count:
3440+
type: integer
3441+
tasks_count:
3442+
type: integer
3443+
failed_tasks_count:
3444+
type: integer
34373445
required:
34383446
- name
34393447

0 commit comments

Comments
 (0)