Skip to content

Commit ad050f6

Browse files
authored
Merge branch 'main' into issues/1312
2 parents 08a3431 + a87278a commit ad050f6

File tree

58 files changed

+1469
-183
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1469
-183
lines changed

.github/dependabot.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@ updates:
2020
update-types:
2121
- "patch"
2222
- "minor"
23+
exclude-patterns:
24+
- "org.springframework.boot:*"
25+
- "io.spring.dependency-management"
26+
# All netty references are temporary overwrites that must be set carefully
27+
# We do not need dependabot to send pull requests
28+
- "io.netty:*"
2329
other-dependencies:
2430
exclude-patterns:
2531
- "org.springframework.boot:*"
2632
- "io.spring.dependency-management"
33+
# All netty references are temporary overwrites that must be set carefully
34+
# We do not need dependabot to send pull requests
35+
- "io.netty:*"
2736
patterns:
2837
- "*"
2938
update-types:
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
name: "E2E: Playwright Manual run"
2+
on:
3+
workflow_dispatch:
4+
inputs:
5+
sha:
6+
description: Commit to run on
7+
required: true
8+
type: string
9+
10+
permissions:
11+
contents: read
12+
checks: write
13+
statuses: write
14+
15+
jobs:
16+
build-and-test:
17+
uses: ./.github/workflows/e2e-playwright-run.yml
18+
secrets: inherit
19+
with:
20+
sha: ${{ inputs.sha }}
21+
22+
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
name: "Playwright E2E: PR tests"
2+
on:
3+
pull_request:
4+
types: [ "opened", "reopened", "synchronize" ]
5+
paths:
6+
- "build.gradle"
7+
- "gradle.properties"
8+
- "settings.gradle"
9+
- "gradle/libs.versions.toml"
10+
11+
- "contract/**"
12+
- "api/**"
13+
- "serde-api/**"
14+
- "frontend/**"
15+
- "e2e-playwright/**"
16+
17+
permissions:
18+
contents: read
19+
checks: write
20+
statuses: write
21+
22+
jobs:
23+
build-and-test:
24+
uses: ./.github/workflows/e2e-playwright-run.yml
25+
secrets: inherit
26+
with:
27+
sha: ${{ github.event.pull_request.head.sha }}

.github/workflows/e2e-playwright.yml renamed to .github/workflows/e2e-playwright-run.yml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
name: "E2E: Playwright Manual run"
1+
name: "E2E: Playwright"
2+
23
on:
3-
workflow_dispatch:
4-
sha:
5-
required: true
6-
type: string
4+
workflow_call:
5+
inputs:
6+
sha:
7+
required: true
8+
type: string
79

810
permissions:
911
contents: read
@@ -30,6 +32,8 @@ jobs:
3032
uses: actions/setup-node@v3
3133
with:
3234
node-version: 18
35+
cache-dependency-path: ./e2e-playwright/package-lock.json
36+
cache: 'npm'
3337

3438
- name: Install NPM dependencies
3539
working-directory: ./e2e-playwright

api/build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ dependencies {
5454
antlr libs.antlr
5555
implementation libs.antlr.runtime
5656

57+
implementation libs.lucene
58+
implementation libs.lucene.queryparser
59+
implementation libs.lucene.analysis.common
60+
5761
implementation libs.opendatadiscovery.oddrn
5862
implementation(libs.opendatadiscovery.client) {
5963
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux'
@@ -68,8 +72,13 @@ dependencies {
6872
// CVE Fixes
6973
implementation libs.apache.commons.compress
7074
implementation libs.okhttp3.logging.intercepter
75+
// START Fixes https://www.cve.org/CVERecord?id=CVE-2025-58056 and https://www.cve.org/CVERecord?id=CVE-2025-58057
76+
implementation libs.netty.codec
77+
implementation libs.netty.codec.http
78+
// END Fixes https://www.cve.org/CVERecord?id=CVE-2025-58056 and https://www.cve.org/CVERecord?id=CVE-2025-58057
7179
// CVE Fixes End
7280

81+
7382
implementation libs.modelcontextprotocol.spring.webflux
7483
implementation libs.victools.jsonschema.generator
7584

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ClustersProperties {
4141
MetricsStorage defaultMetricsStorage = new MetricsStorage();
4242

4343
CacheProperties cache = new CacheProperties();
44+
ClusterFtsProperties fts = new ClusterFtsProperties();
4445

4546
@Data
4647
public static class Cluster {
@@ -217,6 +218,25 @@ public static class CacheProperties {
217218
Duration connectClusterCacheExpiry = Duration.ofHours(24);
218219
}
219220

221+
@Data
222+
@NoArgsConstructor
223+
@AllArgsConstructor
224+
public static class NgramProperties {
225+
int ngramMin = 1;
226+
int ngramMax = 4;
227+
}
228+
229+
@Data
230+
@NoArgsConstructor
231+
@AllArgsConstructor
232+
public static class ClusterFtsProperties {
233+
boolean enabled = false;
234+
NgramProperties schemas = new NgramProperties(1, 4);
235+
NgramProperties consumers = new NgramProperties(1, 4);
236+
NgramProperties connect = new NgramProperties(1, 4);
237+
NgramProperties acl = new NgramProperties(1, 4);
238+
}
239+
220240
@PostConstruct
221241
public void validateAndSetDefaults() {
222242
if (clusters != null) {

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,32 @@
11
package io.kafbat.ui.controller;
22

3-
import static org.apache.commons.lang3.Strings.CI;
4-
53
import io.kafbat.ui.api.SchemasApi;
4+
import io.kafbat.ui.api.model.SchemaColumnsToSort;
5+
import io.kafbat.ui.config.ClustersProperties;
66
import io.kafbat.ui.exception.ValidationException;
77
import io.kafbat.ui.mapper.KafkaSrMapper;
88
import io.kafbat.ui.mapper.KafkaSrMapperImpl;
99
import io.kafbat.ui.model.CompatibilityCheckResponseDTO;
1010
import io.kafbat.ui.model.CompatibilityLevelDTO;
11+
import io.kafbat.ui.model.InternalTopic;
1112
import io.kafbat.ui.model.KafkaCluster;
1213
import io.kafbat.ui.model.NewSchemaSubjectDTO;
14+
import io.kafbat.ui.model.SchemaColumnsToSortDTO;
1315
import io.kafbat.ui.model.SchemaSubjectDTO;
1416
import io.kafbat.ui.model.SchemaSubjectsResponseDTO;
17+
import io.kafbat.ui.model.SortOrderDTO;
1518
import io.kafbat.ui.model.rbac.AccessContext;
1619
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1720
import io.kafbat.ui.service.SchemaRegistryService;
21+
import io.kafbat.ui.service.SchemaRegistryService.SubjectWithCompatibilityLevel;
22+
import io.kafbat.ui.service.index.SchemasFilter;
1823
import io.kafbat.ui.service.mcp.McpTool;
24+
import java.util.Comparator;
1925
import java.util.List;
2026
import java.util.Map;
2127
import javax.validation.Valid;
2228
import lombok.RequiredArgsConstructor;
2329
import lombok.extern.slf4j.Slf4j;
24-
import org.apache.commons.lang3.StringUtils;
2530
import org.springframework.http.ResponseEntity;
2631
import org.springframework.web.bind.annotation.RestController;
2732
import org.springframework.web.server.ServerWebExchange;
@@ -38,6 +43,7 @@ public class SchemasController extends AbstractController implements SchemasApi,
3843
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
3944

4045
private final SchemaRegistryService schemaRegistryService;
46+
private final ClustersProperties clustersProperties;
4147

4248
@Override
4349
protected KafkaCluster getCluster(String clusterName) {
@@ -208,12 +214,16 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
208214
@Valid Integer pageNum,
209215
@Valid Integer perPage,
210216
@Valid String search,
217+
SchemaColumnsToSortDTO orderBy,
218+
SortOrderDTO sortOrder,
211219
ServerWebExchange serverWebExchange) {
212220
var context = AccessContext.builder()
213221
.cluster(clusterName)
214222
.operationName("getSchemas")
215223
.build();
216224

225+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
226+
217227
return schemaRegistryService
218228
.getAllSubjectNames(getCluster(clusterName))
219229
.flatMapIterable(l -> l)
@@ -222,23 +232,72 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
222232
.flatMap(subjects -> {
223233
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
224234
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
225-
List<String> filteredSubjects = subjects
226-
.stream()
227-
.filter(subj -> search == null || CI.contains(subj, search))
228-
.sorted().toList();
235+
236+
SchemasFilter filter = new SchemasFilter(subjects, fts.isEnabled(), fts.getSchemas());
237+
List<String> filteredSubjects = filter.find(search);
238+
229239
var totalPages = (filteredSubjects.size() / pageSize)
230240
+ (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
231-
List<String> subjectsToRender = filteredSubjects.stream()
232-
.skip(subjectToSkip)
233-
.limit(pageSize)
234-
.toList();
235-
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
236-
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
241+
242+
List<String> subjectsToRetrieve;
243+
boolean paginate = true;
244+
var schemaComparator = getComparatorForSchema(orderBy);
245+
final Comparator<SubjectWithCompatibilityLevel> comparator =
246+
sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
247+
? schemaComparator : schemaComparator.reversed();
248+
if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) {
249+
if (SortOrderDTO.DESC.equals(sortOrder)) {
250+
filteredSubjects.sort(Comparator.reverseOrder());
251+
}
252+
subjectsToRetrieve = filteredSubjects.stream()
253+
.skip(subjectToSkip)
254+
.limit(pageSize)
255+
.toList();
256+
paginate = false;
257+
} else {
258+
subjectsToRetrieve = filteredSubjects;
259+
}
260+
261+
final boolean shouldPaginate = paginate;
262+
263+
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRetrieve, pageSize)
264+
.map(subjs ->
265+
paginateSchemas(subjs, comparator, shouldPaginate, pageSize, subjectToSkip)
266+
).map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
237267
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
238268
}).map(ResponseEntity::ok)
239269
.doOnEach(sig -> audit(context, sig));
240270
}
241271

272+
private List<SubjectWithCompatibilityLevel> paginateSchemas(
273+
List<SubjectWithCompatibilityLevel> subjects,
274+
Comparator<SubjectWithCompatibilityLevel> comparator,
275+
boolean paginate,
276+
int pageSize,
277+
int subjectToSkip) {
278+
subjects.sort(comparator);
279+
if (paginate) {
280+
return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size()));
281+
} else {
282+
return subjects;
283+
}
284+
}
285+
286+
private Comparator<SubjectWithCompatibilityLevel> getComparatorForSchema(
287+
@Valid SchemaColumnsToSortDTO orderBy) {
288+
var defaultComparator = Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
289+
if (orderBy == null) {
290+
return defaultComparator;
291+
}
292+
return switch (orderBy) {
293+
case SUBJECT -> Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
294+
case ID -> Comparator.comparing(SubjectWithCompatibilityLevel::getId);
295+
case TYPE -> Comparator.comparing(SubjectWithCompatibilityLevel::getSchemaType);
296+
case COMPATIBILITY -> Comparator.comparing(SubjectWithCompatibilityLevel::getCompatibility);
297+
default -> defaultComparator;
298+
};
299+
}
300+
242301
@Override
243302
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
244303
String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
import static io.kafbat.ui.model.rbac.permission.TopicAction.EDIT;
88
import static io.kafbat.ui.model.rbac.permission.TopicAction.VIEW;
99
import static java.util.stream.Collectors.toList;
10-
import static org.apache.commons.lang3.Strings.CI;
1110

1211
import io.kafbat.ui.api.TopicsApi;
12+
import io.kafbat.ui.config.ClustersProperties;
1313
import io.kafbat.ui.mapper.ClusterMapper;
1414
import io.kafbat.ui.model.InternalTopic;
1515
import io.kafbat.ui.model.InternalTopicConfig;
@@ -37,7 +37,6 @@
3737
import javax.validation.Valid;
3838
import lombok.RequiredArgsConstructor;
3939
import lombok.extern.slf4j.Slf4j;
40-
import org.apache.commons.lang3.StringUtils;
4140
import org.springframework.http.HttpStatus;
4241
import org.springframework.http.ResponseEntity;
4342
import org.springframework.web.bind.annotation.RestController;
@@ -55,6 +54,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
5554
private final TopicsService topicsService;
5655
private final TopicAnalysisService topicAnalysisService;
5756
private final ClusterMapper clusterMapper;
57+
private final ClustersProperties clustersProperties;
5858

5959
@Override
6060
public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -181,23 +181,23 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
181181
.operationName("getTopics")
182182
.build();
183183

184-
return topicsService.getTopicsForPagination(getCluster(clusterName))
184+
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal)
185185
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
186186
.flatMap(topics -> {
187187
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
188188
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
189+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
190+
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
189191
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
190-
? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
191-
List<InternalTopic> filtered = topics.stream()
192-
.filter(topic -> !topic.isInternal()
193-
|| showInternal != null && showInternal)
194-
.filter(topic -> search == null || CI.contains(topic.getName(), search))
195-
.sorted(comparator)
196-
.toList();
192+
? comparatorForTopic : comparatorForTopic.reversed();
193+
194+
List<InternalTopic> filtered = topics.stream().sorted(comparator).toList();
195+
197196
var totalPages = (filtered.size() / pageSize)
198197
+ (filtered.size() % pageSize == 0 ? 0 : 1);
199198

200199
List<String> topicsPage = filtered.stream()
200+
.filter(t -> !t.isInternal() || showInternal != null && showInternal)
201201
.skip(topicsToSkip)
202202
.limit(pageSize)
203203
.map(InternalTopic::getName)
@@ -348,16 +348,23 @@ public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates
348348
}
349349

350350
private Comparator<InternalTopic> getComparatorForTopic(
351-
TopicColumnsToSortDTO orderBy) {
351+
TopicColumnsToSortDTO orderBy,
352+
boolean ftsEnabled) {
352353
var defaultComparator = Comparator.comparing(InternalTopic::getName);
353-
if (orderBy == null) {
354+
if (orderBy == null && ftsEnabled) {
355+
return (o1, o2) -> 0;
356+
} else if (orderBy == null) {
354357
return defaultComparator;
355358
}
356359
return switch (orderBy) {
357360
case TOTAL_PARTITIONS -> Comparator.comparing(InternalTopic::getPartitionCount);
358361
case OUT_OF_SYNC_REPLICAS -> Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
359362
case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor);
360363
case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize);
364+
case MESSAGES_COUNT -> Comparator.comparing(
365+
InternalTopic::getMessagesCount,
366+
Comparator.nullsLast(Long::compareTo)
367+
);
361368
default -> defaultComparator;
362369
};
363370
}

0 commit comments

Comments
 (0)