Skip to content

Commit df6a364

Browse files
committed
Refactoring
1 parent fd28e66 commit df6a364

22 files changed

+419
-341
lines changed

api/build.gradle

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ dependencies {
7272
// CVE Fixes
7373
implementation libs.apache.commons.compress
7474
implementation libs.okhttp3.logging.intercepter
75-
implementation libs.reactor.netty.http
76-
implementation libs.netty.codec.http2
77-
// CVE Fixes End
7875

7976
implementation libs.modelcontextprotocol.spring.webflux
8077
implementation libs.victools.jsonschema.generator

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class ClustersProperties {
4141
MetricsStorage defaultMetricsStorage = new MetricsStorage();
4242

4343
CacheProperties cache = new CacheProperties();
44-
FtsProperties fts = new FtsProperties();
44+
ClusterFtsProperties fts = new ClusterFtsProperties();
4545

4646
@Data
4747
public static class Cluster {
@@ -222,12 +222,21 @@ public static class CacheProperties {
222222
@NoArgsConstructor
223223
@AllArgsConstructor
224224
public static class FtsProperties {
225+
boolean ngram = true;
226+
int ngramMin = 1;
227+
int ngramMax = 4;
228+
}
229+
230+
@Data
231+
@NoArgsConstructor
232+
@AllArgsConstructor
233+
public static class ClusterFtsProperties {
225234
boolean enabled = false;
226-
boolean topicsNgramEnabled = false;
227-
int topicsMinNGram = 3;
228-
int topicsMaxNGram = 5;
229-
int filterMinNGram = 1;
230-
int filterMaxNGram = 4;
235+
FtsProperties topics = new FtsProperties(true, 3, 5);
236+
FtsProperties schemas = new FtsProperties(true, 1, 4);
237+
FtsProperties consumers = new FtsProperties(true, 1, 4);
238+
FtsProperties connect = new FtsProperties(true, 1, 4);
239+
FtsProperties acl = new FtsProperties(true, 1, 4);
231240
}
232241

233242
@PostConstruct

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
214214
.operationName("getSchemas")
215215
.build();
216216

217-
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
217+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
218218

219219
return schemaRegistryService
220220
.getAllSubjectNames(getCluster(clusterName))
@@ -225,9 +225,7 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
225225
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
226226
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
227227

228-
SchemasFilter filter =
229-
new SchemasFilter(subjects, fts.getFilterMinNGram(), fts.getFilterMaxNGram(), fts.isEnabled());
230-
228+
SchemasFilter filter = new SchemasFilter(subjects, fts.isEnabled(), fts.getSchemas());
231229
List<String> filteredSubjects = filter.find(search);
232230

233231
var totalPages = (filteredSubjects.size() / pageSize)

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import javax.validation.Valid;
3939
import lombok.RequiredArgsConstructor;
4040
import lombok.extern.slf4j.Slf4j;
41-
import org.apache.commons.lang3.StringUtils;
4241
import org.springframework.http.HttpStatus;
4342
import org.springframework.http.ResponseEntity;
4443
import org.springframework.web.bind.annotation.RestController;
@@ -188,19 +187,12 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
188187
.flatMap(topics -> {
189188
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
190189
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
191-
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
190+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
192191
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
193192
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
194193
? comparatorForTopic : comparatorForTopic.reversed();
195194

196-
List<InternalTopic> filtered = fts.isEnabled() ? topics : topics.stream()
197-
.filter(topic -> !topic.isInternal()
198-
|| showInternal != null && showInternal)
199-
.filter(
200-
topic -> search == null || CI.contains(topic.getName(), search)
201-
)
202-
.sorted(comparator)
203-
.toList();
195+
List<InternalTopic> filtered = topics.stream().sorted(comparator).toList();
204196

205197
var totalPages = (filtered.size() / pageSize)
206198
+ (filtered.size() % pageSize == 0 ? 0 : 1);

api/src/main/java/io/kafbat/ui/model/InternalTopic.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ public class InternalTopic {
3838
private final long segmentSize;
3939
private final long segmentCount;
4040

41+
42+
public InternalTopic withMetrics(Metrics metrics) {
43+
var builder = toBuilder();
44+
if (metrics != null) {
45+
builder.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(this.name));
46+
builder.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(this.name));
47+
}
48+
return builder.build();
49+
}
50+
4151
public static InternalTopic from(TopicDescription topicDescription,
4252
List<ConfigEntry> configs,
4353
InternalPartitionsOffsets partitionsOffsets,

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.kafbat.ui.service;
22

3-
import static org.apache.commons.lang3.Strings.CI;
4-
53
import com.google.common.collect.Streams;
64
import com.google.common.collect.Table;
75
import io.kafbat.ui.config.ClustersProperties;
@@ -27,7 +25,6 @@
2725
import java.util.stream.Stream;
2826
import javax.annotation.Nullable;
2927
import lombok.RequiredArgsConstructor;
30-
import org.apache.commons.lang3.StringUtils;
3128
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
3229
import org.apache.kafka.clients.admin.ConsumerGroupListing;
3330
import org.apache.kafka.clients.admin.OffsetSpec;
@@ -131,16 +128,9 @@ public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
131128
}
132129

133130
private Collection<ConsumerGroupListing> filterGroups(Collection<ConsumerGroupListing> groups, String search) {
134-
if (search == null || search.isBlank()) {
135-
return groups;
136-
}
137-
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
138-
if (fts.isEnabled()) {
139-
ConsumerGroupFilter filter = new ConsumerGroupFilter(groups, fts.getFilterMinNGram(), fts.getFilterMaxNGram());
140-
return filter.find(search);
141-
} else {
142-
return groups.stream().filter(g -> CI.contains(g.groupId(), search)).toList();
143-
}
131+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
132+
ConsumerGroupFilter filter = new ConsumerGroupFilter(groups, fts.isEnabled(), fts.getConsumers());
133+
return filter.find(search, false);
144134
}
145135

146136
private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdminClient ac,

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -158,21 +158,10 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
158158
}
159159

160160
private List<FullConnectorInfoDTO> filterConnectors(List<FullConnectorInfoDTO> connectors, String search) {
161-
if (search == null) {
162-
return connectors;
163-
}
164-
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
165-
if (fts.isEnabled()) {
166-
KafkaConnectNgramFilter filter =
167-
new KafkaConnectNgramFilter(connectors, fts.getFilterMinNGram(), fts.getFilterMaxNGram());
168-
return filter.find(search);
169-
} else {
170-
return connectors.stream()
171-
.filter(connector -> getStringsForSearch(connector)
172-
.anyMatch(string -> CI.contains(string, search)))
173-
.toList();
174-
}
175-
161+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
162+
KafkaConnectNgramFilter filter =
163+
new KafkaConnectNgramFilter(connectors, fts.isEnabled(), fts.getConnect());
164+
return filter.find(search);
176165
}
177166

178167
private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {

api/src/main/java/io/kafbat/ui/service/TopicsService.java

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static java.util.stream.Collectors.toList;
44
import static java.util.stream.Collectors.toMap;
5-
import static org.apache.commons.lang3.Strings.CI;
65

76
import com.google.common.collect.Sets;
87
import io.kafbat.ui.config.ClustersProperties;
@@ -471,46 +470,29 @@ public Mono<InternalTopic> cloneTopic(
471470
public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster, String search, Boolean showInternal) {
472471
Statistics stats = statisticsCache.get(cluster);
473472
ScrapedClusterState clusterState = stats.getClusterState();
474-
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
475-
Mono<List<String>> topics;
476-
477-
Map<String, TopicState> topicStates = clusterState.getTopicStates();
478-
if (fts.isEnabled() && clusterState.getTopicIndex() != null && search != null && !search.isBlank()) {
479-
try {
480-
topics = Mono.just(clusterState.getTopicIndex().find(search, showInternal, null));
481-
} catch (IOException e) {
482-
topics = Mono.error(e);
483-
}
484-
} else {
485-
topics = Mono.just(new ArrayList<>(topicStates.keySet()));
486-
}
487473

488-
return topics.flatMap(lst -> filterExisting(cluster, lst))
489-
.flatMapMany(Flux::fromIterable)
490-
.map(topicName -> InternalTopic.from(
491-
topicStates.get(topicName).description(),
492-
topicStates.get(topicName).configs(),
493-
InternalPartitionsOffsets.empty(),
494-
stats.getMetrics(),
495-
Optional.ofNullable(topicStates.get(topicName))
496-
.map(TopicState::segmentStats).orElse(null),
497-
Optional.ofNullable(topicStates.get(topicName))
498-
.map(TopicState::partitionsSegmentStats).orElse(null),
499-
clustersProperties.getInternalTopicPrefix()
500-
)).collectList();
474+
try {
475+
return Mono.just(
476+
clusterState.getTopicIndex().find(search, showInternal, null)
477+
).flatMap(lst -> filterExisting(cluster, lst)).map(lst ->
478+
lst.stream().map(t -> t.withMetrics(stats.getMetrics())).toList()
479+
);
480+
} catch (Exception e) {
481+
return Mono.error(e);
482+
}
501483
}
502484

503485
public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {
504486
return adminClientService.get(cluster)
505487
.flatMap(ac -> ac.getActiveProducersState(topic));
506488
}
507489

508-
private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
490+
private Mono<List<InternalTopic>> filterExisting(KafkaCluster cluster, Collection<InternalTopic> topics) {
509491
return adminClientService.get(cluster)
510492
.flatMap(ac -> ac.listTopics(true))
511-
.map(existing -> existing
493+
.map(existing -> topics
512494
.stream()
513-
.filter(topics::contains)
495+
.filter(s -> existing.contains(s.getName()))
514496
.collect(toList()));
515497
}
516498

api/src/main/java/io/kafbat/ui/service/acl/AclsService.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import lombok.extern.slf4j.Slf4j;
3535
import org.apache.kafka.common.acl.AccessControlEntry;
3636
import org.apache.kafka.common.acl.AclBinding;
37-
import org.apache.kafka.common.acl.AclBindingFilter;
3837
import org.apache.kafka.common.acl.AclOperation;
3938
import org.apache.kafka.common.resource.Resource;
4039
import org.apache.kafka.common.resource.ResourcePattern;
@@ -84,18 +83,9 @@ public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter fil
8483
}
8584

8685
private List<AclBinding> filter(List<AclBinding> acls, String principalSearch) {
87-
if (principalSearch == null) {
88-
return acls;
89-
}
90-
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
91-
if (fts.isEnabled()) {
92-
AclBindingNgramFilter filter = new AclBindingNgramFilter(acls, fts.getFilterMinNGram(), fts.getFilterMaxNGram());
93-
return filter.find(principalSearch);
94-
} else {
95-
return acls.stream()
96-
.filter(acl -> acl.entry().principal().contains(principalSearch))
97-
.toList();
98-
}
86+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
87+
AclBindingNgramFilter filter = new AclBindingNgramFilter(acls, fts.isEnabled(), fts.getAcl());
88+
return filter.find(principalSearch);
9989
}
10090

10191
public Mono<String> getAclAsCsvString(KafkaCluster cluster) {

api/src/main/java/io/kafbat/ui/service/index/AclBindingNgramFilter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.kafbat.ui.service.index;
22

3+
import io.kafbat.ui.config.ClustersProperties;
34
import java.util.Collection;
45
import java.util.List;
56
import org.apache.kafka.common.acl.AclBinding;
@@ -10,11 +11,14 @@ public class AclBindingNgramFilter extends NgramFilter<AclBinding> {
1011
private final List<Tuple2<List<String>, AclBinding>> bindings;
1112

1213
public AclBindingNgramFilter(Collection<AclBinding> bindings) {
13-
this(bindings, 1, 4);
14+
this(bindings, true, new ClustersProperties.FtsProperties(true, 1, 4));
1415
}
1516

16-
public AclBindingNgramFilter(Collection<AclBinding> bindings, int minNGram, int maxNGram) {
17-
super(minNGram, maxNGram);
17+
public AclBindingNgramFilter(
18+
Collection<AclBinding> bindings,
19+
boolean enabled,
20+
ClustersProperties.FtsProperties properties) {
21+
super(properties, enabled);
1822
this.bindings = bindings.stream().map(g -> Tuples.of(List.of(g.entry().principal()), g)).toList();
1923
}
2024

0 commit comments

Comments
 (0)