Skip to content

Commit bf2fce1

Browse files
committed
BE: issue-1333 optional fts
1 parent 35f6c08 commit bf2fce1

25 files changed

+141
-62
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,23 @@ public static class NgramProperties {
243243
@NoArgsConstructor
244244
@AllArgsConstructor
245245
public static class ClusterFtsProperties {
246-
boolean enabled = false;
246+
boolean enabled = true;
247+
boolean defaultEnabled = false;
247248
NgramProperties schemas = new NgramProperties(1, 4);
248249
NgramProperties consumers = new NgramProperties(1, 4);
249250
NgramProperties connect = new NgramProperties(1, 4);
250251
NgramProperties acl = new NgramProperties(1, 4);
252+
253+
public boolean use(Boolean request) {
254+
if (enabled) {
255+
if (Boolean.TRUE.equals(request)) {
256+
return true;
257+
} else if (request == null && defaultEnabled) {
258+
return true;
259+
}
260+
}
261+
return false;
262+
}
251263
}
252264

253265
@PostConstruct

api/src/main/java/io/kafbat/ui/controller/AclsController.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
@RestController
2727
@RequiredArgsConstructor
28-
public class AclsController extends AbstractController implements AclsApi, McpTool {
28+
public class AclsController extends AbstractController implements AclsApi, McpTool {
2929

3030
private final AclsService aclsService;
3131

@@ -69,6 +69,7 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
6969
String resourceName,
7070
KafkaAclNamePatternTypeDTO namePatternTypeDto,
7171
String search,
72+
Boolean fts,
7273
ServerWebExchange exchange) {
7374
AccessContext context = AccessContext.builder()
7475
.cluster(clusterName)
@@ -89,7 +90,7 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
8990
return validateAccess(context).then(
9091
Mono.just(
9192
ResponseEntity.ok(
92-
aclsService.listAcls(getCluster(clusterName), filter, search)
93+
aclsService.listAcls(getCluster(clusterName), filter, search, fts)
9394
.map(ClusterMapper::toKafkaAclDto)))
9495
).doOnEach(sig -> audit(context, sig));
9596
}

api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage
128128
String search,
129129
ConsumerGroupOrderingDTO orderBy,
130130
SortOrderDTO sortOrderDto,
131+
Boolean fts,
131132
ServerWebExchange exchange) {
132133

133134
var context = AccessContext.builder()

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
129129
String search,
130130
ConnectorColumnsToSortDTO orderBy,
131131
SortOrderDTO sortOrder,
132+
Boolean fts,
132133
ServerWebExchange exchange
133134
) {
134135
var context = AccessContext.builder()
@@ -140,7 +141,7 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
140141
? getConnectorsComparator(orderBy)
141142
: getConnectorsComparator(orderBy).reversed();
142143

143-
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
144+
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
144145
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
145146
.sort(comparator);
146147

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,13 +217,15 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
217217
@Valid String search,
218218
SchemaColumnsToSortDTO orderBy,
219219
SortOrderDTO sortOrder,
220+
Boolean fts,
220221
ServerWebExchange serverWebExchange) {
221222
var context = AccessContext.builder()
222223
.cluster(clusterName)
223224
.operationName("getSchemas")
224225
.build();
225226

226-
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
227+
ClustersProperties.ClusterFtsProperties ftsProperties = clustersProperties.getFts();
228+
boolean useFts = ftsProperties.use(fts);
227229

228230
return schemaRegistryService
229231
.getAllSubjectNames(getCluster(clusterName))
@@ -234,7 +236,7 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
234236
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
235237
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
236238

237-
SchemasFilter filter = new SchemasFilter(subjects, fts.isEnabled(), fts.getSchemas());
239+
SchemasFilter filter = new SchemasFilter(subjects, useFts, ftsProperties.getSchemas());
238240
List<String> filteredSubjects = new ArrayList<>(filter.find(search));
239241

240242
var totalPages = (filteredSubjects.size() / pageSize)

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,20 +174,22 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
174174
@Valid String search,
175175
@Valid TopicColumnsToSortDTO orderBy,
176176
@Valid SortOrderDTO sortOrder,
177+
Boolean fts,
177178
ServerWebExchange exchange) {
178179

179180
AccessContext context = AccessContext.builder()
180181
.cluster(clusterName)
181182
.operationName("getTopics")
182183
.build();
183184

184-
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal)
185+
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal, fts)
185186
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
186187
.flatMap(topics -> {
187188
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
188189
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
189-
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
190-
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
190+
ClustersProperties.ClusterFtsProperties ftsProperties = clustersProperties.getFts();
191+
boolean useFts = ftsProperties.use(fts);
192+
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, useFts);
191193
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
192194
? comparatorForTopic : comparatorForTopic.reversed();
193195

api/src/main/java/io/kafbat/ui/model/ClusterFeature.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,7 @@ public enum ClusterFeature {
88
KAFKA_ACL_VIEW,
99
KAFKA_ACL_EDIT,
1010
CLIENT_QUOTA_MANAGEMENT,
11-
GRAPHS_ENABLED
11+
GRAPHS_ENABLED,
12+
FTS_ENABLED,
13+
FTS_DEFAULT_ENABLED
1214
}

api/src/main/java/io/kafbat/ui/service/FeatureService.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static io.kafbat.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT;
44

5+
import io.kafbat.ui.config.ClustersProperties;
56
import io.kafbat.ui.model.ClusterFeature;
67
import io.kafbat.ui.model.KafkaCluster;
78
import io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription;
@@ -11,6 +12,7 @@
1112
import java.util.Optional;
1213
import java.util.Set;
1314
import java.util.function.Predicate;
15+
import lombok.RequiredArgsConstructor;
1416
import lombok.extern.slf4j.Slf4j;
1517
import org.apache.kafka.common.acl.AclOperation;
1618
import org.springframework.stereotype.Service;
@@ -19,7 +21,9 @@
1921

2022
@Service
2123
@Slf4j
24+
@RequiredArgsConstructor
2225
public class FeatureService {
26+
private final ClustersProperties clustersProperties;
2327

2428
public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient adminClient,
2529
KafkaCluster cluster,
@@ -49,6 +53,16 @@ public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient admin
4953
features.add(aclEdit(adminClient, clusterDescription));
5054
features.add(quotaManagement(adminClient));
5155

56+
if (clustersProperties.getFts() != null) {
57+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
58+
if (fts.isEnabled()) {
59+
features.add(Mono.just(ClusterFeature.FTS_ENABLED));
60+
if (fts.isDefaultEnabled()) {
61+
features.add(Mono.just(ClusterFeature.FTS_DEFAULT_ENABLED));
62+
}
63+
}
64+
}
65+
5266
return Flux.fromIterable(features).flatMap(m -> m).collectList();
5367
}
5468

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.kafbat.ui.service;
22

3-
import static org.apache.commons.lang3.Strings.CI;
4-
53
import com.github.benmanes.caffeine.cache.AsyncCache;
64
import com.github.benmanes.caffeine.cache.Caffeine;
75
import io.kafbat.ui.config.ClustersProperties;
@@ -134,7 +132,7 @@ private Flux<InternalConnectorInfo> getConnectConnectors(
134132
}
135133

136134
public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
137-
@Nullable final String search) {
135+
@Nullable final String search, Boolean fts) {
138136
return getConnects(cluster, false)
139137
.flatMap(connect ->
140138
getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
@@ -153,14 +151,17 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
153151
.build())))
154152
.map(kafkaConnectMapper::fullConnectorInfo)
155153
.collectList()
156-
.map(lst -> filterConnectors(lst, search))
154+
.map(lst -> filterConnectors(lst, search, fts))
157155
.flatMapMany(Flux::fromIterable);
158156
}
159157

160-
private List<FullConnectorInfoDTO> filterConnectors(List<FullConnectorInfoDTO> connectors, String search) {
161-
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
158+
private List<FullConnectorInfoDTO> filterConnectors(
159+
List<FullConnectorInfoDTO> connectors,
160+
String search,
161+
Boolean fts) {
162+
boolean useFts = clustersProperties.getFts().use(fts);
162163
KafkaConnectNgramFilter filter =
163-
new KafkaConnectNgramFilter(connectors, fts.isEnabled(), fts.getConnect());
164+
new KafkaConnectNgramFilter(connectors, useFts, clustersProperties.getFts().getConnect());
164165
return filter.find(search);
165166
}
166167

api/src/main/java/io/kafbat/ui/service/TopicsService.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import io.kafbat.ui.model.TopicUpdateDTO;
2727
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
2828
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState.TopicState;
29-
import java.io.IOException;
3029
import java.time.Duration;
3130
import java.util.ArrayList;
3231
import java.util.Collection;
@@ -49,7 +48,6 @@
4948
import org.apache.kafka.common.errors.TopicExistsException;
5049
import org.springframework.beans.factory.annotation.Value;
5150
import org.springframework.stereotype.Service;
52-
import reactor.core.publisher.Flux;
5351
import reactor.core.publisher.Mono;
5452
import reactor.util.retry.Retry;
5553

@@ -467,13 +465,14 @@ public Mono<InternalTopic> cloneTopic(
467465
);
468466
}
469467

470-
public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster, String search, Boolean showInternal) {
468+
public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster, String search, Boolean showInternal,
469+
Boolean fts) {
471470
Statistics stats = statisticsCache.get(cluster);
472471
ScrapedClusterState clusterState = stats.getClusterState();
473-
472+
boolean useFts = clustersProperties.getFts().use(fts);
474473
try {
475474
return Mono.just(
476-
clusterState.getTopicIndex().find(search, showInternal, null)
475+
clusterState.getTopicIndex().find(search, showInternal, useFts, null)
477476
).flatMap(lst -> filterExisting(cluster, lst)).map(lst ->
478477
lst.stream().map(t -> t.withMetrics(stats.getMetrics())).toList()
479478
);

0 commit comments

Comments
 (0)