99import io .kafbat .ui .sr .api .KafkaSrClientApi ;
1010import io .kafbat .ui .sr .model .Compatibility ;
1111import io .kafbat .ui .sr .model .CompatibilityCheckResponse ;
12- import io .kafbat .ui .sr .model .CompatibilityConfig ;
1312import io .kafbat .ui .sr .model .CompatibilityLevelChange ;
13+ import io .kafbat .ui .sr .model .GetGlobalCompatibilityLevel200Response ;
1414import io .kafbat .ui .sr .model .NewSubject ;
1515import io .kafbat .ui .sr .model .SchemaSubject ;
1616import io .kafbat .ui .util .ReactiveFailover ;
17+ import io .kafbat .ui .util .gcp .GcpBearerAuthFilter ;
1718import java .util .List ;
1819import java .util .Objects ;
1920import java .util .stream .Collectors ;
@@ -35,9 +36,6 @@ public class SchemaRegistryService {
3536
3637 private static final String LATEST = "latest" ;
3738
38- private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
39- "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider" ;
40-
4139 @ AllArgsConstructor
4240 public static class SubjectWithCompatibilityLevel {
4341 @ Delegate
@@ -152,14 +150,16 @@ public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
152150 String schemaName ) {
153151 return api (cluster )
154152 .mono (c -> c .getSubjectCompatibilityLevel (schemaName , true ))
155- .map (compatibilityConfig -> selectCompatibilityFormat (cluster , compatibilityConfig ))
153+ .map (compatibilityResponse ->
154+ normalizeCompatibilityResponse (cluster , compatibilityResponse ))
156155 .onErrorResume (error -> Mono .empty ());
157156 }
158157
159158 public Mono <Compatibility > getGlobalSchemaCompatibilityLevel (KafkaCluster cluster ) {
160159 return api (cluster )
161160 .mono (KafkaSrClientApi ::getGlobalCompatibilityLevel )
162- .map (compatibilityConfig -> selectCompatibilityFormat (cluster , compatibilityConfig ));
161+ .map (compatibilityResponse ->
162+ normalizeCompatibilityResponse (cluster , compatibilityResponse ));
163163 }
164164
165165 private Mono <Compatibility > getSchemaCompatibilityInfoOrGlobal (KafkaCluster cluster ,
@@ -174,13 +174,14 @@ public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(KafkaCluster c
174174 return api (cluster ).mono (c -> c .checkSchemaCompatibility (schemaName , LATEST , true , newSchemaSubject ));
175175 }
176176
177- private Compatibility selectCompatibilityFormat (KafkaCluster cluster , CompatibilityConfig compatibilityConfig ) {
177+ private Compatibility normalizeCompatibilityResponse (KafkaCluster cluster ,
178+ GetGlobalCompatibilityLevel200Response compatibilityResponse ) {
178179 if (cluster .getOriginalProperties ().getSchemaRegistryAuth () != null
179180 && Objects .equals (cluster .getOriginalProperties ().getSchemaRegistryAuth ().getBearerAuthCustomProviderClass (),
180- GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS )) {
181- return compatibilityConfig .getCompatibility ();
181+ GcpBearerAuthFilter . getGcpBearerAuthCustomProviderClass () )) {
182+ return compatibilityResponse .getCompatibility ();
182183 } else {
183- return compatibilityConfig .getCompatibilityLevel ();
184+ return compatibilityResponse .getCompatibilityLevel ();
184185 }
185186 }
186187}
0 commit comments