Skip to content

Commit abb46f1

Browse files
committed
BE: Fixes #445 be connect info
1 parent 9ea69c7 commit abb46f1

File tree

5 files changed

+71
-12
lines changed

5 files changed

+71
-12
lines changed

api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
import io.kafbat.ui.model.ConfigSourceDTO;
1616
import io.kafbat.ui.model.ConfigSynonymDTO;
1717
import io.kafbat.ui.model.ConnectDTO;
18+
import io.kafbat.ui.model.ConnectorDTO;
19+
import io.kafbat.ui.model.ConnectorStateDTO;
20+
import io.kafbat.ui.model.ConnectorStatusDTO;
21+
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
1822
import io.kafbat.ui.model.InternalBroker;
1923
import io.kafbat.ui.model.InternalBrokerConfig;
2024
import io.kafbat.ui.model.InternalClusterState;
@@ -29,16 +33,19 @@
2933
import io.kafbat.ui.model.Metrics;
3034
import io.kafbat.ui.model.PartitionDTO;
3135
import io.kafbat.ui.model.ReplicaDTO;
36+
import io.kafbat.ui.model.TaskDTO;
3237
import io.kafbat.ui.model.TopicConfigDTO;
3338
import io.kafbat.ui.model.TopicDTO;
3439
import io.kafbat.ui.model.TopicDetailsDTO;
3540
import io.kafbat.ui.model.TopicProducerStateDTO;
41+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
3642
import io.kafbat.ui.service.metrics.SummarizedMetrics;
3743
import io.prometheus.metrics.model.snapshots.Label;
3844
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
3945
import java.math.BigDecimal;
4046
import java.util.List;
4147
import java.util.Map;
48+
import java.util.Optional;
4249
import java.util.stream.Stream;
4350
import org.apache.kafka.clients.admin.ConfigEntry;
4451
import org.apache.kafka.clients.admin.ProducerState;
@@ -118,7 +125,38 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {
118125

119126
ReplicaDTO toReplica(InternalReplica replica);
120127

121-
ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect);
128+
default ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect, List<InternalConnectorInfo> connectors) {
129+
int connectorCount = connectors.size();
130+
int failedConnectors = 0;
131+
int tasksCount = 0;
132+
int failedTasksCount = 0;
133+
134+
for (InternalConnectorInfo connector : connectors) {
135+
Optional<ConnectorDTO> internalConnector = Optional.ofNullable(connector.getConnector());
136+
137+
failedConnectors += internalConnector
138+
.map(ConnectorDTO::getStatus)
139+
.map(ConnectorStatusDTO::getState)
140+
.filter(ConnectorStateDTO.FAILED::equals)
141+
.map(s -> 1).orElse(0);
142+
143+
tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
144+
145+
for (TaskDTO task : connector.getTasks()) {
146+
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
147+
failedTasksCount += tasksCount;
148+
}
149+
}
150+
}
151+
152+
return new ConnectDTO()
153+
.address(connect.getAddress())
154+
.name(connect.getName())
155+
.connectorsCount(connectorCount)
156+
.failedConnectorsCount(failedConnectors)
157+
.tasksCount(tasksCount)
158+
.failedTasksCount(failedTasksCount);
159+
}
122160

123161
List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<ClusterFeature> features);
124162

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import io.kafbat.ui.model.FullConnectorInfoDTO;
1212
import io.kafbat.ui.model.TaskDTO;
1313
import io.kafbat.ui.model.TaskStatusDTO;
14-
import io.kafbat.ui.model.connect.InternalConnectInfo;
14+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
1515
import java.util.List;
1616
import org.mapstruct.Mapper;
1717
import org.mapstruct.Mapping;
@@ -38,7 +38,7 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
3838
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
3939
connectorPluginConfigValidationResponse);
4040

41-
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
41+
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
4242
ConnectorDTO connector = connectInfo.getConnector();
4343
List<TaskDTO> tasks = connectInfo.getTasks();
4444
int failedTasksCount = (int) tasks.stream()

api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java renamed to api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
@Data
1111
@Builder(toBuilder = true)
12-
public class InternalConnectInfo {
12+
public class InternalConnectorInfo {
1313
private final ConnectorDTO connector;
1414
private final Map<String, Object> config;
1515
private final List<TaskDTO> tasks;

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
import io.kafbat.ui.model.KafkaCluster;
2424
import io.kafbat.ui.model.NewConnectorDTO;
2525
import io.kafbat.ui.model.TaskDTO;
26-
import io.kafbat.ui.model.connect.InternalConnectInfo;
26+
import io.kafbat.ui.model.TaskStatusDTO;
27+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
2728
import io.kafbat.ui.util.ReactiveFailover;
2829
import java.util.List;
2930
import java.util.Map;
@@ -33,7 +34,6 @@
3334
import javax.annotation.Nullable;
3435
import lombok.RequiredArgsConstructor;
3536
import lombok.extern.slf4j.Slf4j;
36-
import org.apache.commons.lang3.StringUtils;
3737
import org.springframework.stereotype.Service;
3838
import org.springframework.web.reactive.function.client.WebClientResponseException;
3939
import reactor.core.publisher.Flux;
@@ -48,11 +48,24 @@ public class KafkaConnectService {
4848
private final KafkaConfigSanitizer kafkaConfigSanitizer;
4949

5050
public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
51-
return Flux.fromIterable(
52-
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
53-
.map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
54-
.orElse(List.of())
55-
);
51+
return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
52+
.map(connects -> Flux.fromIterable(connects).flatMap(connect ->
53+
getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
54+
Mono.zip(
55+
getConnector(cluster, connect.getName(), connectorName),
56+
getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
57+
).map(tuple ->
58+
InternalConnectorInfo.builder()
59+
.connector(tuple.getT1())
60+
.config(null)
61+
.tasks(tuple.getT2())
62+
.topics(null)
63+
.build()
64+
)
65+
).collectList().map(connectors ->
66+
clusterMapper.toKafkaConnect(connect, connectors)
67+
)
68+
)).orElse(Flux.fromIterable(List.of()));
5669
}
5770

5871
public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
@@ -67,7 +80,7 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
6780
getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
6881
getConnectorTopics(cluster, connect.getName(), connectorName)
6982
).map(tuple ->
70-
InternalConnectInfo.builder()
83+
InternalConnectorInfo.builder()
7184
.connector(tuple.getT1())
7285
.config(tuple.getT2())
7386
.tasks(tuple.getT3())

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3494,6 +3494,14 @@ components:
34943494
type: string
34953495
address:
34963496
type: string
3497+
connectors_count:
3498+
type: integer
3499+
failed_connectors_count:
3500+
type: integer
3501+
tasks_count:
3502+
type: integer
3503+
failed_tasks_count:
3504+
type: integer
34973505
required:
34983506
- name
34993507

0 commit comments

Comments
 (0)