Skip to content

Commit 776811e

Browse files
committed
ISSUE-208 Added schemas ordering backend
1 parent 4e7767b commit 776811e

File tree

6 files changed

+162
-20
lines changed

6 files changed

+162
-20
lines changed

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,25 @@
33
import static org.apache.commons.lang3.Strings.CI;
44

55
import io.kafbat.ui.api.SchemasApi;
6+
import io.kafbat.ui.api.model.SchemaColumnsToSort;
67
import io.kafbat.ui.exception.ValidationException;
78
import io.kafbat.ui.mapper.KafkaSrMapper;
89
import io.kafbat.ui.mapper.KafkaSrMapperImpl;
910
import io.kafbat.ui.model.CompatibilityCheckResponseDTO;
1011
import io.kafbat.ui.model.CompatibilityLevelDTO;
12+
import io.kafbat.ui.model.InternalTopic;
1113
import io.kafbat.ui.model.KafkaCluster;
1214
import io.kafbat.ui.model.NewSchemaSubjectDTO;
15+
import io.kafbat.ui.model.SchemaColumnsToSortDTO;
1316
import io.kafbat.ui.model.SchemaSubjectDTO;
1417
import io.kafbat.ui.model.SchemaSubjectsResponseDTO;
18+
import io.kafbat.ui.model.SortOrderDTO;
1519
import io.kafbat.ui.model.rbac.AccessContext;
1620
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1721
import io.kafbat.ui.service.SchemaRegistryService;
22+
import io.kafbat.ui.service.SchemaRegistryService.SubjectWithCompatibilityLevel;
1823
import io.kafbat.ui.service.mcp.McpTool;
24+
import java.util.Comparator;
1925
import java.util.List;
2026
import java.util.Map;
2127
import javax.validation.Valid;
@@ -208,6 +214,8 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
208214
@Valid Integer pageNum,
209215
@Valid Integer perPage,
210216
@Valid String search,
217+
SchemaColumnsToSortDTO orderBy,
218+
SortOrderDTO sortOrder,
211219
ServerWebExchange serverWebExchange) {
212220
var context = AccessContext.builder()
213221
.cluster(clusterName)
@@ -222,23 +230,72 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
222230
.flatMap(subjects -> {
223231
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
224232
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
225-
List<String> filteredSubjects = subjects
233+
List<String> filteredSubjects = new java.util.ArrayList<>(subjects
226234
.stream()
227235
.filter(subj -> search == null || CI.contains(subj, search))
228-
.sorted().toList();
236+
.sorted().toList());
229237
var totalPages = (filteredSubjects.size() / pageSize)
230238
+ (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
231-
List<String> subjectsToRender = filteredSubjects.stream()
232-
.skip(subjectToSkip)
233-
.limit(pageSize)
234-
.toList();
235-
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
236-
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
239+
240+
List<String> subjectsToRender;
241+
boolean paginate = true;
242+
var schemaComparator = getComparatorForSchema(orderBy);
243+
final Comparator<SubjectWithCompatibilityLevel> comparator =
244+
sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
245+
? schemaComparator : schemaComparator.reversed();
246+
if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) {
247+
if (SortOrderDTO.DESC.equals(sortOrder)) {
248+
filteredSubjects.sort(Comparator.reverseOrder());
249+
}
250+
subjectsToRender = filteredSubjects.stream()
251+
.skip(subjectToSkip)
252+
.limit(pageSize)
253+
.toList();
254+
paginate = false;
255+
} else {
256+
subjectsToRender = filteredSubjects;
257+
}
258+
259+
final boolean shouldPaginate = paginate;
260+
261+
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender, pageSize)
262+
.map(subjs ->
263+
paginateSchemas(subjs, comparator, shouldPaginate, pageSize, subjectToSkip)
264+
).map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
237265
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
238266
}).map(ResponseEntity::ok)
239267
.doOnEach(sig -> audit(context, sig));
240268
}
241269

270+
private List<SubjectWithCompatibilityLevel> paginateSchemas(
271+
List<SubjectWithCompatibilityLevel> subjects,
272+
Comparator<SubjectWithCompatibilityLevel> comparator,
273+
boolean paginate,
274+
int pageSize,
275+
int subjectToSkip) {
276+
subjects.sort(comparator);
277+
if (paginate) {
278+
return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size()));
279+
} else {
280+
return subjects;
281+
}
282+
}
283+
284+
private Comparator<SubjectWithCompatibilityLevel> getComparatorForSchema(
285+
@Valid SchemaColumnsToSortDTO orderBy) {
286+
var defaultComparator = Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
287+
if (orderBy == null) {
288+
return defaultComparator;
289+
}
290+
return switch (orderBy) {
291+
case SUBJECT -> Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
292+
case ID -> Comparator.comparing(SubjectWithCompatibilityLevel::getId);
293+
case TYPE -> Comparator.comparing(SubjectWithCompatibilityLevel::getSchemaType);
294+
case COMPATIBILITY -> Comparator.comparing(SubjectWithCompatibilityLevel::getCompatibility);
295+
default -> defaultComparator;
296+
};
297+
}
298+
242299
@Override
243300
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
244301
String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,

api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ private ReactiveFailover<KafkaSrClientApi> api(KafkaCluster cluster) {
4747
}
4848

4949
public Mono<List<SubjectWithCompatibilityLevel>> getAllLatestVersionSchemas(KafkaCluster cluster,
50-
List<String> subjects) {
50+
List<String> subjects,
51+
int pageSize) {
5152
return Flux.fromIterable(subjects)
52-
.concatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject))
53+
.flatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject), pageSize)
5354
.collect(Collectors.toList());
5455
}
5556

api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,30 @@
11
package io.kafbat.ui.service;
22

33
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.ArgumentMatchers.anyInt;
45
import static org.mockito.ArgumentMatchers.anyList;
56
import static org.mockito.ArgumentMatchers.isA;
67
import static org.mockito.Mockito.mock;
78
import static org.mockito.Mockito.when;
89

910
import io.kafbat.ui.controller.SchemasController;
1011
import io.kafbat.ui.model.KafkaCluster;
12+
import io.kafbat.ui.model.SchemaColumnsToSortDTO;
1113
import io.kafbat.ui.model.SchemaSubjectDTO;
14+
import io.kafbat.ui.model.SortOrderDTO;
15+
import io.kafbat.ui.service.SchemaRegistryService.SubjectWithCompatibilityLevel;
1216
import io.kafbat.ui.service.audit.AuditService;
1317
import io.kafbat.ui.sr.model.Compatibility;
1418
import io.kafbat.ui.sr.model.SchemaSubject;
19+
import io.kafbat.ui.sr.model.SchemaType;
1520
import io.kafbat.ui.util.AccessControlServiceMock;
1621
import io.kafbat.ui.util.ReactiveFailover;
1722
import java.util.Comparator;
1823
import java.util.List;
24+
import java.util.Map;
1925
import java.util.Optional;
26+
import java.util.function.Function;
27+
import java.util.stream.Collectors;
2028
import java.util.stream.IntStream;
2129
import org.junit.jupiter.api.Test;
2230
import org.mockito.Mockito;
@@ -29,19 +37,31 @@ class SchemaRegistryPaginationTest {
2937
private SchemasController controller;
3038

3139
private void init(List<String> subjects) {
40+
initWithData(subjects.stream().map(s ->
41+
new SubjectWithCompatibilityLevel(
42+
new SchemaSubject().subject(s),
43+
Compatibility.FULL
44+
)
45+
).toList());
46+
}
47+
48+
private void initWithData(List<SubjectWithCompatibilityLevel> subjects) {
3249
ClustersStorage clustersStorage = Mockito.mock(ClustersStorage.class);
3350
when(clustersStorage.getClusterByName(isA(String.class)))
3451
.thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
3552

53+
Map<String, SubjectWithCompatibilityLevel> subjectsMap = subjects.stream().collect(Collectors.toMap(
54+
SubjectWithCompatibilityLevel::getSubject,
55+
Function.identity()
56+
));
57+
3658
SchemaRegistryService schemaRegistryService = Mockito.mock(SchemaRegistryService.class);
3759
when(schemaRegistryService.getAllSubjectNames(isA(KafkaCluster.class)))
38-
.thenReturn(Mono.just(subjects));
60+
.thenReturn(Mono.just(subjects.stream().map(SubjectWithCompatibilityLevel::getSubject).toList()));
3961
when(schemaRegistryService
40-
.getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList())).thenCallRealMethod();
62+
.getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList(), anyInt())).thenCallRealMethod();
4163
when(schemaRegistryService.getLatestSchemaVersionBySubject(isA(KafkaCluster.class), isA(String.class)))
42-
.thenAnswer(a -> Mono.just(
43-
new SchemaRegistryService.SubjectWithCompatibilityLevel(
44-
new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
64+
.thenAnswer(a -> Mono.just(subjectsMap.get(a.getArgument(1))));
4565

4666
this.controller = new SchemasController(schemaRegistryService);
4767
this.controller.setAccessControlService(new AccessControlServiceMock().getMock());
@@ -58,7 +78,7 @@ void shouldListFirst25andThen10Schemas() {
5878
.toList()
5979
);
6080
var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
61-
null, null, null, null).block();
81+
null, null, null, null, null, null).block();
6282
assertThat(schemasFirst25).isNotNull();
6383
assertThat(schemasFirst25.getBody()).isNotNull();
6484
assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4);
@@ -67,7 +87,7 @@ void shouldListFirst25andThen10Schemas() {
6787
.isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
6888

6989
var schemasFirst10 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
70-
null, 10, null, null).block();
90+
null, 10, null, null, null, null).block();
7191

7292
assertThat(schemasFirst10).isNotNull();
7393
assertThat(schemasFirst10.getBody()).isNotNull();
@@ -86,7 +106,7 @@ void shouldListSchemasContaining_1() {
86106
.toList()
87107
);
88108
var schemasSearch7 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
89-
null, null, "1", null).block();
109+
null, null, "1", null, null, null).block();
90110
assertThat(schemasSearch7).isNotNull();
91111
assertThat(schemasSearch7.getBody()).isNotNull();
92112
assertThat(schemasSearch7.getBody().getPageCount()).isEqualTo(1);
@@ -102,7 +122,7 @@ void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
102122
.toList()
103123
);
104124
var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
105-
0, -1, null, null).block();
125+
0, -1, null, null, null, null).block();
106126

107127
assertThat(schemas).isNotNull();
108128
assertThat(schemas.getBody()).isNotNull();
@@ -121,7 +141,7 @@ void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
121141
);
122142

123143
var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
124-
4, 33, null, null).block();
144+
4, 33, null, null, null, null).block();
125145

126146
assertThat(schemas).isNotNull();
127147
assertThat(schemas.getBody()).isNotNull();
@@ -137,4 +157,39 @@ private KafkaCluster buildKafkaCluster(String clusterName) {
137157
.schemaRegistryClient(mock(ReactiveFailover.class))
138158
.build();
139159
}
160+
161+
@Test
162+
void shouldOrderByAndPaginate() {
163+
List<SubjectWithCompatibilityLevel> schemas = IntStream.rangeClosed(1, 100)
164+
.boxed()
165+
.map(num -> new
166+
SubjectWithCompatibilityLevel(
167+
new SchemaSubject()
168+
.subject("subject" + num)
169+
.schemaType(SchemaType.AVRO)
170+
.id(num),
171+
Compatibility.FULL
172+
)
173+
).toList();
174+
175+
initWithData(schemas);
176+
177+
var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
178+
null, null, null,
179+
SchemaColumnsToSortDTO.ID, SortOrderDTO.DESC, null
180+
).block();
181+
182+
List<String> last25OrderedById = schemas.stream()
183+
.sorted(Comparator.comparing(SubjectWithCompatibilityLevel::getId).reversed())
184+
.map(SubjectWithCompatibilityLevel::getSubject)
185+
.limit(25)
186+
.toList();
187+
188+
assertThat(schemasFirst25).isNotNull();
189+
assertThat(schemasFirst25.getBody()).isNotNull();
190+
assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4);
191+
assertThat(schemasFirst25.getBody().getSchemas()).hasSize(25);
192+
assertThat(schemasFirst25.getBody().getSchemas().stream().map(SchemaSubjectDTO::getSubject).toList())
193+
.isEqualTo(last25OrderedById);
194+
}
140195
}

contract-typespec/api/schemas.tsp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import "@typespec/openapi";
22
import "./responses.tsp";
3+
import "./models.tsp";
34

45
namespace Api;
56

@@ -26,6 +27,8 @@ interface SchemasApi {
2627
@query page?: int32,
2728
@query perPage?: int32,
2829
@query search?: string,
30+
@query orderBy?: SchemaColumnsToSort,
31+
@query sortOrder?: SortOrder,
2932
): SchemaSubjectsResponse;
3033

3134
@delete
@@ -172,3 +175,10 @@ model SchemaSubjectsResponse {
172175
pageCount?: int32;
173176
schemas?: SchemaSubject[];
174177
}
178+
179+
enum SchemaColumnsToSort {
180+
SUBJECT,
181+
ID,
182+
TYPE,
183+
COMPATIBILITY,
184+
}

contract-typespec/api/topics.tsp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import "@typespec/openapi";
22
import "./responses.tsp";
3+
import "./models.tsp";
34

45
namespace Api;
56

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,6 +1208,16 @@ paths:
12081208
required: false
12091209
schema:
12101210
type: string
1211+
- name: orderBy
1212+
in: query
1213+
required: false
1214+
schema:
1215+
$ref: '#/components/schemas/SchemaColumnsToSort'
1216+
- name: sortOrder
1217+
in: query
1218+
required: false
1219+
schema:
1220+
$ref: '#/components/schemas/SortOrder'
12111221
responses:
12121222
200:
12131223
description: OK
@@ -2683,6 +2693,14 @@ components:
26832693
- REPLICATION_FACTOR
26842694
- SIZE
26852695

2696+
SchemaColumnsToSort:
2697+
type: string
2698+
enum:
2699+
- SUBJECT
2700+
- ID
2701+
- TYPE
2702+
- COMPATIBILITY
2703+
26862704
ConnectorColumnsToSort:
26872705
type: string
26882706
enum:

0 commit comments

Comments
 (0)