21 changes: 11 additions & 10 deletions api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java
@@ -9,11 +9,12 @@
 import io.kafbat.ui.sr.api.KafkaSrClientApi;
 import io.kafbat.ui.sr.model.Compatibility;
 import io.kafbat.ui.sr.model.CompatibilityCheckResponse;
-import io.kafbat.ui.sr.model.CompatibilityConfig;
 import io.kafbat.ui.sr.model.CompatibilityLevelChange;
+import io.kafbat.ui.sr.model.GetGlobalCompatibilityLevel200Response;
 import io.kafbat.ui.sr.model.NewSubject;
 import io.kafbat.ui.sr.model.SchemaSubject;
 import io.kafbat.ui.util.ReactiveFailover;
+import io.kafbat.ui.util.gcp.GcpBearerAuthFilter;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -35,9 +36,6 @@ public class SchemaRegistryService {

   private static final String LATEST = "latest";

-  private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
-      "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";
-
   @AllArgsConstructor
   public static class SubjectWithCompatibilityLevel {
     @Delegate
@@ -152,14 +150,16 @@ public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
                                                          String schemaName) {
     return api(cluster)
         .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
-        .map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig))
+        .map(compatibilityResponse ->
+            normalizeCompatibilityResponse(cluster, compatibilityResponse))
         .onErrorResume(error -> Mono.empty());
   }

   public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
     return api(cluster)
         .mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
-        .map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig));
+        .map(compatibilityResponse ->
+            normalizeCompatibilityResponse(cluster, compatibilityResponse));
   }

   private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
@@ -174,13 +174,14 @@ public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(KafkaCluster c
     return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject));
   }

-  private Compatibility selectCompatibilityFormat(KafkaCluster cluster, CompatibilityConfig compatibilityConfig) {
+  private Compatibility normalizeCompatibilityResponse(KafkaCluster cluster,
+                                                       GetGlobalCompatibilityLevel200Response compatibilityResponse) {
     if (cluster.getOriginalProperties().getSchemaRegistryAuth() != null
         && Objects.equals(cluster.getOriginalProperties().getSchemaRegistryAuth().getBearerAuthCustomProviderClass(),
-        GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) {
-      return compatibilityConfig.getCompatibility();
+        GcpBearerAuthFilter.getGcpBearerAuthCustomProviderClass())) {
+      return compatibilityResponse.getCompatibility();
     } else {
-      return compatibilityConfig.getCompatibilityLevel();
+      return compatibilityResponse.getCompatibilityLevel();
     }
   }
 }
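For context on why this normalization exists: a standard Schema Registry returns the active level in a compatibilityLevel field, while GCP's managed Schema Registry returns a compatibility field instead, so only one of the two getters on the response yields a non-null value. A minimal standalone sketch of the same idea, using a hypothetical ConfigResponse record in place of the generated GetGlobalCompatibilityLevel200Response model:

// Hedged sketch: ConfigResponse is a hypothetical stand-in for the generated
// GetGlobalCompatibilityLevel200Response, which exposes both fields.
record ConfigResponse(String compatibilityLevel, String compatibility) {

  // Standard SR: {"compatibilityLevel": "BACKWARD"}  -> compatibility == null
  // GCP SR:      {"compatibility": "BACKWARD"}       -> compatibilityLevel == null
  String normalized(boolean gcpRegistry) {
    return gcpRegistry ? compatibility() : compatibilityLevel();
  }

  public static void main(String[] args) {
    System.out.println(new ConfigResponse(null, "BACKWARD").normalized(true));  // BACKWARD
    System.out.println(new ConfigResponse("FULL", null).normalized(false));     // FULL
  }
}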
25 changes: 17 additions & 8 deletions contract/src/main/resources/swagger/kafka-sr-api.yaml
@@ -176,7 +176,9 @@ paths:
           content:
             application/json:
               schema:
-                $ref: '#/components/schemas/CompatibilityConfig'
+                oneOf:
+                  - $ref: '#/components/schemas/CompatibilityConfig'
+                  - $ref: '#/components/schemas/CompatibilityConfigGcp'
         404:
           description: Not found
     put:

Review thread on this change:

Member: This approach seems messy. If GCP SR uses a different API, we should define a separate contract for it.

Author: Sorry for taking so long, but I finally managed to decouple the two APIs. To make it work, I created a GcpSchemaRegistryService and a GcpKafkaSrMapper and use them in the SchemaController based on a KafkaCluster boolean called "isGcpSchemaRegistry", initialized in the KafkaClusterFactory. With this approach it would be easy to undo these changes if Google decides at some point to make their Schema Registry fully compliant with the standard SR API.

The Serdes remain as they were because the problematic "compatibility" field is not used there; that component only needs the custom auth class.
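A rough sketch of the dispatch the author describes: the names GcpSchemaRegistryService and isGcpSchemaRegistry come from the comment above, but the wiring and method signatures here are assumptions, not the PR's actual code.

import reactor.core.publisher.Mono;

// Hypothetical sketch of the controller-level split described above; the real
// SchemaController, services, and mappers in the PR may look different.
class SchemaControllerSketch {

  private final SchemaRegistryService standardService;   // standard SR contract
  private final GcpSchemaRegistryService gcpService;     // GCP-specific contract

  SchemaControllerSketch(SchemaRegistryService standard, GcpSchemaRegistryService gcp) {
    this.standardService = standard;
    this.gcpService = gcp;
  }

  Mono<Compatibility> globalCompatibility(KafkaCluster cluster) {
    // Per the author's comment, isGcpSchemaRegistry is a boolean on
    // KafkaCluster, initialized once in KafkaClusterFactory.
    return cluster.isGcpSchemaRegistry()
        ? gcpService.getGlobalSchemaCompatibilityLevel(cluster)
        : standardService.getGlobalSchemaCompatibilityLevel(cluster);
  }
}

This keeps the GCP-specific behavior behind its own service, so reverting to a single contract later would only require removing the branch.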
@@ -220,7 +222,9 @@ paths:
           content:
             application/json:
               schema:
-                $ref: '#/components/schemas/CompatibilityConfig'
+                oneOf:
+                  - $ref: '#/components/schemas/CompatibilityConfig'
+                  - $ref: '#/components/schemas/CompatibilityConfigGcp'
         404:
           description: Not found
     put:
@@ -381,13 +385,20 @@ components:
       properties:
         compatibilityLevel:
           $ref: '#/components/schemas/Compatibility'
-        # GCP Managed Kafka Schema registries specific fields
+      required:
+        - compatibilityLevel
+
+    CompatibilityConfigGcp:
+      type: object
+      properties:
         alias:
-          type: string
+          type: string
         compatibility:
-          $ref: '#/components/schemas/Compatibility'
+          $ref: '#/components/schemas/Compatibility'
         normalize:
-          type: boolean
+          type: boolean
+      required:
+        - compatibility

     CompatibilityLevelChange:
       type: object
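With this split, a standard registry response such as {"compatibilityLevel": "BACKWARD"} validates against CompatibilityConfig, while a GCP response along the lines of {"alias": "my-registry", "compatibility": "BACKWARD", "normalize": false} (illustrative values) matches CompatibilityConfigGcp; the oneOf in the paths above accepts either shape.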
@@ -397,7 +408,6 @@
       required:
         - compatibility

-
     Compatibility:
       type: string
       enum:
@@ -409,7 +419,6 @@
         - FULL_TRANSITIVE
         - NONE

-
     CompatibilityCheckResponse:
       type: object
       properties: