Skip to content

Commit 6ec5163

Browse files
author
Ildar Almakaev
authored
Issue-181 Refactor schema CRUD API (#192)
* Rename attribute 'schemaName' to 'subject' for consistency with Schema Registry terms * Refactor 'GET /schemas' and 'GET .../schemas/{subject}/versions' to get the latest schemas data at once * Fix getting error code from attributes in our custom GlobalErrorWebExceptionHandler
1 parent b9e9211 commit 6ec5163

File tree

5 files changed

+106
-67
lines changed

5 files changed

+106
-67
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/GlobalErrorWebExceptionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes erro
3838
private Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
3939
Map<String, Object> errorAttributes = getErrorAttributes(request, false);
4040
HttpStatus statusCode = Optional.ofNullable(errorAttributes.get(GlobalErrorAttributes.STATUS))
41-
.map(code -> (HttpStatus) code)
41+
.map(code -> code instanceof Integer ? HttpStatus.valueOf((Integer) code) : (HttpStatus) code)
4242
.orElse(HttpStatus.BAD_REQUEST);
4343
return ServerResponse
4444
.status(statusCode)

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import java.util.Formatter;
1313
import lombok.RequiredArgsConstructor;
1414
import lombok.extern.log4j.Log4j2;
15+
import org.springframework.core.ParameterizedTypeReference;
16+
import org.springframework.http.HttpEntity;
1517
import org.springframework.http.HttpStatus;
1618
import org.springframework.http.MediaType;
1719
import org.springframework.http.ResponseEntity;
@@ -21,6 +23,8 @@
2123
import reactor.core.publisher.Flux;
2224
import reactor.core.publisher.Mono;
2325

26+
import java.util.Arrays;
27+
import java.util.List;
2428
import java.util.Objects;
2529

2630
@Service
@@ -37,17 +41,30 @@ public class SchemaRegistryService {
3741
private final ClusterMapper mapper;
3842
private final WebClient webClient;
3943

40-
public Flux<String> getAllSchemaSubjects(String clusterName) {
44+
public Flux<SchemaSubject> getAllLatestVersionSchemas(String clusterName) {
45+
var allSubjectNames = getAllSubjectNames(clusterName);
46+
return allSubjectNames
47+
.flatMapMany(Flux::fromArray)
48+
.flatMap(subject -> getLatestSchemaSubject(clusterName, subject));
49+
}
50+
51+
public Mono<String[]> getAllSubjectNames(String clusterName) {
4152
return clustersStorage.getClusterByName(clusterName)
4253
.map(cluster -> webClient.get()
4354
.uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
4455
.retrieve()
45-
.bodyToFlux(String.class)
46-
.doOnError(log::error))
47-
.orElse(Flux.error(new NotFoundException("No such cluster")));
56+
.bodyToMono(String[].class)
57+
.doOnError(log::error)
58+
)
59+
.orElse(Mono.error(new NotFoundException("No such cluster")));
60+
}
61+
62+
public Flux<SchemaSubject> getAllVersionsBySubject(String clusterName, String subject) {
63+
Flux<Integer> versions = getSubjectVersions(clusterName, subject);
64+
return versions.flatMap(version -> getSchemaSubjectByVersion(clusterName, subject, version));
4865
}
4966

50-
public Flux<Integer> getSchemaSubjectVersions(String clusterName, String schemaName) {
67+
private Flux<Integer> getSubjectVersions(String clusterName, String schemaName) {
5168
return clustersStorage.getClusterByName(clusterName)
5269
.map(cluster -> webClient.get()
5370
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)

kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -105,29 +105,30 @@ public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroups(String cluste
105105
}
106106

107107
@Override
108-
public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
109-
return schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName).map(ResponseEntity::ok);
108+
public Mono<ResponseEntity<SchemaSubject>> getLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
109+
return schemaRegistryService.getLatestSchemaSubject(clusterName, subject).map(ResponseEntity::ok);
110110
}
111111

112112
@Override
113-
public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) {
114-
return schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version).map(ResponseEntity::ok);
113+
public Mono<ResponseEntity<SchemaSubject>> getSchemaByVersion(String clusterName, String subject, Integer version, ServerWebExchange exchange) {
114+
return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subject, version).map(ResponseEntity::ok);
115115
}
116116

117117
@Override
118-
public Mono<ResponseEntity<Flux<String>>> getSchemas(String clusterName, ServerWebExchange exchange) {
119-
Flux<String> subjects = schemaRegistryService.getAllSchemaSubjects(clusterName);
118+
public Mono<ResponseEntity<Flux<SchemaSubject>>> getSchemas(String clusterName, ServerWebExchange exchange) {
119+
Flux<SchemaSubject> subjects = schemaRegistryService.getAllLatestVersionSchemas(clusterName);
120120
return Mono.just(ResponseEntity.ok(subjects));
121121
}
122122

123123
@Override
124-
public Mono<ResponseEntity<Flux<Integer>>> getSchemaVersions(String clusterName, String subjectName, ServerWebExchange exchange) {
125-
return Mono.just(ResponseEntity.ok(schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName)));
124+
public Mono<ResponseEntity<Flux<SchemaSubject>>> getAllVersionsBySubject(String clusterName, String subjectName, ServerWebExchange exchange) {
125+
Flux<SchemaSubject> schemas = schemaRegistryService.getAllVersionsBySubject(clusterName, subjectName);
126+
return Mono.just(ResponseEntity.ok(schemas));
126127
}
127128

128129
@Override
129-
public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) {
130-
return schemaRegistryService.deleteLatestSchemaSubject(clusterName, schemaName);
130+
public Mono<ResponseEntity<Void>> deleteLatestSchema(String clusterName, String subject, ServerWebExchange exchange) {
131+
return schemaRegistryService.deleteLatestSchemaSubject(clusterName, subject);
131132
}
132133

133134
@Override
@@ -141,10 +142,10 @@ public Mono<ResponseEntity<Void>> deleteSchema(String clusterName, String subjec
141142
}
142143

143144
@Override
144-
public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String schemaName,
145+
public Mono<ResponseEntity<SchemaSubject>> createNewSchema(String clusterName, String subject,
145146
@Valid Mono<NewSchemaSubject> newSchemaSubject,
146147
ServerWebExchange exchange) {
147-
return schemaRegistryService.createNewSubject(clusterName, schemaName, newSchemaSubject);
148+
return schemaRegistryService.createNewSubject(clusterName, subject, newSchemaSubject);
148149
}
149150

150151
@Override
@@ -172,17 +173,17 @@ public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(String cl
172173
}
173174

174175
@Override
175-
public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String schemaName,
176+
public Mono<ResponseEntity<CompatibilityCheckResponse>> checkSchemaCompatibility(String clusterName, String subject,
176177
@Valid Mono<NewSchemaSubject> newSchemaSubject,
177178
ServerWebExchange exchange) {
178-
return schemaRegistryService.checksSchemaCompatibility(clusterName, schemaName, newSchemaSubject)
179+
return schemaRegistryService.checksSchemaCompatibility(clusterName, subject, newSchemaSubject)
179180
.map(ResponseEntity::ok);
180181
}
181182

182183
@Override
183-
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String schemaName, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
184-
log.info("Updating schema compatibility for schema: {}", schemaName);
185-
return schemaRegistryService.updateSchemaCompatibility(clusterName, schemaName, compatibilityLevel)
184+
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(String clusterName, String subject, @Valid Mono<CompatibilityLevel> compatibilityLevel, ServerWebExchange exchange) {
185+
log.info("Updating schema compatibility for subject: {}", subject);
186+
return schemaRegistryService.updateSchemaCompatibility(clusterName, subject, compatibilityLevel)
186187
.map(ResponseEntity::ok);
187188
}
188189

kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import lombok.extern.log4j.Log4j2;
66
import lombok.val;
77
import org.junit.jupiter.api.Assertions;
8+
import org.junit.jupiter.api.BeforeEach;
89
import org.junit.jupiter.api.Test;
910
import org.springframework.beans.factory.annotation.Autowired;
1011
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
@@ -23,6 +24,12 @@
2324
class SchemaRegistryServiceTests extends AbstractBaseTest {
2425
@Autowired
2526
WebTestClient webTestClient;
27+
String subject;
28+
29+
@BeforeEach
30+
void setUpBefore() {
31+
this.subject = UUID.randomUUID().toString();
32+
}
2633

2734
@Test
2835
public void should404WhenGetAllSchemasForUnknownCluster() {
@@ -34,11 +41,11 @@ public void should404WhenGetAllSchemasForUnknownCluster() {
3441
}
3542

3643
@Test
37-
void shouldReturn404WhenGetLatestSchemaByNonExistingSchemaName() {
44+
void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() {
3845
String unknownSchema = "unknown-schema";
3946
webTestClient
4047
.get()
41-
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", unknownSchema)
48+
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", unknownSchema)
4249
.exchange()
4350
.expectStatus().isNotFound();
4451
}
@@ -59,49 +66,51 @@ void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
5966
}
6067

6168
@Test
62-
public void shouldReturnNotNullResponseWhenGetAllSchemas() {
69+
public void shouldReturnNotEmptyResponseWhenGetAllSchemas() {
70+
createNewSubjectAndAssert(subject);
71+
6372
webTestClient
6473
.get()
6574
.uri("http://localhost:8080/api/clusters/local/schemas")
6675
.exchange()
6776
.expectStatus().isOk()
68-
.expectBodyList(String.class)
77+
.expectBodyList(SchemaSubject.class)
6978
.consumeWith(result -> {
70-
List<String> responseBody = result.getResponseBody();
71-
Assertions.assertNotNull(responseBody);
79+
List<SchemaSubject> responseBody = result.getResponseBody();
7280
log.info("Response of test schemas: {}", responseBody);
81+
Assertions.assertNotNull(responseBody);
82+
Assertions.assertFalse(responseBody.isEmpty());
83+
84+
SchemaSubject actualSchemaSubject = responseBody.stream()
85+
.filter(schemaSubject -> subject.equals(schemaSubject.getSubject()))
86+
.findFirst()
87+
.orElseThrow();
88+
Assertions.assertNotNull(actualSchemaSubject.getId());
89+
Assertions.assertNotNull(actualSchemaSubject.getVersion());
90+
Assertions.assertNotNull(actualSchemaSubject.getCompatibilityLevel());
91+
Assertions.assertEquals("\"string\"", actualSchemaSubject.getSchema());
7392
});
7493
}
7594

7695
@Test
7796
public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
78-
String schemaName = UUID.randomUUID().toString();
79-
// Create a new schema
80-
webTestClient
81-
.post()
82-
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}", schemaName)
83-
.contentType(MediaType.APPLICATION_JSON)
84-
.body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
85-
.exchange()
86-
.expectStatus().isOk()
87-
.expectBody(SchemaSubject.class)
88-
.consumeWith(this::assertResponseBodyWhenCreateNewSchema);
97+
createNewSubjectAndAssert(subject);
8998

9099
//Get the created schema and check its items
91100
webTestClient
92101
.get()
93-
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
102+
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
94103
.exchange()
95104
.expectStatus().isOk()
96105
.expectBodyList(SchemaSubject.class)
97106
.consumeWith(listEntityExchangeResult -> {
98107
val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.BACKWARD;
99-
assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
108+
assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
100109
});
101110

102111
//Now let's change compatibility level of this schema to FULL whereas the global level should be BACKWARD
103112
webTestClient.put()
104-
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/compatibility", schemaName)
113+
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/compatibility", subject)
105114
.contentType(MediaType.APPLICATION_JSON)
106115
.body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
107116
.exchange()
@@ -110,23 +119,35 @@ public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
110119
//Get one more time to check the schema compatibility level is changed to FULL
111120
webTestClient
112121
.get()
113-
.uri("http://localhost:8080/api/clusters/local/schemas/{schemaName}/latest", schemaName)
122+
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
114123
.exchange()
115124
.expectStatus().isOk()
116125
.expectBodyList(SchemaSubject.class)
117126
.consumeWith(listEntityExchangeResult -> {
118127
val expectedCompatibility = CompatibilityLevel.CompatibilityEnum.FULL;
119-
assertSchemaWhenGetLatest(schemaName, listEntityExchangeResult, expectedCompatibility);
128+
assertSchemaWhenGetLatest(subject, listEntityExchangeResult, expectedCompatibility);
120129
});
121130
}
122131

123-
private void assertSchemaWhenGetLatest(String schemaName, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
132+
private void createNewSubjectAndAssert(String subject) {
133+
webTestClient
134+
.post()
135+
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}", subject)
136+
.contentType(MediaType.APPLICATION_JSON)
137+
.body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
138+
.exchange()
139+
.expectStatus().isOk()
140+
.expectBody(SchemaSubject.class)
141+
.consumeWith(this::assertResponseBodyWhenCreateNewSchema);
142+
}
143+
144+
private void assertSchemaWhenGetLatest(String subject, EntityExchangeResult<List<SchemaSubject>> listEntityExchangeResult, CompatibilityLevel.CompatibilityEnum expectedCompatibility) {
124145
List<SchemaSubject> responseBody = listEntityExchangeResult.getResponseBody();
125146
Assertions.assertNotNull(responseBody);
126147
Assertions.assertEquals(1, responseBody.size());
127148
SchemaSubject actualSchema = responseBody.get(0);
128149
Assertions.assertNotNull(actualSchema);
129-
Assertions.assertEquals(schemaName, actualSchema.getSubject());
150+
Assertions.assertEquals(subject, actualSchema.getSubject());
130151
Assertions.assertEquals("\"string\"", actualSchema.getSchema());
131152

132153
Assertions.assertNotNull(actualSchema.getCompatibilityLevel());

0 commit comments

Comments
 (0)