Skip to content

Commit 2e3d11b

Browse files
committed
Decouple standard and GPC Schema Registries APIs and components
1 parent f2e8a53 commit 2e3d11b

File tree

12 files changed

+831
-106
lines changed

12 files changed

+831
-106
lines changed

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 97 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import io.kafbat.ui.api.SchemasApi;
44
import io.kafbat.ui.exception.ValidationException;
5+
import io.kafbat.ui.mapper.GcpKafkaSrMapper;
6+
import io.kafbat.ui.mapper.GcpKafkaSrMapperImpl;
57
import io.kafbat.ui.mapper.KafkaSrMapper;
68
import io.kafbat.ui.mapper.KafkaSrMapperImpl;
79
import io.kafbat.ui.model.CompatibilityCheckResponseDTO;
@@ -12,6 +14,7 @@
1214
import io.kafbat.ui.model.SchemaSubjectsResponseDTO;
1315
import io.kafbat.ui.model.rbac.AccessContext;
1416
import io.kafbat.ui.model.rbac.permission.SchemaAction;
17+
import io.kafbat.ui.service.GcpSchemaRegistryService;
1518
import io.kafbat.ui.service.SchemaRegistryService;
1619
import io.kafbat.ui.service.mcp.McpTool;
1720
import java.util.List;
@@ -34,18 +37,22 @@ public class SchemasController extends AbstractController implements SchemasApi,
3437
private static final Integer DEFAULT_PAGE_SIZE = 25;
3538

3639
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
40+
private final GcpKafkaSrMapper gcpKafkaSrMapper = new GcpKafkaSrMapperImpl();
3741

3842
private final SchemaRegistryService schemaRegistryService;
43+
private final GcpSchemaRegistryService gcpSchemaRegistryService;
3944

4045
@Override
4146
protected KafkaCluster getCluster(String clusterName) {
4247
var c = super.getCluster(clusterName);
43-
if (c.getSchemaRegistryClient() == null) {
48+
if (c.getSchemaRegistryClient() == null && c.getGcpSchemaRegistryClient() == null) {
4449
throw new ValidationException("Schema Registry is not set for cluster " + clusterName);
4550
}
4651
return c;
4752
}
4853

54+
55+
4956
@Override
5057
public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
5158
String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
@@ -57,13 +64,16 @@ public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibil
5764
.build();
5865

5966
return validateAccess(context).then(
60-
newSchemaSubjectMono.flatMap(subjectDTO ->
61-
schemaRegistryService.checksSchemaCompatibility(
62-
getCluster(clusterName),
63-
subject,
64-
kafkaSrMapper.fromDto(subjectDTO)
65-
))
66-
.map(kafkaSrMapper::toDto)
67+
newSchemaSubjectMono.flatMap(subjectDTO -> {
68+
var cluster = getCluster(clusterName);
69+
return cluster.isGcpSchemaRegistryEnabled()
70+
? gcpSchemaRegistryService.checksSchemaCompatibility(
71+
cluster, subject, gcpKafkaSrMapper.fromDto(subjectDTO))
72+
.map(gcpKafkaSrMapper::toDto) :
73+
schemaRegistryService.checksSchemaCompatibility(
74+
cluster, subject, kafkaSrMapper.fromDto(subjectDTO))
75+
.map(kafkaSrMapper::toDto);
76+
})
6777
.map(ResponseEntity::ok)
6878
).doOnEach(sig -> audit(context, sig));
6979
}
@@ -73,22 +83,23 @@ public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
7383
String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
7484
ServerWebExchange exchange) {
7585
return newSchemaSubjectMono.flatMap(newSubject -> {
76-
var context = AccessContext.builder()
77-
.cluster(clusterName)
78-
.schemaActions(newSubject.getSubject(), SchemaAction.CREATE)
79-
.operationName("createNewSchema")
80-
.build();
81-
return validateAccess(context).then(
86+
var context = AccessContext.builder()
87+
.cluster(clusterName)
88+
.schemaActions(newSubject.getSubject(), SchemaAction.CREATE)
89+
.operationName("createNewSchema")
90+
.build();
91+
var cluster = getCluster(clusterName);
92+
return validateAccess(context).then(
93+
cluster.isGcpSchemaRegistryEnabled()
94+
? gcpSchemaRegistryService.registerNewSchema(
95+
cluster, newSubject.getSubject(), gcpKafkaSrMapper.fromDto(newSubject))
96+
.map(gcpKafkaSrMapper::toDto) :
8297
schemaRegistryService.registerNewSchema(
83-
getCluster(clusterName),
84-
newSubject.getSubject(),
85-
kafkaSrMapper.fromDto(newSubject)
86-
))
87-
.map(kafkaSrMapper::toDto)
88-
.map(ResponseEntity::ok)
89-
.doOnEach(sig -> audit(context, sig));
90-
}
91-
);
98+
cluster, newSubject.getSubject(), kafkaSrMapper.fromDto(newSubject))
99+
.map(kafkaSrMapper::toDto))
100+
.map(ResponseEntity::ok)
101+
.doOnEach(sig -> audit(context, sig));
102+
});
92103
}
93104

94105
@Override
@@ -100,8 +111,11 @@ public Mono<ResponseEntity<Void>> deleteLatestSchema(
100111
.operationName("deleteLatestSchema")
101112
.build();
102113

114+
var cluster = getCluster(clusterName);
103115
return validateAccess(context).then(
104-
schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
116+
(cluster.isGcpSchemaRegistryEnabled()
117+
? gcpSchemaRegistryService.deleteLatestSchemaSubject(cluster, subject) :
118+
schemaRegistryService.deleteLatestSchemaSubject(cluster, subject))
105119
.doOnEach(sig -> audit(context, sig))
106120
.thenReturn(ResponseEntity.ok().build())
107121
);
@@ -116,8 +130,11 @@ public Mono<ResponseEntity<Void>> deleteSchema(
116130
.operationName("deleteSchema")
117131
.build();
118132

133+
var cluster = getCluster(clusterName);
119134
return validateAccess(context).then(
120-
schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
135+
(cluster.isGcpSchemaRegistryEnabled()
136+
? gcpSchemaRegistryService.deleteSchemaSubjectEntirely(cluster, subject) :
137+
schemaRegistryService.deleteSchemaSubjectEntirely(cluster, subject))
121138
.doOnEach(sig -> audit(context, sig))
122139
.thenReturn(ResponseEntity.ok().build())
123140
);
@@ -132,8 +149,11 @@ public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
132149
.operationName("deleteSchemaByVersion")
133150
.build();
134151

152+
var cluster = getCluster(clusterName);
135153
return validateAccess(context).then(
136-
schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
154+
(cluster.isGcpSchemaRegistryEnabled()
155+
? gcpSchemaRegistryService.deleteSchemaSubjectByVersion(cluster, subjectName, version) :
156+
schemaRegistryService.deleteSchemaSubjectByVersion(cluster, subjectName, version))
137157
.doOnEach(sig -> audit(context, sig))
138158
.thenReturn(ResponseEntity.ok().build())
139159
);
@@ -148,9 +168,10 @@ public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
148168
.operationName("getAllVersionsBySubject")
149169
.build();
150170

151-
Flux<SchemaSubjectDTO> schemas =
152-
schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
153-
.map(kafkaSrMapper::toDto);
171+
var cluster = getCluster(clusterName);
172+
Flux<SchemaSubjectDTO> schemas = cluster.isGcpSchemaRegistryEnabled()
173+
? gcpSchemaRegistryService.getAllVersionsBySubject(cluster, subjectName).map(gcpKafkaSrMapper::toDto) :
174+
schemaRegistryService.getAllVersionsBySubject(cluster, subjectName).map(kafkaSrMapper::toDto);
154175

155176
return validateAccess(context)
156177
.thenReturn(ResponseEntity.ok(schemas))
@@ -160,8 +181,12 @@ public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
160181
@Override
161182
public Mono<ResponseEntity<CompatibilityLevelDTO>> getGlobalSchemaCompatibilityLevel(
162183
String clusterName, ServerWebExchange exchange) {
163-
return schemaRegistryService.getGlobalSchemaCompatibilityLevel(getCluster(clusterName))
164-
.map(c -> new CompatibilityLevelDTO().compatibility(kafkaSrMapper.toDto(c)))
184+
var cluster = getCluster(clusterName);
185+
return (cluster.isGcpSchemaRegistryEnabled()
186+
? gcpSchemaRegistryService.getGlobalSchemaCompatibilityLevel(cluster)
187+
.map(c -> new CompatibilityLevelDTO().compatibility(gcpKafkaSrMapper.toDto(c))) :
188+
schemaRegistryService.getGlobalSchemaCompatibilityLevel(cluster)
189+
.map(c -> new CompatibilityLevelDTO().compatibility(kafkaSrMapper.toDto(c))))
165190
.map(ResponseEntity::ok)
166191
.defaultIfEmpty(ResponseEntity.notFound().build());
167192
}
@@ -176,9 +201,13 @@ public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName
176201
.operationName("getLatestSchema")
177202
.build();
178203

204+
var cluster = getCluster(clusterName);
179205
return validateAccess(context).then(
180-
schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
181-
.map(kafkaSrMapper::toDto)
206+
(cluster.isGcpSchemaRegistryEnabled()
207+
? gcpSchemaRegistryService.getLatestSchemaVersionBySubject(cluster, subject)
208+
.map(gcpKafkaSrMapper::toDto) :
209+
schemaRegistryService.getLatestSchemaVersionBySubject(cluster, subject)
210+
.map(kafkaSrMapper::toDto))
182211
.map(ResponseEntity::ok)
183212
).doOnEach(sig -> audit(context, sig));
184213
}
@@ -193,10 +222,13 @@ public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
193222
.operationParams(Map.of("subject", subject, "version", version))
194223
.build();
195224

225+
var cluster = getCluster(clusterName);
196226
return validateAccess(context).then(
197-
schemaRegistryService.getSchemaSubjectByVersion(
198-
getCluster(clusterName), subject, version)
199-
.map(kafkaSrMapper::toDto)
227+
(cluster.isGcpSchemaRegistryEnabled()
228+
? gcpSchemaRegistryService.getSchemaSubjectByVersion(cluster, subject, version)
229+
.map(gcpKafkaSrMapper::toDto) :
230+
schemaRegistryService.getSchemaSubjectByVersion(cluster, subject, version)
231+
.map(kafkaSrMapper::toDto))
200232
.map(ResponseEntity::ok)
201233
).doOnEach(sig -> audit(context, sig));
202234
}
@@ -212,8 +244,10 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
212244
.operationName("getSchemas")
213245
.build();
214246

215-
return schemaRegistryService
216-
.getAllSubjectNames(getCluster(clusterName))
247+
var cluster = getCluster(clusterName);
248+
return (cluster.isGcpSchemaRegistryEnabled()
249+
? gcpSchemaRegistryService.getAllSubjectNames(cluster) :
250+
schemaRegistryService.getAllSubjectNames(cluster))
217251
.flatMapIterable(l -> l)
218252
.filterWhen(schema -> accessControlService.isSchemaAccessible(schema, clusterName))
219253
.collectList()
@@ -230,9 +264,14 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
230264
.skip(subjectToSkip)
231265
.limit(pageSize)
232266
.toList();
233-
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
234-
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
235-
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
267+
return (cluster.isGcpSchemaRegistryEnabled()
268+
? gcpSchemaRegistryService.getAllLatestVersionSchemas(cluster, subjectsToRender)
269+
.map(subjs -> subjs.stream()
270+
.map(gcpKafkaSrMapper::toDto).toList()) :
271+
schemaRegistryService.getAllLatestVersionSchemas(cluster, subjectsToRender)
272+
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList()))
273+
.map(subjs -> new SchemaSubjectsResponseDTO()
274+
.pageCount(totalPages).schemas(subjs));
236275
}).map(ResponseEntity::ok)
237276
.doOnEach(sig -> audit(context, sig));
238277
}
@@ -247,13 +286,15 @@ public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
247286
.operationName("updateGlobalSchemaCompatibilityLevel")
248287
.build();
249288

289+
var cluster = getCluster(clusterName);
250290
return validateAccess(context).then(
251291
compatibilityLevelMono
252292
.flatMap(compatibilityLevelDTO ->
253-
schemaRegistryService.updateGlobalSchemaCompatibility(
254-
getCluster(clusterName),
255-
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
256-
))
293+
cluster.isGcpSchemaRegistryEnabled()
294+
? gcpSchemaRegistryService.updateGlobalSchemaCompatibility(
295+
cluster, gcpKafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())) :
296+
schemaRegistryService.updateGlobalSchemaCompatibility(
297+
cluster, kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())))
257298
.doOnEach(sig -> audit(context, sig))
258299
.thenReturn(ResponseEntity.ok().build())
259300
);
@@ -271,15 +312,16 @@ public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
271312
.operationParams(Map.of("subject", subject))
272313
.build();
273314

274-
return compatibilityLevelMono.flatMap(compatibilityLevelDTO ->
275-
validateAccess(context).then(
276-
schemaRegistryService.updateSchemaCompatibility(
277-
getCluster(clusterName),
278-
subject,
279-
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
280-
))
281-
.doOnEach(sig -> audit(context, sig))
282-
.thenReturn(ResponseEntity.ok().build())
283-
);
315+
return compatibilityLevelMono.flatMap(compatibilityLevelDTO -> {
316+
var cluster = getCluster(clusterName);
317+
return validateAccess(context).then(
318+
cluster.isGcpSchemaRegistryEnabled()
319+
? gcpSchemaRegistryService.updateSchemaCompatibility(
320+
cluster, subject, gcpKafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())) :
321+
schemaRegistryService.updateSchemaCompatibility(
322+
cluster, subject, kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())))
323+
.doOnEach(sig -> audit(context, sig))
324+
.thenReturn(ResponseEntity.ok().build());
325+
});
284326
}
285327
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.kafbat.ui.mapper;
2+
3+
import io.kafbat.ui.model.CompatibilityCheckResponseDTO;
4+
import io.kafbat.ui.model.CompatibilityLevelDTO;
5+
import io.kafbat.ui.model.NewSchemaSubjectDTO;
6+
import io.kafbat.ui.model.SchemaReferenceDTO;
7+
import io.kafbat.ui.model.SchemaSubjectDTO;
8+
import io.kafbat.ui.model.SchemaTypeDTO;
9+
import io.kafbat.ui.service.GcpSchemaRegistryService;
10+
import java.util.List;
11+
import java.util.Optional;
12+
import org.mapstruct.Mapper;
13+
14+
@Mapper
15+
public interface GcpKafkaSrMapper {
16+
17+
// Convert GCP SubjectWithCompatibilityLevel to DTO
18+
default SchemaSubjectDTO toDto(GcpSchemaRegistryService.SubjectWithCompatibilityLevel s) {
19+
return new SchemaSubjectDTO()
20+
.id(s.getId())
21+
.version(s.getVersion())
22+
.subject(s.getSubject())
23+
.schema(s.getSchema())
24+
.schemaType(SchemaTypeDTO.fromValue(
25+
Optional.ofNullable(s.getSchemaType())
26+
.orElse(io.kafbat.ui.gcp.sr.model.SchemaType.AVRO)
27+
.getValue()))
28+
.references(toDto(s.getReferences()))
29+
.compatibilityLevel(Optional.ofNullable(s.getCompatibility())
30+
.map(Object::toString).orElse(null));
31+
}
32+
33+
// Convert GCP SchemaReference list to DTO list
34+
List<SchemaReferenceDTO> toDto(List<io.kafbat.ui.gcp.sr.model.SchemaReference> references);
35+
36+
// Convert GCP CompatibilityCheckResponse to DTO
37+
CompatibilityCheckResponseDTO toDto(io.kafbat.ui.gcp.sr.model.CompatibilityCheckResponse ccr);
38+
39+
// Convert GCP Compatibility to DTO enum
40+
CompatibilityLevelDTO.CompatibilityEnum toDto(io.kafbat.ui.gcp.sr.model.Compatibility compatibility);
41+
42+
// Convert DTO to GCP NewSubject
43+
io.kafbat.ui.gcp.sr.model.NewSubject fromDto(NewSchemaSubjectDTO subjectDto);
44+
45+
// Convert DTO enum to GCP Compatibility
46+
io.kafbat.ui.gcp.sr.model.Compatibility fromDto(CompatibilityLevelDTO.CompatibilityEnum dtoEnum);
47+
48+
}

api/src/main/java/io/kafbat/ui/model/KafkaCluster.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.kafbat.ui.config.ClustersProperties;
44
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
55
import io.kafbat.ui.emitter.PollingSettings;
6+
import io.kafbat.ui.gcp.sr.api.KafkaGcpSrClientApi;
67
import io.kafbat.ui.service.ksql.KsqlApiClient;
78
import io.kafbat.ui.service.masking.DataMasking;
89
import io.kafbat.ui.sr.api.KafkaSrClientApi;
@@ -31,6 +32,8 @@ public class KafkaCluster {
3132
private final DataMasking masking;
3233
private final PollingSettings pollingSettings;
3334
private final ReactiveFailover<KafkaSrClientApi> schemaRegistryClient;
35+
private final ReactiveFailover<KafkaGcpSrClientApi> gcpSchemaRegistryClient;
36+
private final boolean isGcpSchemaRegistryEnabled;
3437
private final Map<String, ReactiveFailover<KafkaConnectClientApi>> connectsClients;
3538
private final ReactiveFailover<KsqlApiClient> ksqlClient;
3639
}

api/src/main/java/io/kafbat/ui/service/FeatureService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient admin
3636
features.add(Mono.just(ClusterFeature.KSQL_DB));
3737
}
3838

39-
if (cluster.getSchemaRegistryClient() != null) {
39+
if (cluster.getSchemaRegistryClient() != null || cluster.getGcpSchemaRegistryClient() != null) {
4040
features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY));
4141
}
4242

0 commit comments

Comments
 (0)