Skip to content

Commit a27ff02

Browse files
committed
Added configs, stats and cache
1 parent e398596 commit a27ff02

File tree

13 files changed

+167
-84
lines changed

13 files changed

+167
-84
lines changed

api/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies {
2525
implementation libs.spring.starter.oauth2.client
2626
implementation libs.spring.security.oauth2.resource.server
2727
implementation libs.spring.boot.actuator
28+
2829
compileOnly libs.spring.boot.devtools
2930

3031
implementation libs.spring.security.ldap
@@ -48,6 +49,7 @@ dependencies {
4849

4950
implementation libs.jackson.databind.nullable
5051
implementation libs.cel
52+
implementation libs.caffeine
5153
antlr libs.antlr
5254
implementation libs.antlr.runtime
5355

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import jakarta.validation.Valid;
66
import jakarta.validation.constraints.NotBlank;
77
import jakarta.validation.constraints.NotNull;
8+
import java.time.Duration;
89
import java.util.ArrayList;
910
import java.util.HashMap;
1011
import java.util.HashSet;
@@ -36,6 +37,8 @@ public class ClustersProperties {
3637

3738
PollingProperties polling = new PollingProperties();
3839

40+
CacheProperties cache = new CacheProperties();
41+
3942
@Data
4043
public static class Cluster {
4144
@NotBlank(message = "field name for for cluster could not be blank")
@@ -183,6 +186,14 @@ public enum LogLevel {
183186
}
184187
}
185188

189+
@Data
190+
@NoArgsConstructor
191+
@AllArgsConstructor
192+
public static class CacheProperties {
193+
boolean enabled = true;
194+
Duration connectCacheExpiry = Duration.ofMinutes(1);
195+
}
196+
186197
@PostConstruct
187198
public void validateAndSetDefaults() {
188199
if (clusters != null) {

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,12 @@ public class KafkaConnectController extends AbstractController implements KafkaC
4545

4646
@Override
4747
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
48+
Boolean withStats,
4849
ServerWebExchange exchange) {
4950

50-
Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
51-
.filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
51+
Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(
52+
getCluster(clusterName), withStats != null ? withStats : false
53+
).filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
5254

5355
return Mono.just(ResponseEntity.ok(availableConnects));
5456
}

api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.kafbat.ui.mapper;
22

3-
import io.kafbat.ui.config.ClustersProperties;
43
import io.kafbat.ui.model.BrokerConfigDTO;
54
import io.kafbat.ui.model.BrokerDTO;
65
import io.kafbat.ui.model.BrokerDiskUsageDTO;
@@ -11,11 +10,6 @@
1110
import io.kafbat.ui.model.ClusterStatsDTO;
1211
import io.kafbat.ui.model.ConfigSourceDTO;
1312
import io.kafbat.ui.model.ConfigSynonymDTO;
14-
import io.kafbat.ui.model.ConnectDTO;
15-
import io.kafbat.ui.model.ConnectorDTO;
16-
import io.kafbat.ui.model.ConnectorStateDTO;
17-
import io.kafbat.ui.model.ConnectorStatusDTO;
18-
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
1913
import io.kafbat.ui.model.InternalBroker;
2014
import io.kafbat.ui.model.InternalBrokerConfig;
2115
import io.kafbat.ui.model.InternalBrokerDiskUsage;
@@ -31,16 +25,13 @@
3125
import io.kafbat.ui.model.Metrics;
3226
import io.kafbat.ui.model.PartitionDTO;
3327
import io.kafbat.ui.model.ReplicaDTO;
34-
import io.kafbat.ui.model.TaskDTO;
3528
import io.kafbat.ui.model.TopicConfigDTO;
3629
import io.kafbat.ui.model.TopicDTO;
3730
import io.kafbat.ui.model.TopicDetailsDTO;
3831
import io.kafbat.ui.model.TopicProducerStateDTO;
39-
import io.kafbat.ui.model.connect.InternalConnectorInfo;
4032
import io.kafbat.ui.service.metrics.RawMetric;
4133
import java.util.List;
4234
import java.util.Map;
43-
import java.util.Optional;
4435
import org.apache.kafka.clients.admin.ConfigEntry;
4536
import org.apache.kafka.clients.admin.ProducerState;
4637
import org.apache.kafka.common.acl.AccessControlEntry;
@@ -114,39 +105,6 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {
114105

115106
ReplicaDTO toReplica(InternalReplica replica);
116107

117-
default ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect, List<InternalConnectorInfo> connectors) {
118-
int connectorCount = connectors.size();
119-
int failedConnectors = 0;
120-
int tasksCount = 0;
121-
int failedTasksCount = 0;
122-
123-
for (InternalConnectorInfo connector : connectors) {
124-
Optional<ConnectorDTO> internalConnector = Optional.ofNullable(connector.getConnector());
125-
126-
failedConnectors += internalConnector
127-
.map(ConnectorDTO::getStatus)
128-
.map(ConnectorStatusDTO::getState)
129-
.filter(ConnectorStateDTO.FAILED::equals)
130-
.map(s -> 1).orElse(0);
131-
132-
tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
133-
134-
for (TaskDTO task : connector.getTasks()) {
135-
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
136-
failedTasksCount += tasksCount;
137-
}
138-
}
139-
}
140-
141-
return new ConnectDTO()
142-
.address(connect.getAddress())
143-
.name(connect.getName())
144-
.connectorsCount(connectorCount)
145-
.failedConnectorsCount(failedConnectors)
146-
.tasksCount(tasksCount)
147-
.failedTasksCount(failedTasksCount);
148-
}
149-
150108
List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<ClusterFeature> features);
151109

152110
default List<PartitionDTO> map(Map<Integer, InternalPartition> map) {

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package io.kafbat.ui.mapper;
22

3+
import io.kafbat.ui.config.ClustersProperties;
34
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
45
import io.kafbat.ui.connect.model.ConnectorTask;
56
import io.kafbat.ui.connect.model.NewConnector;
7+
import io.kafbat.ui.model.ConnectDTO;
68
import io.kafbat.ui.model.ConnectorDTO;
79
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
810
import io.kafbat.ui.model.ConnectorPluginDTO;
11+
import io.kafbat.ui.model.ConnectorStateDTO;
912
import io.kafbat.ui.model.ConnectorStatusDTO;
1013
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
1114
import io.kafbat.ui.model.FullConnectorInfoDTO;
1215
import io.kafbat.ui.model.TaskDTO;
1316
import io.kafbat.ui.model.TaskStatusDTO;
1417
import io.kafbat.ui.model.connect.InternalConnectorInfo;
1518
import java.util.List;
19+
import java.util.Optional;
1620
import org.mapstruct.Mapper;
1721
import org.mapstruct.Mapping;
1822

@@ -38,6 +42,50 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
3842
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
3943
connectorPluginConfigValidationResponse);
4044

45+
default ConnectDTO toKafkaConnect(
46+
ClustersProperties.ConnectCluster connect,
47+
List<InternalConnectorInfo> connectors,
48+
boolean withStats) {
49+
Integer connectorCount = null;
50+
Integer failedConnectors = null;
51+
Integer tasksCount = null;
52+
Integer failedTasksCount = null;
53+
54+
if (withStats) {
55+
connectorCount = connectors.size();
56+
failedConnectors = 0;
57+
tasksCount = 0;
58+
failedTasksCount = 0;
59+
60+
for (InternalConnectorInfo connector : connectors) {
61+
Optional<ConnectorDTO> internalConnector = Optional.ofNullable(connector.getConnector());
62+
63+
failedConnectors += internalConnector
64+
.map(ConnectorDTO::getStatus)
65+
.map(ConnectorStatusDTO::getState)
66+
.filter(ConnectorStateDTO.FAILED::equals)
67+
.map(s -> 1).orElse(0);
68+
69+
tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
70+
71+
for (TaskDTO task : connector.getTasks()) {
72+
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
73+
failedTasksCount += tasksCount;
74+
}
75+
}
76+
}
77+
78+
}
79+
80+
return new ConnectDTO()
81+
.address(connect.getAddress())
82+
.name(connect.getName())
83+
.connectorsCount(connectorCount)
84+
.failedConnectorsCount(failedConnectors)
85+
.tasksCount(tasksCount)
86+
.failedTasksCount(failedTasksCount);
87+
}
88+
4189
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
4290
ConnectorDTO connector = connectInfo.getConnector();
4391
List<TaskDTO> tasks = connectInfo.getTasks();

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package io.kafbat.ui.service;
22

3+
4+
import com.github.benmanes.caffeine.cache.AsyncCache;
5+
import com.github.benmanes.caffeine.cache.Caffeine;
6+
import io.kafbat.ui.config.ClustersProperties;
37
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
48
import io.kafbat.ui.connect.model.ConnectorStatus;
59
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
@@ -23,13 +27,13 @@
2327
import io.kafbat.ui.model.TaskDTO;
2428
import io.kafbat.ui.model.connect.InternalConnectorInfo;
2529
import io.kafbat.ui.util.ReactiveFailover;
30+
import jakarta.validation.Valid;
2631
import java.util.List;
2732
import java.util.Map;
2833
import java.util.Optional;
2934
import java.util.function.Predicate;
3035
import java.util.stream.Stream;
3136
import javax.annotation.Nullable;
32-
import lombok.RequiredArgsConstructor;
3337
import lombok.extern.slf4j.Slf4j;
3438
import org.apache.commons.lang3.StringUtils;
3539
import org.springframework.stereotype.Service;
@@ -39,36 +43,67 @@
3943

4044
@Service
4145
@Slf4j
42-
@RequiredArgsConstructor
4346
public class KafkaConnectService {
4447
private final ClusterMapper clusterMapper;
4548
private final KafkaConnectMapper kafkaConnectMapper;
4649
private final KafkaConfigSanitizer kafkaConfigSanitizer;
50+
private final ClustersProperties clustersProperties;
4751

48-
public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
49-
return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
50-
.map(connects -> Flux.fromIterable(connects).flatMap(connect ->
51-
getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
52-
Mono.zip(
53-
getConnector(cluster, connect.getName(), connectorName),
54-
getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
55-
).map(tuple ->
56-
InternalConnectorInfo.builder()
57-
.connector(tuple.getT1())
58-
.config(null)
59-
.tasks(tuple.getT2())
60-
.topics(null)
61-
.build()
62-
)
63-
).collectList().map(connectors ->
64-
clusterMapper.toKafkaConnect(connect, connectors)
65-
)
66-
)).orElse(Flux.fromIterable(List.of()));
52+
private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
53+
54+
public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper,
55+
KafkaConfigSanitizer kafkaConfigSanitizer,
56+
ClustersProperties clustersProperties) {
57+
this.clusterMapper = clusterMapper;
58+
this.kafkaConnectMapper = kafkaConnectMapper;
59+
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
60+
this.clustersProperties = clustersProperties;
61+
this.cachedConnectors = Caffeine.newBuilder()
62+
.expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
63+
.buildAsync();
64+
}
65+
66+
public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
67+
Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
68+
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
69+
if (withStats) {
70+
return connectClusters.map(connects -> Flux.fromIterable(connects).flatMap(connect -> (
71+
clustersProperties.getCache().isEnabled() ? Mono.fromFuture(
72+
cachedConnectors.get(new ConnectCacheKey(cluster, connect), (t, e) ->
73+
getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture())
74+
) : getConnectConnectors(cluster, connect).collectList()).map(connectors ->
75+
kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats)
76+
)
77+
)
78+
).orElse(Flux.fromIterable(List.of()));
79+
} else {
80+
return Flux.fromIterable(connectClusters.map(connects ->
81+
connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList()
82+
).orElse(List.of()));
83+
}
84+
}
85+
86+
private Flux<InternalConnectorInfo> getConnectConnectors(
87+
KafkaCluster cluster,
88+
ClustersProperties.ConnectCluster connect) {
89+
return getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
90+
Mono.zip(
91+
getConnector(cluster, connect.getName(), connectorName),
92+
getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
93+
).map(tuple ->
94+
InternalConnectorInfo.builder()
95+
.connector(tuple.getT1())
96+
.config(null)
97+
.tasks(tuple.getT2())
98+
.topics(null)
99+
.build()
100+
)
101+
);
67102
}
68103

69104
public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
70105
@Nullable final String search) {
71-
return getConnects(cluster)
106+
return getConnects(cluster, false)
72107
.flatMap(connect ->
73108
getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
74109
.flatMap(connectorName ->
@@ -302,4 +337,6 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
302337
.formatted(connectorName, connectName));
303338
});
304339
}
340+
341+
record ConnectCacheKey(KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {}
305342
}

api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class ConnectorsExporter {
2424
private final KafkaConnectService kafkaConnectService;
2525

2626
Flux<DataEntityList> export(KafkaCluster cluster) {
27-
return kafkaConnectService.getConnects(cluster)
27+
return kafkaConnectService.getConnects(cluster, false)
2828
.flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
2929
.flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
3030
.flatMap(connectorDTO ->
@@ -41,7 +41,7 @@ Flux<DataEntityList> export(KafkaCluster cluster) {
4141
}
4242

4343
Flux<DataSource> getConnectDataSources(KafkaCluster cluster) {
44-
return kafkaConnectService.getConnects(cluster)
44+
return kafkaConnectService.getConnects(cluster, false)
4545
.map(ConnectorsExporter::toDataSource);
4646
}
4747

api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -187,20 +187,7 @@ private void writeYamlToFile(String yaml, Path path) {
187187
}
188188

189189
private String serializeToYaml(PropertiesStructure props) {
190-
//representer, that skips fields with null values
191-
Representer representer = new Representer(new DumperOptions()) {
192-
@Override
193-
protected NodeTuple representJavaBeanProperty(Object javaBean,
194-
Property property,
195-
Object propertyValue,
196-
Tag customTag) {
197-
if (propertyValue == null) {
198-
return null; // if value of property is null, ignore it.
199-
} else {
200-
return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag);
201-
}
202-
}
203-
};
190+
Representer representer = new YamlNullSkipRepresenter(new DumperOptions());
204191
var propertyUtils = new PropertyUtils();
205192
propertyUtils.setBeanAccess(BeanAccess.FIELD);
206193
representer.setPropertyUtils(propertyUtils);

0 commit comments

Comments
 (0)