Skip to content

Commit 253b303

Browse files
committed
BE: Fixes #445 Added connect stats
1 parent d73595a commit 253b303

File tree

2 files changed

+214
-0
lines changed

2 files changed

+214
-0
lines changed
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
Subject: [PATCH] BE: Fixes #445 be connect info
2+
---
3+
Index: api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java
4+
IDEA additional info:
5+
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
6+
<+>UTF-8
7+
===================================================================
8+
diff --git a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java
9+
--- a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243)
10+
+++ b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java (revision abb46f1075df173e25157e5add12606ccbc7547e)
11+
@@ -15,6 +15,10 @@
12+
import io.kafbat.ui.model.ConfigSourceDTO;
13+
import io.kafbat.ui.model.ConfigSynonymDTO;
14+
import io.kafbat.ui.model.ConnectDTO;
15+
+import io.kafbat.ui.model.ConnectorDTO;
16+
+import io.kafbat.ui.model.ConnectorStateDTO;
17+
+import io.kafbat.ui.model.ConnectorStatusDTO;
18+
+import io.kafbat.ui.model.ConnectorTaskStatusDTO;
19+
import io.kafbat.ui.model.InternalBroker;
20+
import io.kafbat.ui.model.InternalBrokerConfig;
21+
import io.kafbat.ui.model.InternalClusterState;
22+
@@ -29,16 +33,19 @@
23+
import io.kafbat.ui.model.Metrics;
24+
import io.kafbat.ui.model.PartitionDTO;
25+
import io.kafbat.ui.model.ReplicaDTO;
26+
+import io.kafbat.ui.model.TaskDTO;
27+
import io.kafbat.ui.model.TopicConfigDTO;
28+
import io.kafbat.ui.model.TopicDTO;
29+
import io.kafbat.ui.model.TopicDetailsDTO;
30+
import io.kafbat.ui.model.TopicProducerStateDTO;
31+
+import io.kafbat.ui.model.connect.InternalConnectorInfo;
32+
import io.kafbat.ui.service.metrics.SummarizedMetrics;
33+
import io.prometheus.metrics.model.snapshots.Label;
34+
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
35+
import java.math.BigDecimal;
36+
import java.util.List;
37+
import java.util.Map;
38+
+import java.util.Optional;
39+
import java.util.stream.Stream;
40+
import org.apache.kafka.clients.admin.ConfigEntry;
41+
import org.apache.kafka.clients.admin.ProducerState;
42+
@@ -118,7 +125,38 @@
43+
44+
ReplicaDTO toReplica(InternalReplica replica);
45+
46+
- ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect);
47+
+ default ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect, List<InternalConnectorInfo> connectors) {
48+
+ int connectorCount = connectors.size();
49+
+ int failedConnectors = 0;
50+
+ int tasksCount = 0;
51+
+ int failedTasksCount = 0;
52+
+
53+
+ for (InternalConnectorInfo connector : connectors) {
54+
+ Optional<ConnectorDTO> internalConnector = Optional.ofNullable(connector.getConnector());
55+
+
56+
+ failedConnectors += internalConnector
57+
+ .map(ConnectorDTO::getStatus)
58+
+ .map(ConnectorStatusDTO::getState)
59+
+ .filter(ConnectorStateDTO.FAILED::equals)
60+
+ .map(s -> 1).orElse(0);
61+
+
62+
+ tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
63+
+
64+
+ for (TaskDTO task : connector.getTasks()) {
65+
+ if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
66+
+ failedTasksCount += tasksCount;
67+
+ }
68+
+ }
69+
+ }
70+
+
71+
+ return new ConnectDTO()
72+
+ .address(connect.getAddress())
73+
+ .name(connect.getName())
74+
+ .connectorsCount(connectorCount)
75+
+ .failedConnectorsCount(failedConnectors)
76+
+ .tasksCount(tasksCount)
77+
+ .failedTasksCount(failedTasksCount);
78+
+ }
79+
80+
List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<ClusterFeature> features);
81+
82+
Index: api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
83+
IDEA additional info:
84+
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
85+
<+>UTF-8
86+
===================================================================
87+
diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
88+
--- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243)
89+
+++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java (revision abb46f1075df173e25157e5add12606ccbc7547e)
90+
@@ -11,7 +11,7 @@
91+
import io.kafbat.ui.model.FullConnectorInfoDTO;
92+
import io.kafbat.ui.model.TaskDTO;
93+
import io.kafbat.ui.model.TaskStatusDTO;
94+
-import io.kafbat.ui.model.connect.InternalConnectInfo;
95+
+import io.kafbat.ui.model.connect.InternalConnectorInfo;
96+
import java.util.List;
97+
import org.mapstruct.Mapper;
98+
import org.mapstruct.Mapping;
99+
@@ -38,7 +38,7 @@
100+
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
101+
connectorPluginConfigValidationResponse);
102+
103+
- default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
104+
+ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
105+
ConnectorDTO connector = connectInfo.getConnector();
106+
List<TaskDTO> tasks = connectInfo.getTasks();
107+
int failedTasksCount = (int) tasks.stream()
108+
Index: api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java
109+
IDEA additional info:
110+
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
111+
<+>UTF-8
112+
===================================================================
113+
diff --git a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java
114+
rename from api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java
115+
rename to api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java
116+
--- a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243)
117+
+++ b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java (revision abb46f1075df173e25157e5add12606ccbc7547e)
118+
@@ -9,7 +9,7 @@
119+
120+
@Data
121+
@Builder(toBuilder = true)
122+
-public class InternalConnectInfo {
123+
+public class InternalConnectorInfo {
124+
private final ConnectorDTO connector;
125+
private final Map<String, Object> config;
126+
private final List<TaskDTO> tasks;
127+
Index: api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
128+
IDEA additional info:
129+
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
130+
<+>UTF-8
131+
===================================================================
132+
diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
133+
--- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243)
134+
+++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java (revision abb46f1075df173e25157e5add12606ccbc7547e)
135+
@@ -23,7 +23,8 @@
136+
import io.kafbat.ui.model.KafkaCluster;
137+
import io.kafbat.ui.model.NewConnectorDTO;
138+
import io.kafbat.ui.model.TaskDTO;
139+
-import io.kafbat.ui.model.connect.InternalConnectInfo;
140+
+import io.kafbat.ui.model.TaskStatusDTO;
141+
+import io.kafbat.ui.model.connect.InternalConnectorInfo;
142+
import io.kafbat.ui.util.ReactiveFailover;
143+
import java.util.List;
144+
import java.util.Map;
145+
@@ -33,7 +34,6 @@
146+
import javax.annotation.Nullable;
147+
import lombok.RequiredArgsConstructor;
148+
import lombok.extern.slf4j.Slf4j;
149+
-import org.apache.commons.lang3.StringUtils;
150+
import org.springframework.stereotype.Service;
151+
import org.springframework.web.reactive.function.client.WebClientResponseException;
152+
import reactor.core.publisher.Flux;
153+
@@ -48,11 +48,24 @@
154+
private final KafkaConfigSanitizer kafkaConfigSanitizer;
155+
156+
public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
157+
- return Flux.fromIterable(
158+
- Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
159+
- .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
160+
- .orElse(List.of())
161+
- );
162+
+ return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
163+
+ .map(connects -> Flux.fromIterable(connects).flatMap(connect ->
164+
+ getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
165+
+ Mono.zip(
166+
+ getConnector(cluster, connect.getName(), connectorName),
167+
+ getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
168+
+ ).map(tuple ->
169+
+ InternalConnectorInfo.builder()
170+
+ .connector(tuple.getT1())
171+
+ .config(null)
172+
+ .tasks(tuple.getT2())
173+
+ .topics(null)
174+
+ .build()
175+
+ )
176+
+ ).collectList().map(connectors ->
177+
+ clusterMapper.toKafkaConnect(connect, connectors)
178+
+ )
179+
+ )).orElse(Flux.fromIterable(List.of()));
180+
}
181+
182+
public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
183+
@@ -67,7 +80,7 @@
184+
getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
185+
getConnectorTopics(cluster, connect.getName(), connectorName)
186+
).map(tuple ->
187+
- InternalConnectInfo.builder()
188+
+ InternalConnectorInfo.builder()
189+
.connector(tuple.getT1())
190+
.config(tuple.getT2())
191+
.tasks(tuple.getT3())
192+
Index: contract/src/main/resources/swagger/kafbat-ui-api.yaml
193+
IDEA additional info:
194+
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
195+
<+>UTF-8
196+
===================================================================
197+
diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml
198+
--- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml (revision 9ea69c7868652a04a7af949eff383ee09e8cc243)
199+
+++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml (revision abb46f1075df173e25157e5add12606ccbc7547e)
200+
@@ -3494,6 +3494,14 @@
201+
type: string
202+
address:
203+
type: string
204+
+ connectors_count:
205+
+ type: integer
206+
+ failed_connectors_count:
207+
+ type: integer
208+
+ tasks_count:
209+
+ type: integer
210+
+ failed_tasks_count:
211+
+ type: integer
212+
required:
213+
- name
214+

0 commit comments

Comments
 (0)