Skip to content

Commit bf40f50

Browse files
authored
Merge branch 'main' into issues/1324
2 parents 7b4f2f5 + b491951 commit bf40f50

38 files changed

+1205
-98
lines changed

.github/workflows/e2e-playwright-run.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ jobs:
3232
uses: actions/setup-node@v3
3333
with:
3434
node-version: 18
35+
cache-dependency-path: ./e2e-playwright/package-lock.json
3536
cache: 'npm'
3637

3738
- name: Install NPM dependencies

api/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ dependencies {
5454
antlr libs.antlr
5555
implementation libs.antlr.runtime
5656

57+
implementation libs.lucene
58+
implementation libs.lucene.queryparser
59+
implementation libs.lucene.analysis.common
60+
5761
implementation libs.opendatadiscovery.oddrn
5862
implementation(libs.opendatadiscovery.client) {
5963
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux'
@@ -74,6 +78,7 @@ dependencies {
7478
// END Fixes https://www.cve.org/CVERecord?id=CVE-2025-58056 and https://www.cve.org/CVERecord?id=CVE-2025-58057
7579
// CVE Fixes End
7680

81+
7782
implementation libs.modelcontextprotocol.spring.webflux
7883
implementation libs.victools.jsonschema.generator
7984

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ClustersProperties {
4141
MetricsStorage defaultMetricsStorage = new MetricsStorage();
4242

4343
CacheProperties cache = new CacheProperties();
44+
ClusterFtsProperties fts = new ClusterFtsProperties();
4445

4546
@Data
4647
public static class Cluster {
@@ -217,6 +218,25 @@ public static class CacheProperties {
217218
Duration connectClusterCacheExpiry = Duration.ofHours(24);
218219
}
219220

221+
@Data
222+
@NoArgsConstructor
223+
@AllArgsConstructor
224+
public static class NgramProperties {
225+
int ngramMin = 1;
226+
int ngramMax = 4;
227+
}
228+
229+
@Data
230+
@NoArgsConstructor
231+
@AllArgsConstructor
232+
public static class ClusterFtsProperties {
233+
boolean enabled = false;
234+
NgramProperties schemas = new NgramProperties(1, 4);
235+
NgramProperties consumers = new NgramProperties(1, 4);
236+
NgramProperties connect = new NgramProperties(1, 4);
237+
NgramProperties acl = new NgramProperties(1, 4);
238+
}
239+
220240
@PostConstruct
221241
public void validateAndSetDefaults() {
222242
if (clusters != null) {

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,32 @@
11
package io.kafbat.ui.controller;
22

3-
import static org.apache.commons.lang3.Strings.CI;
4-
53
import io.kafbat.ui.api.SchemasApi;
4+
import io.kafbat.ui.api.model.SchemaColumnsToSort;
5+
import io.kafbat.ui.config.ClustersProperties;
66
import io.kafbat.ui.exception.ValidationException;
77
import io.kafbat.ui.mapper.KafkaSrMapper;
88
import io.kafbat.ui.mapper.KafkaSrMapperImpl;
99
import io.kafbat.ui.model.CompatibilityCheckResponseDTO;
1010
import io.kafbat.ui.model.CompatibilityLevelDTO;
11+
import io.kafbat.ui.model.InternalTopic;
1112
import io.kafbat.ui.model.KafkaCluster;
1213
import io.kafbat.ui.model.NewSchemaSubjectDTO;
14+
import io.kafbat.ui.model.SchemaColumnsToSortDTO;
1315
import io.kafbat.ui.model.SchemaSubjectDTO;
1416
import io.kafbat.ui.model.SchemaSubjectsResponseDTO;
17+
import io.kafbat.ui.model.SortOrderDTO;
1518
import io.kafbat.ui.model.rbac.AccessContext;
1619
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1720
import io.kafbat.ui.service.SchemaRegistryService;
21+
import io.kafbat.ui.service.SchemaRegistryService.SubjectWithCompatibilityLevel;
22+
import io.kafbat.ui.service.index.SchemasFilter;
1823
import io.kafbat.ui.service.mcp.McpTool;
24+
import java.util.Comparator;
1925
import java.util.List;
2026
import java.util.Map;
2127
import javax.validation.Valid;
2228
import lombok.RequiredArgsConstructor;
2329
import lombok.extern.slf4j.Slf4j;
24-
import org.apache.commons.lang3.StringUtils;
2530
import org.springframework.http.ResponseEntity;
2631
import org.springframework.web.bind.annotation.RestController;
2732
import org.springframework.web.server.ServerWebExchange;
@@ -38,6 +43,7 @@ public class SchemasController extends AbstractController implements SchemasApi,
3843
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
3944

4045
private final SchemaRegistryService schemaRegistryService;
46+
private final ClustersProperties clustersProperties;
4147

4248
@Override
4349
protected KafkaCluster getCluster(String clusterName) {
@@ -208,12 +214,16 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
208214
@Valid Integer pageNum,
209215
@Valid Integer perPage,
210216
@Valid String search,
217+
SchemaColumnsToSortDTO orderBy,
218+
SortOrderDTO sortOrder,
211219
ServerWebExchange serverWebExchange) {
212220
var context = AccessContext.builder()
213221
.cluster(clusterName)
214222
.operationName("getSchemas")
215223
.build();
216224

225+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
226+
217227
return schemaRegistryService
218228
.getAllSubjectNames(getCluster(clusterName))
219229
.flatMapIterable(l -> l)
@@ -222,23 +232,72 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
222232
.flatMap(subjects -> {
223233
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
224234
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
225-
List<String> filteredSubjects = subjects
226-
.stream()
227-
.filter(subj -> search == null || CI.contains(subj, search))
228-
.sorted().toList();
235+
236+
SchemasFilter filter = new SchemasFilter(subjects, fts.isEnabled(), fts.getSchemas());
237+
List<String> filteredSubjects = filter.find(search);
238+
229239
var totalPages = (filteredSubjects.size() / pageSize)
230240
+ (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
231-
List<String> subjectsToRender = filteredSubjects.stream()
232-
.skip(subjectToSkip)
233-
.limit(pageSize)
234-
.toList();
235-
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
236-
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
241+
242+
List<String> subjectsToRetrieve;
243+
boolean paginate = true;
244+
var schemaComparator = getComparatorForSchema(orderBy);
245+
final Comparator<SubjectWithCompatibilityLevel> comparator =
246+
sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
247+
? schemaComparator : schemaComparator.reversed();
248+
if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) {
249+
if (SortOrderDTO.DESC.equals(sortOrder)) {
250+
filteredSubjects.sort(Comparator.reverseOrder());
251+
}
252+
subjectsToRetrieve = filteredSubjects.stream()
253+
.skip(subjectToSkip)
254+
.limit(pageSize)
255+
.toList();
256+
paginate = false;
257+
} else {
258+
subjectsToRetrieve = filteredSubjects;
259+
}
260+
261+
final boolean shouldPaginate = paginate;
262+
263+
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRetrieve, pageSize)
264+
.map(subjs ->
265+
paginateSchemas(subjs, comparator, shouldPaginate, pageSize, subjectToSkip)
266+
).map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
237267
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
238268
}).map(ResponseEntity::ok)
239269
.doOnEach(sig -> audit(context, sig));
240270
}
241271

272+
private List<SubjectWithCompatibilityLevel> paginateSchemas(
273+
List<SubjectWithCompatibilityLevel> subjects,
274+
Comparator<SubjectWithCompatibilityLevel> comparator,
275+
boolean paginate,
276+
int pageSize,
277+
int subjectToSkip) {
278+
subjects.sort(comparator);
279+
if (paginate) {
280+
return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size()));
281+
} else {
282+
return subjects;
283+
}
284+
}
285+
286+
private Comparator<SubjectWithCompatibilityLevel> getComparatorForSchema(
287+
@Valid SchemaColumnsToSortDTO orderBy) {
288+
var defaultComparator = Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
289+
if (orderBy == null) {
290+
return defaultComparator;
291+
}
292+
return switch (orderBy) {
293+
case SUBJECT -> Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
294+
case ID -> Comparator.comparing(SubjectWithCompatibilityLevel::getId);
295+
case TYPE -> Comparator.comparing(SubjectWithCompatibilityLevel::getSchemaType);
296+
case COMPATIBILITY -> Comparator.comparing(SubjectWithCompatibilityLevel::getCompatibility);
297+
default -> defaultComparator;
298+
};
299+
}
300+
242301
@Override
243302
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
244303
String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.apache.commons.lang3.Strings.CI;
1111

1212
import io.kafbat.ui.api.TopicsApi;
13+
import io.kafbat.ui.config.ClustersProperties;
1314
import io.kafbat.ui.mapper.ClusterMapper;
1415
import io.kafbat.ui.model.InternalTopic;
1516
import io.kafbat.ui.model.InternalTopicConfig;
@@ -37,7 +38,6 @@
3738
import javax.validation.Valid;
3839
import lombok.RequiredArgsConstructor;
3940
import lombok.extern.slf4j.Slf4j;
40-
import org.apache.commons.lang3.StringUtils;
4141
import org.springframework.http.HttpStatus;
4242
import org.springframework.http.ResponseEntity;
4343
import org.springframework.web.bind.annotation.RestController;
@@ -55,6 +55,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
5555
private final TopicsService topicsService;
5656
private final TopicAnalysisService topicAnalysisService;
5757
private final ClusterMapper clusterMapper;
58+
private final ClustersProperties clustersProperties;
5859

5960
@Override
6061
public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -181,23 +182,23 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
181182
.operationName("getTopics")
182183
.build();
183184

184-
return topicsService.getTopicsForPagination(getCluster(clusterName))
185+
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal)
185186
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
186187
.flatMap(topics -> {
187188
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
188189
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
190+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
191+
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
189192
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
190-
? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
191-
List<InternalTopic> filtered = topics.stream()
192-
.filter(topic -> !topic.isInternal()
193-
|| showInternal != null && showInternal)
194-
.filter(topic -> search == null || CI.contains(topic.getName(), search))
195-
.sorted(comparator)
196-
.toList();
193+
? comparatorForTopic : comparatorForTopic.reversed();
194+
195+
List<InternalTopic> filtered = topics.stream().sorted(comparator).toList();
196+
197197
var totalPages = (filtered.size() / pageSize)
198198
+ (filtered.size() % pageSize == 0 ? 0 : 1);
199199

200200
List<String> topicsPage = filtered.stream()
201+
.filter(t -> !t.isInternal() || showInternal != null && showInternal)
201202
.skip(topicsToSkip)
202203
.limit(pageSize)
203204
.map(InternalTopic::getName)
@@ -348,9 +349,12 @@ public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates
348349
}
349350

350351
private Comparator<InternalTopic> getComparatorForTopic(
351-
TopicColumnsToSortDTO orderBy) {
352+
TopicColumnsToSortDTO orderBy,
353+
boolean ftsEnabled) {
352354
var defaultComparator = Comparator.comparing(InternalTopic::getName);
353-
if (orderBy == null) {
355+
if (orderBy == null && ftsEnabled) {
356+
return (o1, o2) -> 0;
357+
} else if (orderBy == null) {
354358
return defaultComparator;
355359
}
356360
return switch (orderBy) {

api/src/main/java/io/kafbat/ui/model/InternalTopic.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ public class InternalTopic {
3838
private final long segmentSize;
3939
private final long segmentCount;
4040

41+
42+
public InternalTopic withMetrics(Metrics metrics) {
43+
var builder = toBuilder();
44+
if (metrics != null) {
45+
builder.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(this.name));
46+
builder.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(this.name));
47+
}
48+
return builder.build();
49+
}
50+
4151
public static InternalTopic from(TopicDescription topicDescription,
4252
List<ConfigEntry> configs,
4353
InternalPartitionsOffsets partitionsOffsets,
@@ -113,8 +123,10 @@ public static InternalTopic from(TopicDescription topicDescription,
113123
topic.segmentSize(stats.getSegmentSize());
114124
});
115125

116-
topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
117-
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
126+
if (metrics != null) {
127+
topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
128+
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
129+
}
118130

119131
topic.topicConfigs(
120132
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));

api/src/main/java/io/kafbat/ui/model/Statistics.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
@Value
1313
@Builder(toBuilder = true)
14-
public class Statistics {
14+
public class Statistics implements AutoCloseable {
1515
ServerStatusDTO status;
1616
Throwable lastKafkaException;
1717
String version;
@@ -46,4 +46,11 @@ public Stream<TopicDescription> topicDescriptions() {
4646
public Statistics withClusterState(UnaryOperator<ScrapedClusterState> stateUpdate) {
4747
return toBuilder().clusterState(stateUpdate.apply(clusterState)).build();
4848
}
49+
50+
@Override
51+
public void close() throws Exception {
52+
if (clusterState != null) {
53+
clusterState.close();
54+
}
55+
}
4956
}

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package io.kafbat.ui.service;
22

3-
import static org.apache.commons.lang3.Strings.CI;
4-
53
import com.google.common.collect.Streams;
64
import com.google.common.collect.Table;
5+
import io.kafbat.ui.config.ClustersProperties;
76
import io.kafbat.ui.emitter.EnhancedConsumer;
87
import io.kafbat.ui.model.ConsumerGroupOrderingDTO;
98
import io.kafbat.ui.model.InternalConsumerGroup;
109
import io.kafbat.ui.model.InternalTopicConsumerGroup;
1110
import io.kafbat.ui.model.KafkaCluster;
1211
import io.kafbat.ui.model.SortOrderDTO;
12+
import io.kafbat.ui.service.index.ConsumerGroupFilter;
1313
import io.kafbat.ui.service.rbac.AccessControlService;
1414
import io.kafbat.ui.util.ApplicationMetrics;
1515
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
@@ -25,7 +25,6 @@
2525
import java.util.stream.Stream;
2626
import javax.annotation.Nullable;
2727
import lombok.RequiredArgsConstructor;
28-
import org.apache.commons.lang3.StringUtils;
2928
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
3029
import org.apache.kafka.clients.admin.ConsumerGroupListing;
3130
import org.apache.kafka.clients.admin.OffsetSpec;
@@ -41,6 +40,7 @@ public class ConsumerGroupService {
4140

4241
private final AdminClientService adminClientService;
4342
private final AccessControlService accessControlService;
43+
private final ClustersProperties clustersProperties;
4444

4545
private Mono<List<InternalConsumerGroup>> getConsumerGroups(
4646
ReactiveAdminClient ac,
@@ -114,11 +114,7 @@ public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
114114
SortOrderDTO sortOrderDto) {
115115
return adminClientService.get(cluster).flatMap(ac ->
116116
ac.listConsumerGroups()
117-
.map(listing -> search == null
118-
? listing
119-
: listing.stream()
120-
.filter(g -> CI.contains(g.groupId(), search))
121-
.toList()
117+
.map(listing -> filterGroups(listing, search)
122118
)
123119
.flatMapIterable(lst -> lst)
124120
.filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName()))
@@ -131,6 +127,12 @@ public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
131127
(allGroups.size() / perPage) + (allGroups.size() % perPage == 0 ? 0 : 1))))));
132128
}
133129

130+
private Collection<ConsumerGroupListing> filterGroups(Collection<ConsumerGroupListing> groups, String search) {
131+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
132+
ConsumerGroupFilter filter = new ConsumerGroupFilter(groups, fts.isEnabled(), fts.getConsumers());
133+
return filter.find(search, false);
134+
}
135+
134136
private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdminClient ac,
135137
List<ConsumerGroupListing> groups,
136138
int pageNum,

0 commit comments

Comments
 (0)