Skip to content

Commit c9a6d94

Browse files
committed
fts
1 parent 153c44d commit c9a6d94

25 files changed

+402
-127
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public static class CacheProperties {
222222
@NoArgsConstructor
223223
@AllArgsConstructor
224224
public static class FtsProperties {
225-
boolean enabled = true;
225+
boolean enabled = false;
226226
int topicsMinNGram = 3;
227227
int topicsMaxNGram = 5;
228228
int filterMinNGram = 1;

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package io.kafbat.ui.controller;
22

3-
import static org.apache.commons.lang3.Strings.CI;
4-
53
import io.kafbat.ui.api.SchemasApi;
4+
import io.kafbat.ui.config.ClustersProperties;
65
import io.kafbat.ui.exception.ValidationException;
76
import io.kafbat.ui.mapper.KafkaSrMapper;
87
import io.kafbat.ui.mapper.KafkaSrMapperImpl;
@@ -15,13 +14,13 @@
1514
import io.kafbat.ui.model.rbac.AccessContext;
1615
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1716
import io.kafbat.ui.service.SchemaRegistryService;
17+
import io.kafbat.ui.service.index.SchemasFilter;
1818
import io.kafbat.ui.service.mcp.McpTool;
1919
import java.util.List;
2020
import java.util.Map;
2121
import javax.validation.Valid;
2222
import lombok.RequiredArgsConstructor;
2323
import lombok.extern.slf4j.Slf4j;
24-
import org.apache.commons.lang3.StringUtils;
2524
import org.springframework.http.ResponseEntity;
2625
import org.springframework.web.bind.annotation.RestController;
2726
import org.springframework.web.server.ServerWebExchange;
@@ -38,6 +37,7 @@ public class SchemasController extends AbstractController implements SchemasApi,
3837
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
3938

4039
private final SchemaRegistryService schemaRegistryService;
40+
private final ClustersProperties clustersProperties;
4141

4242
@Override
4343
protected KafkaCluster getCluster(String clusterName) {
@@ -214,6 +214,8 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
214214
.operationName("getSchemas")
215215
.build();
216216

217+
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
218+
217219
return schemaRegistryService
218220
.getAllSubjectNames(getCluster(clusterName))
219221
.flatMapIterable(l -> l)
@@ -222,10 +224,12 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
222224
.flatMap(subjects -> {
223225
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
224226
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
225-
List<String> filteredSubjects = subjects
226-
.stream()
227-
.filter(subj -> search == null || CI.contains(subj, search))
228-
.sorted().toList();
227+
228+
SchemasFilter filter =
229+
new SchemasFilter(subjects, fts.getFilterMinNGram(), fts.getFilterMaxNGram(), fts.isEnabled());
230+
231+
List<String> filteredSubjects = filter.find(search);
232+
229233
var totalPages = (filteredSubjects.size() / pageSize)
230234
+ (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
231235
List<String> subjectsToRender = filteredSubjects.stream()

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.apache.commons.lang3.Strings.CI;
1111

1212
import io.kafbat.ui.api.TopicsApi;
13+
import io.kafbat.ui.config.ClustersProperties;
1314
import io.kafbat.ui.mapper.ClusterMapper;
1415
import io.kafbat.ui.model.InternalTopic;
1516
import io.kafbat.ui.model.InternalTopicConfig;
@@ -55,6 +56,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
5556
private final TopicsService topicsService;
5657
private final TopicAnalysisService topicAnalysisService;
5758
private final ClusterMapper clusterMapper;
59+
private final ClustersProperties clustersProperties;
5860

5961
@Override
6062
public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -181,23 +183,30 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
181183
.operationName("getTopics")
182184
.build();
183185

184-
return topicsService.getTopicsForPagination(getCluster(clusterName))
186+
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal)
185187
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
186188
.flatMap(topics -> {
187189
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
188190
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
191+
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
192+
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
189193
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
190-
? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
191-
List<InternalTopic> filtered = topics.stream()
194+
? comparatorForTopic : comparatorForTopic.reversed();
195+
196+
List<InternalTopic> filtered = fts.isEnabled() ? topics : topics.stream()
192197
.filter(topic -> !topic.isInternal()
193198
|| showInternal != null && showInternal)
194-
.filter(topic -> search == null || CI.contains(topic.getName(), search))
199+
.filter(
200+
topic -> search == null || CI.contains(topic.getName(), search)
201+
)
195202
.sorted(comparator)
196203
.toList();
204+
197205
var totalPages = (filtered.size() / pageSize)
198206
+ (filtered.size() % pageSize == 0 ? 0 : 1);
199207

200208
List<String> topicsPage = filtered.stream()
209+
.filter(t -> !t.isInternal() || showInternal != null && showInternal)
201210
.skip(topicsToSkip)
202211
.limit(pageSize)
203212
.map(InternalTopic::getName)
@@ -348,9 +357,12 @@ public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates
348357
}
349358

350359
private Comparator<InternalTopic> getComparatorForTopic(
351-
TopicColumnsToSortDTO orderBy) {
360+
TopicColumnsToSortDTO orderBy,
361+
boolean ftsEnabled) {
352362
var defaultComparator = Comparator.comparing(InternalTopic::getName);
353-
if (orderBy == null) {
363+
if (orderBy == null && ftsEnabled) {
364+
return (o1, o2) -> 0;
365+
} else if (orderBy == null) {
354366
return defaultComparator;
355367
}
356368
return switch (orderBy) {

api/src/main/java/io/kafbat/ui/model/InternalTopic.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,10 @@ public static InternalTopic from(TopicDescription topicDescription,
113113
topic.segmentSize(stats.getSegmentSize());
114114
});
115115

116-
topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
117-
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
116+
if (metrics != null) {
117+
topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
118+
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
119+
}
118120

119121
topic.topicConfigs(
120122
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
import com.google.common.collect.Streams;
66
import com.google.common.collect.Table;
7+
import io.kafbat.ui.config.ClustersProperties;
78
import io.kafbat.ui.emitter.EnhancedConsumer;
89
import io.kafbat.ui.model.ConsumerGroupOrderingDTO;
910
import io.kafbat.ui.model.InternalConsumerGroup;
1011
import io.kafbat.ui.model.InternalTopicConsumerGroup;
1112
import io.kafbat.ui.model.KafkaCluster;
1213
import io.kafbat.ui.model.SortOrderDTO;
14+
import io.kafbat.ui.service.index.ConsumerGroupFilter;
1315
import io.kafbat.ui.service.rbac.AccessControlService;
1416
import io.kafbat.ui.util.ApplicationMetrics;
1517
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
@@ -41,6 +43,7 @@ public class ConsumerGroupService {
4143

4244
private final AdminClientService adminClientService;
4345
private final AccessControlService accessControlService;
46+
private final ClustersProperties clustersProperties;
4447

4548
private Mono<List<InternalConsumerGroup>> getConsumerGroups(
4649
ReactiveAdminClient ac,
@@ -114,11 +117,7 @@ public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
114117
SortOrderDTO sortOrderDto) {
115118
return adminClientService.get(cluster).flatMap(ac ->
116119
ac.listConsumerGroups()
117-
.map(listing -> search == null
118-
? listing
119-
: listing.stream()
120-
.filter(g -> CI.contains(g.groupId(), search))
121-
.toList()
120+
.map(listing -> filterGroups(listing, search)
122121
)
123122
.flatMapIterable(lst -> lst)
124123
.filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName()))
@@ -131,6 +130,19 @@ public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
131130
(allGroups.size() / perPage) + (allGroups.size() % perPage == 0 ? 0 : 1))))));
132131
}
133132

133+
private Collection<ConsumerGroupListing> filterGroups(Collection<ConsumerGroupListing> groups, String search) {
134+
if (search == null || search.isBlank()) {
135+
return groups;
136+
}
137+
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
138+
if (fts.isEnabled()) {
139+
ConsumerGroupFilter filter = new ConsumerGroupFilter(groups, fts.getFilterMinNGram(), fts.getFilterMaxNGram());
140+
return filter.find(search);
141+
} else {
142+
return groups.stream().filter(g -> CI.contains(g.groupId(), search)).toList();
143+
}
144+
}
145+
134146
private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdminClient ac,
135147
List<ConsumerGroupListing> groups,
136148
int pageNum,

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.kafbat.ui.model.NewConnectorDTO;
2828
import io.kafbat.ui.model.TaskDTO;
2929
import io.kafbat.ui.model.connect.InternalConnectorInfo;
30+
import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
3031
import io.kafbat.ui.util.ReactiveFailover;
3132
import jakarta.validation.Valid;
3233
import java.util.List;
@@ -151,15 +152,27 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
151152
.topics(tuple.getT4().getTopics())
152153
.build())))
153154
.map(kafkaConnectMapper::fullConnectorInfo)
154-
.filter(matchesSearchTerm(search));
155+
.collectList()
156+
.map(lst -> filterConnectors(lst, search))
157+
.flatMapMany(Flux::fromIterable);
155158
}
156159

157-
private Predicate<FullConnectorInfoDTO> matchesSearchTerm(@Nullable final String search) {
160+
private List<FullConnectorInfoDTO> filterConnectors(List<FullConnectorInfoDTO> connectors, String search) {
158161
if (search == null) {
159-
return c -> true;
162+
return connectors;
163+
}
164+
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
165+
if (fts.isEnabled()) {
166+
KafkaConnectNgramFilter filter =
167+
new KafkaConnectNgramFilter(connectors, fts.getFilterMinNGram(), fts.getFilterMaxNGram());
168+
return filter.find(search);
169+
} else {
170+
return connectors.stream()
171+
.filter(connector -> getStringsForSearch(connector)
172+
.anyMatch(string -> CI.contains(string, search)))
173+
.toList();
160174
}
161-
return connector -> getStringsForSearch(connector)
162-
.anyMatch(string -> CI.contains(string, search));
175+
163176
}
164177

165178
private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {

api/src/main/java/io/kafbat/ui/service/StatisticsCache.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.kafbat.ui.service;
22

3+
import io.kafbat.ui.config.ClustersProperties;
34
import io.kafbat.ui.model.InternalPartitionsOffsets;
45
import io.kafbat.ui.model.KafkaCluster;
56
import io.kafbat.ui.model.ServerStatusDTO;
@@ -31,11 +32,14 @@ public synchronized void replace(KafkaCluster c, Statistics stats) {
3132
public synchronized void update(KafkaCluster c,
3233
Map<String, TopicDescription> descriptions,
3334
Map<String, List<ConfigEntry>> configs,
34-
InternalPartitionsOffsets partitionsOffsets) {
35+
InternalPartitionsOffsets partitionsOffsets,
36+
ClustersProperties clustersProperties) {
3537
var stats = get(c);
3638
replace(
3739
c,
38-
stats.withClusterState(s -> s.updateTopics(descriptions, configs, partitionsOffsets))
40+
stats.withClusterState(s ->
41+
s.updateTopics(descriptions, configs, partitionsOffsets, clustersProperties)
42+
)
3943
);
4044
try {
4145
if (!stats.getStatus().equals(ServerStatusDTO.INITIALIZING)) {

api/src/main/java/io/kafbat/ui/service/StatisticsService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription;
44

5+
import io.kafbat.ui.config.ClustersProperties;
56
import io.kafbat.ui.model.ClusterFeature;
67
import io.kafbat.ui.model.KafkaCluster;
78
import io.kafbat.ui.model.Metrics;
@@ -22,6 +23,7 @@ public class StatisticsService {
2223
private final AdminClientService adminClientService;
2324
private final FeatureService featureService;
2425
private final StatisticsCache cache;
26+
private final ClustersProperties clustersProperties;
2527

2628
public Mono<Statistics> updateCache(KafkaCluster c) {
2729
return getStatistics(c).doOnSuccess(m -> cache.replace(c, m));
@@ -62,7 +64,7 @@ private Statistics createStats(ClusterDescription description,
6264

6365
private Mono<ScrapedClusterState> loadClusterState(ClusterDescription clusterDescription,
6466
ReactiveAdminClient ac) {
65-
return ScrapedClusterState.scrape(clusterDescription, ac);
67+
return ScrapedClusterState.scrape(clusterDescription, ac, clustersProperties);
6668
}
6769

6870
private Mono<Metrics> scrapeMetrics(KafkaCluster cluster,

api/src/main/java/io/kafbat/ui/service/TopicsService.java

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static java.util.stream.Collectors.toList;
44
import static java.util.stream.Collectors.toMap;
5+
import static org.apache.commons.lang3.Strings.CI;
56

67
import com.google.common.collect.Sets;
78
import io.kafbat.ui.config.ClustersProperties;
@@ -26,6 +27,7 @@
2627
import io.kafbat.ui.model.TopicUpdateDTO;
2728
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
2829
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState.TopicState;
30+
import java.io.IOException;
2931
import java.time.Duration;
3032
import java.util.ArrayList;
3133
import java.util.Collection;
@@ -48,6 +50,7 @@
4850
import org.apache.kafka.common.errors.TopicExistsException;
4951
import org.springframework.beans.factory.annotation.Value;
5052
import org.springframework.stereotype.Service;
53+
import reactor.core.publisher.Flux;
5154
import reactor.core.publisher.Mono;
5255
import reactor.util.retry.Retry;
5356

@@ -76,7 +79,7 @@ public Mono<List<InternalTopic>> loadTopics(KafkaCluster c, List<String> topics)
7679
ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics, false),
7780
(descriptions, configs) ->
7881
getPartitionOffsets(descriptions, ac).map(offsets -> {
79-
statisticsCache.update(c, descriptions, configs, offsets);
82+
statisticsCache.update(c, descriptions, configs, offsets, clustersProperties);
8083
var stats = statisticsCache.get(c);
8184
return createList(
8285
topics,
@@ -465,23 +468,36 @@ public Mono<InternalTopic> cloneTopic(
465468
);
466469
}
467470

468-
public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster) {
471+
public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster, String search, Boolean showInternal) {
469472
Statistics stats = statisticsCache.get(cluster);
470-
Map<String, TopicState> topicStates = stats.getClusterState().getTopicStates();
471-
return filterExisting(cluster, topicStates.keySet())
472-
.map(lst -> lst.stream()
473-
.map(topicName ->
474-
InternalTopic.from(
475-
topicStates.get(topicName).description(),
476-
topicStates.get(topicName).configs(),
477-
InternalPartitionsOffsets.empty(),
478-
stats.getMetrics(),
479-
Optional.ofNullable(topicStates.get(topicName))
480-
.map(TopicState::segmentStats).orElse(null),
481-
Optional.ofNullable(topicStates.get(topicName))
482-
.map(TopicState::partitionsSegmentStats).orElse(null),
483-
clustersProperties.getInternalTopicPrefix()
484-
)).collect(toList()));
473+
ScrapedClusterState clusterState = stats.getClusterState();
474+
ClustersProperties.FtsProperties fts = clustersProperties.getFts();
475+
Mono<List<String>> topics;
476+
477+
Map<String, TopicState> topicStates = clusterState.getTopicStates();
478+
if (fts.isEnabled() && clusterState.getTopicIndex() != null && search != null && !search.isBlank()) {
479+
try {
480+
topics = Mono.just(clusterState.getTopicIndex().find(search, showInternal, null));
481+
} catch (IOException e) {
482+
topics = Mono.error(e);
483+
}
484+
} else {
485+
topics = Mono.just(new ArrayList<>(topicStates.keySet()));
486+
}
487+
488+
return topics.flatMap(lst -> filterExisting(cluster, lst))
489+
.flatMapMany(Flux::fromIterable)
490+
.map(topicName -> InternalTopic.from(
491+
topicStates.get(topicName).description(),
492+
topicStates.get(topicName).configs(),
493+
InternalPartitionsOffsets.empty(),
494+
stats.getMetrics(),
495+
Optional.ofNullable(topicStates.get(topicName))
496+
.map(TopicState::segmentStats).orElse(null),
497+
Optional.ofNullable(topicStates.get(topicName))
498+
.map(TopicState::partitionsSegmentStats).orElse(null),
499+
clustersProperties.getInternalTopicPrefix()
500+
)).collectList();
485501
}
486502

487503
public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {

0 commit comments

Comments
 (0)