Skip to content

Commit 109f7c0

Browse files
authored
BE: Fixed issue-1334 switched to extended connectors endpoint (#1418)
1 parent 3616d36 commit 109f7c0

File tree

11 files changed

+214
-135
lines changed

11 files changed

+214
-135
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import io.kafbat.ui.connect.ApiClient;
88
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
99
import io.kafbat.ui.connect.model.Connector;
10+
import io.kafbat.ui.connect.model.ConnectorExpand;
1011
import io.kafbat.ui.connect.model.ConnectorPlugin;
1112
import io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse;
1213
import io.kafbat.ui.connect.model.ConnectorStatus;
1314
import io.kafbat.ui.connect.model.ConnectorTask;
1415
import io.kafbat.ui.connect.model.ConnectorTopics;
16+
import io.kafbat.ui.connect.model.ExpandedConnector;
1517
import io.kafbat.ui.connect.model.NewConnector;
1618
import io.kafbat.ui.connect.model.TaskStatus;
1719
import io.kafbat.ui.exception.KafkaConnectConflictResponseException;
@@ -221,13 +223,17 @@ public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWith
221223
}
222224

223225
@Override
224-
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
225-
return withRetryOnConflictOrRebalance(super.getConnectors(search));
226+
public Mono<Map<String, ExpandedConnector>> getConnectors(
227+
String search, List<ConnectorExpand> expand
228+
) throws WebClientResponseException {
229+
return withRetryOnConflictOrRebalance(super.getConnectors(search, expand));
226230
}
227231

228232
@Override
229-
public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
230-
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
233+
public Mono<ResponseEntity<Map<String, ExpandedConnector>>> getConnectorsWithHttpInfo(
234+
String search, List<ConnectorExpand> expand
235+
) throws WebClientResponseException {
236+
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search, expand));
231237
}
232238

233239
@Override

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ public enum LogLevel {
227227
@AllArgsConstructor
228228
public static class CacheProperties {
229229
boolean enabled = true;
230-
Duration connectCacheExpiry = Duration.ofMinutes(1);
231230
Duration connectClusterCacheExpiry = Duration.ofHours(24);
232231
}
233232

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,12 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
6666
.build();
6767

6868
return validateAccess(context)
69-
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
70-
.doOnEach(sig -> audit(context, sig));
69+
.thenReturn(
70+
ResponseEntity.ok(
71+
kafkaConnectService.getConnectors(getCluster(clusterName), connectName)
72+
.flatMapMany(m -> Flux.fromIterable(m.keySet()))
73+
)
74+
).doOnEach(sig -> audit(context, sig));
7175
}
7276

7377
@Override

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.kafbat.ui.connect.model.ClusterInfo;
55
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
66
import io.kafbat.ui.connect.model.ConnectorTask;
7+
import io.kafbat.ui.connect.model.ExpandedConnector;
78
import io.kafbat.ui.connect.model.NewConnector;
89
import io.kafbat.ui.model.ConnectDTO;
910
import io.kafbat.ui.model.ConnectorDTO;
@@ -14,10 +15,15 @@
1415
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
1516
import io.kafbat.ui.model.FullConnectorInfoDTO;
1617
import io.kafbat.ui.model.TaskDTO;
18+
import io.kafbat.ui.model.TaskIdDTO;
1719
import io.kafbat.ui.model.TaskStatusDTO;
1820
import io.kafbat.ui.model.connect.InternalConnectorInfo;
1921
import java.util.List;
22+
import java.util.Map;
23+
import java.util.Objects;
2024
import java.util.Optional;
25+
import java.util.stream.Collectors;
26+
import javax.annotation.Nullable;
2127
import org.mapstruct.Mapper;
2228
import org.mapstruct.Mapping;
2329

@@ -43,6 +49,39 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
4349
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
4450
connectorPluginConfigValidationResponse);
4551

52+
default InternalConnectorInfo fromClient(String connect, ExpandedConnector connector, @Nullable List<String> topics) {
53+
Objects.requireNonNull(connector.getInfo());
54+
Objects.requireNonNull(connector.getStatus());
55+
List<TaskDTO> tasks = List.of();
56+
57+
if (connector.getInfo().getTasks() != null
58+
&& connector.getStatus().getTasks() != null
59+
) {
60+
Map<Integer, TaskIdDTO> taskIds = connector.getInfo().getTasks()
61+
.stream().map(t -> new TaskIdDTO().task(t.getTask()).connector(t.getConnector()))
62+
.collect(Collectors.toMap(
63+
TaskIdDTO::getTask,
64+
t -> t
65+
));
66+
67+
tasks = connector.getStatus().getTasks().stream()
68+
.map(s ->
69+
new TaskDTO().status(fromClient(s)).id(taskIds.get(s.getId()))
70+
).toList();
71+
}
72+
73+
ConnectorDTO connectorDto = fromClient(connector.getInfo())
74+
.connect(connect)
75+
.status(fromClient(connector.getStatus().getConnector()));
76+
77+
return InternalConnectorInfo.builder()
78+
.connector(connectorDto)
79+
.config(connector.getInfo().getConfig())
80+
.tasks(tasks)
81+
.topics(topics)
82+
.build();
83+
}
84+
4685
default ConnectDTO toKafkaConnect(
4786
ClustersProperties.ConnectCluster connect,
4887
List<InternalConnectorInfo> connectors,

api/src/main/java/io/kafbat/ui/service/KafkaConfigSanitizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.springframework.stereotype.Component;
2020

2121
@Component
22-
class KafkaConfigSanitizer {
22+
public class KafkaConfigSanitizer {
2323

2424
private static final String SANITIZED_VALUE = "******";
2525

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 44 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
import io.kafbat.ui.config.ClustersProperties;
66
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
77
import io.kafbat.ui.connect.model.ClusterInfo;
8+
import io.kafbat.ui.connect.model.ConnectorExpand;
89
import io.kafbat.ui.connect.model.ConnectorStatus;
910
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
1011
import io.kafbat.ui.connect.model.ConnectorTopics;
12+
import io.kafbat.ui.connect.model.ExpandedConnector;
1113
import io.kafbat.ui.connect.model.TaskStatus;
1214
import io.kafbat.ui.exception.ConnectorOffsetsResetException;
1315
import io.kafbat.ui.exception.NotFoundException;
@@ -24,6 +26,7 @@
2426
import io.kafbat.ui.model.KafkaCluster;
2527
import io.kafbat.ui.model.NewConnectorDTO;
2628
import io.kafbat.ui.model.TaskDTO;
29+
import io.kafbat.ui.model.TaskIdDTO;
2730
import io.kafbat.ui.model.connect.InternalConnectorInfo;
2831
import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
2932
import io.kafbat.ui.util.ReactiveFailover;
@@ -32,7 +35,6 @@
3235
import java.util.Map;
3336
import java.util.Optional;
3437
import java.util.function.Predicate;
35-
import java.util.stream.Stream;
3638
import javax.annotation.Nullable;
3739
import lombok.extern.slf4j.Slf4j;
3840
import org.springframework.stereotype.Service;
@@ -47,8 +49,6 @@ public class KafkaConnectService {
4749
private final KafkaConnectMapper kafkaConnectMapper;
4850
private final KafkaConfigSanitizer kafkaConfigSanitizer;
4951
private final ClustersProperties clustersProperties;
50-
51-
private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
5252
private final AsyncCache<String, ClusterInfo> cacheClusterInfo;
5353

5454
public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
@@ -57,9 +57,6 @@ public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
5757
this.kafkaConnectMapper = kafkaConnectMapper;
5858
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
5959
this.clustersProperties = clustersProperties;
60-
this.cachedConnectors = Caffeine.newBuilder()
61-
.expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
62-
.buildAsync();
6360
this.cacheClusterInfo = Caffeine.newBuilder()
6461
.expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
6562
.buildAsync();
@@ -74,9 +71,10 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
7471
Flux.fromIterable(connects).flatMap(c ->
7572
getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci))
7673
).flatMap(tuple -> (
77-
getConnectConnectorsFromCache(new ConnectCacheKey(cluster, tuple.getT1()))
74+
getConnectConnectors(cluster, tuple.getT1())
75+
.collectList()
7876
.map(connectors ->
79-
kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), withStats)
77+
kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), true)
8078
)
8179
)
8280
)
@@ -85,29 +83,17 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
8583
return Flux.fromIterable(connectClusters.orElse(List.of()))
8684
.flatMap(c ->
8785
getClusterInfo(cluster, c.getName()).map(info ->
88-
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, withStats)
86+
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
8987
)
9088
);
9189
}
9290
}
9391

94-
private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key) {
95-
if (clustersProperties.getCache().isEnabled()) {
96-
return Mono.fromFuture(
97-
cachedConnectors.get(key, (t, e) ->
98-
getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture()
99-
)
100-
);
101-
} else {
102-
return getConnectConnectors(key.cluster(), key.connect()).collectList();
103-
}
104-
}
105-
10692
private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
10793
return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
10894
api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
10995
.onErrorResume(th -> {
110-
log.error("Error on collecting cluster info" + th.getMessage(), th);
96+
log.error("Error on collecting cluster info", th);
11197
return Mono.just(new ClusterInfo());
11298
}).toFuture()
11399
));
@@ -116,17 +102,11 @@ private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectNam
116102
private Flux<InternalConnectorInfo> getConnectConnectors(
117103
KafkaCluster cluster,
118104
ClustersProperties.ConnectCluster connect) {
119-
return getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
120-
Mono.zip(
121-
getConnector(cluster, connect.getName(), connectorName),
122-
getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
123-
).map(tuple ->
124-
InternalConnectorInfo.builder()
125-
.connector(tuple.getT1())
126-
.config(null)
127-
.tasks(tuple.getT2())
128-
.topics(null)
129-
.build()
105+
return getConnectorsWithErrorsSuppress(cluster, connect.getName()).flatMapMany(connectors ->
106+
Flux.fromStream(
107+
connectors.values().stream().map(c ->
108+
kafkaConnectMapper.fromClient(connect.getName(), c, null)
109+
)
130110
)
131111
);
132112
}
@@ -135,21 +115,20 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
135115
@Nullable final String search, Boolean fts) {
136116
return getConnects(cluster, false)
137117
.flatMap(connect ->
138-
getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
139-
.flatMap(connectorName ->
140-
Mono.zip(
141-
getConnector(cluster, connect.getName(), connectorName),
142-
getConnectorConfig(cluster, connect.getName(), connectorName),
143-
getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
144-
getConnectorTopics(cluster, connect.getName(), connectorName)
145-
).map(tuple ->
146-
InternalConnectorInfo.builder()
147-
.connector(tuple.getT1())
148-
.config(tuple.getT2())
149-
.tasks(tuple.getT3())
150-
.topics(tuple.getT4().getTopics())
151-
.build())))
152-
.map(kafkaConnectMapper::fullConnectorInfo)
118+
getConnectorsWithErrorsSuppress(cluster, connect.getName())
119+
.flatMapMany(connectors ->
120+
Flux.fromIterable(connectors.entrySet())
121+
.flatMap(e ->
122+
getConnectorTopics(
123+
cluster,
124+
connect.getName(),
125+
e.getKey()
126+
).map(topics ->
127+
kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics())
128+
)
129+
)
130+
)
131+
).map(kafkaConnectMapper::fullConnectorInfo)
153132
.collectList()
154133
.map(lst -> filterConnectors(lst, search, fts))
155134
.flatMapMany(Flux::fromIterable);
@@ -165,14 +144,6 @@ private List<FullConnectorInfoDTO> filterConnectors(
165144
return filter.find(search);
166145
}
167146

168-
private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
169-
return Stream.of(
170-
fullConnectorInfo.getName(),
171-
fullConnectorInfo.getConnect(),
172-
fullConnectorInfo.getStatus().getState().getValue(),
173-
fullConnectorInfo.getType().getValue());
174-
}
175-
176147
public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
177148
String connectorName) {
178149
return api(cluster, connectClusterName)
@@ -183,15 +154,17 @@ public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String con
183154
.onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of())));
184155
}
185156

186-
public Flux<String> getConnectorNames(KafkaCluster cluster, String connectName) {
157+
public Mono<Map<String, ExpandedConnector>> getConnectors(KafkaCluster cluster, String connectName) {
187158
return api(cluster, connectName)
188-
.mono(client -> client.getConnectors(null))
189-
.flatMapMany(Flux::fromIterable);
159+
.mono(client ->
160+
client.getConnectors(null, List.of(ConnectorExpand.INFO, ConnectorExpand.STATUS))
161+
);
190162
}
191163

192164
// returns empty flux if there was an error communicating with Connect
193-
public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
194-
return getConnectorNames(cluster, connectName).onErrorComplete();
165+
public Mono<Map<String, ExpandedConnector>> getConnectorsWithErrorsSuppress(
166+
KafkaCluster cluster, String connectName) {
167+
return getConnectors(cluster, connectName).onErrorComplete();
195168
}
196169

197170
public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
@@ -216,8 +189,8 @@ public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectNa
216189

217190
private Mono<Boolean> connectorExists(KafkaCluster cluster, String connectName,
218191
String connectorName) {
219-
return getConnectorNames(cluster, connectName)
220-
.any(name -> name.equals(connectorName));
192+
return getConnectors(cluster, connectName)
193+
.map(m -> m.containsKey(connectorName));
221194
}
222195

223196
public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
@@ -306,8 +279,11 @@ private Mono<Void> restartTasks(KafkaCluster cluster, String connectName,
306279
return getConnectorTasks(cluster, connectName, connectorName)
307280
.filter(taskFilter)
308281
.flatMap(t ->
309-
restartConnectorTask(cluster, connectName, connectorName, t.getId().getTask()))
310-
.then();
282+
restartConnectorTask(
283+
cluster, connectName, connectorName,
284+
Optional.ofNullable(t.getId()).map(TaskIdDTO::getTask).orElseThrow()
285+
)
286+
).then();
311287
}
312288

313289
public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName, String connectorName) {
@@ -318,8 +294,9 @@ public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName,
318294
.map(kafkaConnectMapper::fromClient)
319295
.flatMap(task ->
320296
client
321-
.getConnectorTaskStatus(connectorName, task.getId().getTask())
322-
.onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
297+
.getConnectorTaskStatus(connectorName,
298+
Optional.ofNullable(task.getId()).map(TaskIdDTO::getTask).orElseThrow()
299+
).onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
323300
.map(kafkaConnectMapper::fromClient)
324301
.map(task::status)
325302
));
@@ -372,6 +349,4 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
372349
.formatted(connectorName, connectName));
373350
});
374351
}
375-
376-
record ConnectCacheKey(KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {}
377352
}

0 commit comments

Comments
 (0)