Skip to content

Commit b9e9211

Browse files
authored
Fixed Zookeeper connecion leak (#185)
1 parent ddc7845 commit b9e9211

File tree

2 files changed

+42
-14
lines changed

2 files changed

+42
-14
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.provectus.kafka.ui.model.CompatibilityLevel;
1010
import com.provectus.kafka.ui.model.NewSchemaSubject;
1111
import com.provectus.kafka.ui.model.SchemaSubject;
12+
import java.util.Formatter;
1213
import lombok.RequiredArgsConstructor;
1314
import lombok.extern.log4j.Log4j2;
1415
import org.springframework.http.HttpStatus;
@@ -51,9 +52,12 @@ public Flux<Integer> getSchemaSubjectVersions(String clusterName, String schemaN
5152
.map(cluster -> webClient.get()
5253
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
5354
.retrieve()
54-
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
55-
.bodyToFlux(Integer.class))
56-
.orElse(Flux.error(new NotFoundException("No such cluster")));
55+
.onStatus(HttpStatus.NOT_FOUND::equals,
56+
resp -> Mono.error(
57+
new NotFoundException(formatted("No such schema %s"))
58+
)
59+
).bodyToFlux(Integer.class)
60+
).orElse(Flux.error(new NotFoundException("No such cluster")));
5761
}
5862

5963
public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
@@ -70,8 +74,12 @@ private Mono<SchemaSubject> getSchemaSubject(String clusterName, String schemaNa
7074
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
7175
.retrieve()
7276
.onStatus(HttpStatus.NOT_FOUND::equals,
73-
resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
74-
.bodyToMono(SchemaSubject.class)
77+
resp -> Mono.error(
78+
new NotFoundException(
79+
formatted("No such schema %s with version %s", schemaName, version)
80+
)
81+
)
82+
).bodyToMono(SchemaSubject.class)
7583
.zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
7684
.map(tuple -> {
7785
SchemaSubject schema = tuple.getT1();
@@ -97,17 +105,27 @@ private Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, Strin
97105
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
98106
.retrieve()
99107
.onStatus(HttpStatus.NOT_FOUND::equals,
100-
resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
101-
.toBodilessEntity())
102-
.orElse(Mono.error(new NotFoundException("No such cluster")));
108+
resp -> Mono.error(
109+
new NotFoundException(
110+
formatted("No such schema %s with version %s", schemaName, version)
111+
)
112+
)
113+
).toBodilessEntity()
114+
).orElse(Mono.error(new NotFoundException("No such cluster")));
103115
}
104116

105117
public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName) {
106118
return clustersStorage.getClusterByName(clusterName)
107119
.map(cluster -> webClient.delete()
108120
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
109121
.retrieve()
110-
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
122+
.onStatus(HttpStatus.NOT_FOUND::equals,
123+
resp -> Mono.error(
124+
new NotFoundException(
125+
formatted("No such schema %s", schemaName)
126+
)
127+
)
128+
)
111129
.toBodilessEntity())
112130
.orElse(Mono.error(new NotFoundException("No such cluster")));
113131
}
@@ -120,7 +138,9 @@ public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName,
120138
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
121139
.retrieve()
122140
.onStatus(HttpStatus.NOT_FOUND::equals,
123-
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
141+
resp -> Mono.error(
142+
new NotFoundException(formatted("No such schema %s", schemaName)))
143+
)
124144
.toEntity(SchemaSubject.class)
125145
.log())
126146
.orElse(Mono.error(new NotFoundException("No such cluster")));
@@ -142,7 +162,7 @@ public Mono<Void> updateSchemaCompatibility(String clusterName, String schemaNam
142162
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
143163
.retrieve()
144164
.onStatus(HttpStatus.NOT_FOUND::equals,
145-
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
165+
resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
146166
.bodyToMono(Void.class);
147167
}).orElse(Mono.error(new NotFoundException("No such cluster")));
148168
}
@@ -181,10 +201,14 @@ public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(String cluster
181201
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
182202
.retrieve()
183203
.onStatus(HttpStatus.NOT_FOUND::equals,
184-
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
204+
resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
185205
.bodyToMono(InternalCompatibilityCheck.class)
186206
.map(mapper::toCompatibilityCheckResponse)
187207
.log()
188208
).orElse(Mono.error(new NotFoundException("No such cluster")));
189209
}
210+
211+
public String formatted(String str, Object... args) {
212+
return new Formatter().format(str, args).toString();
213+
}
190214
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.provectus.kafka.ui.zookeeper;
22

33
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
4+
import java.util.concurrent.ConcurrentHashMap;
45
import lombok.RequiredArgsConstructor;
56
import lombok.extern.log4j.Log4j2;
67
import org.I0Itec.zkclient.ZkClient;
@@ -14,7 +15,7 @@
1415
@Log4j2
1516
public class ZookeeperService {
1617

17-
private final Map<String, ZkClient> cachedZkClient = new HashMap<>();
18+
private final Map<String, ZkClient> cachedZkClient = new ConcurrentHashMap<>();
1819

1920
public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
2021
var isConnected = false;
@@ -33,7 +34,10 @@ private boolean isZkClientConnected(ZkClient zkClient) {
3334

3435
private ZkClient getOrCreateZkClient (KafkaCluster cluster) {
3536
try {
36-
return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), 1000));
37+
return cachedZkClient.computeIfAbsent(
38+
cluster.getName(),
39+
(n) -> new ZkClient(cluster.getZookeeper(), 1000)
40+
);
3741
} catch (Exception e) {
3842
log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
3943
return null;

0 commit comments

Comments
 (0)