Skip to content

Commit 5392e77

Browse files
committed
Expose BEARER_AUTH_CUSTOM_PROVIDER_CLASS parameter in config
1 parent e9d5a7d commit 5392e77

File tree

8 files changed

+61
-39
lines changed

8 files changed

+61
-39
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ public static class Cluster {
7171

7272
AuditProperties audit;
7373

74-
boolean gcpSchemaRegistry = false;
75-
7674
}
7775

7876
@Data
@@ -116,6 +114,7 @@ public static class ConnectCluster {
116114
public static class SchemaRegistryAuth {
117115
String username;
118116
String password;
117+
String bearerAuthCustomProviderClass;
119118
}
120119

121120
@Data

api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ interface MessageFormatter {
2020
String format(String topic, byte[] value);
2121

2222
static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient,
23-
boolean gcpSchemaRegistry) {
23+
//boolean gcpSchemaRegistry) {
24+
String bearerAuthCustomProviderClass) {
2425
return Map.of(
25-
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, gcpSchemaRegistry),
26+
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, bearerAuthCustomProviderClass),
2627
SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient),
2728
SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
2829
);
@@ -31,7 +32,7 @@ SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
3132
class AvroMessageFormatter implements MessageFormatter {
3233
private final KafkaAvroDeserializer avroDeserializer;
3334

34-
AvroMessageFormatter(SchemaRegistryClient client, boolean gcpSchemaRegistry) {
35+
AvroMessageFormatter(SchemaRegistryClient client, String bearerAuthCustomProviderClass) {
3536
this.avroDeserializer = new KafkaAvroDeserializer(client);
3637

3738
final Map<String, Object> avroProps = new HashMap<>();
@@ -40,10 +41,10 @@ class AvroMessageFormatter implements MessageFormatter {
4041
avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false);
4142
avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);
4243

43-
if (gcpSchemaRegistry) {
44+
if (bearerAuthCustomProviderClass != null && !bearerAuthCustomProviderClass.isBlank()) {
4445
avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM");
4546
avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS,
46-
"class com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider");
47+
String.format("class %s", bearerAuthCustomProviderClass));
4748
}
4849

4950
this.avroDeserializer.configure(avroProps, false);

api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ public class SchemaRegistrySerde implements BuiltInSerde {
4343
private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
4444
private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
4545
private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM";
46-
private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
47-
"com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";
4846

4947
public static String name() {
5048
return "SchemaRegistry";
@@ -80,13 +78,15 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties,
8078
urls,
8179
kafkaClusterProperties.getProperty("schemaRegistryAuth.username", String.class).orElse(null),
8280
kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class).orElse(null),
81+
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
82+
.orElse(null),
8383
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null),
8484
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null),
8585
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
86-
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null),
87-
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false)
86+
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
8887
),
89-
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false),
88+
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
89+
.orElse(null),
9090
kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"),
9191
kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"),
9292
kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class)
@@ -108,13 +108,15 @@ public void configure(PropertyResolver serdeProperties,
108108
urls,
109109
serdeProperties.getProperty("username", String.class).orElse(null),
110110
serdeProperties.getProperty("password", String.class).orElse(null),
111+
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
112+
.orElse(null),
111113
serdeProperties.getProperty("keystoreLocation", String.class).orElse(null),
112114
serdeProperties.getProperty("keystorePassword", String.class).orElse(null),
113115
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
114-
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null),
115-
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false)
116+
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
116117
),
117-
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false),
118+
kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class)
119+
.orElse(null),
118120
serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"),
119121
serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"),
120122
serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class)
@@ -126,26 +128,27 @@ public void configure(PropertyResolver serdeProperties,
126128
void configure(
127129
List<String> schemaRegistryUrls,
128130
SchemaRegistryClient schemaRegistryClient,
129-
boolean gcpSchemaRegistry,
131+
String bearerAuthCustomProviderClass,
130132
String keySchemaNameTemplate,
131133
String valueSchemaNameTemplate,
132134
boolean checkTopicSchemaExistenceForDeserialize) {
133135
this.schemaRegistryUrls = schemaRegistryUrls;
134136
this.schemaRegistryClient = schemaRegistryClient;
135137
this.keySchemaNameTemplate = keySchemaNameTemplate;
136138
this.valueSchemaNameTemplate = valueSchemaNameTemplate;
137-
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, gcpSchemaRegistry);
139+
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, bearerAuthCustomProviderClass);
138140
this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize;
139141
}
140142

141143
private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls,
142144
@Nullable String username,
143145
@Nullable String password,
146+
@Nullable String bearerAuthCustomProviderClass,
144147
@Nullable String keyStoreLocation,
145148
@Nullable String keyStorePassword,
146149
@Nullable String trustStoreLocation,
147-
@Nullable String trustStorePassword,
148-
boolean gcpSchemaRegistry) {
150+
@Nullable String trustStorePassword
151+
) {
149152
Map<String, String> configs = new HashMap<>();
150153
if (username != null && password != null) {
151154
configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
@@ -175,9 +178,9 @@ private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls
175178
keyStorePassword);
176179
}
177180

178-
if (gcpSchemaRegistry) {
181+
if (bearerAuthCustomProviderClass != null) {
179182
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE);
180-
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS);
183+
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, bearerAuthCustomProviderClass);
181184
}
182185

183186
return new CachedSchemaRegistryClient(

api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,12 +168,15 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope
168168

169169
private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperties.Cluster clusterProperties) {
170170

171+
172+
//System.out.println("Creating Schema Registry Client for cluster: " + clusterProperties.getName());
171173
var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth())
172174
.orElse(new ClustersProperties.SchemaRegistryAuth());
175+
//System.out.println("Auth details: " + auth.toString());
173176
WebClient webClient = new WebClientConfigurator()
174177
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
175178
.configureBasicAuth(auth.getUsername(), auth.getPassword())
176-
.configureGcpBearerAuth(clusterProperties.isGcpSchemaRegistry())
179+
.configureBearerTokenAuth(auth.getBearerAuthCustomProviderClass())
177180
.configureBufferSize(webClientMaxBuffSize)
178181
.build();
179182
return ReactiveFailover.create(

api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.kafbat.ui.sr.model.SchemaSubject;
1616
import io.kafbat.ui.util.ReactiveFailover;
1717
import java.util.List;
18+
import java.util.Objects;
1819
import java.util.stream.Collectors;
1920
import lombok.AllArgsConstructor;
2021
import lombok.Getter;
@@ -34,6 +35,9 @@ public class SchemaRegistryService {
3435

3536
private static final String LATEST = "latest";
3637

38+
private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
39+
"com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";
40+
3741
@AllArgsConstructor
3842
public static class SubjectWithCompatibilityLevel {
3943
@Delegate
@@ -148,21 +152,14 @@ public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
148152
String schemaName) {
149153
return api(cluster)
150154
.mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
151-
.map(compatibilityConfig ->
152-
cluster.getOriginalProperties().isGcpSchemaRegistry()
153-
? compatibilityConfig.getCompatibility()
154-
: compatibilityConfig.getCompatibilityLevel())
155+
.map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig))
155156
.onErrorResume(error -> Mono.empty());
156157
}
157158

158159
public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
159160
return api(cluster)
160161
.mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
161-
.map(compatibilityConfig ->
162-
cluster.getOriginalProperties().isGcpSchemaRegistry()
163-
? compatibilityConfig.getCompatibility()
164-
: compatibilityConfig.getCompatibilityLevel()
165-
);
162+
.map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig));
166163
}
167164

168165
private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
@@ -176,4 +173,17 @@ public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(KafkaCluster c
176173
NewSubject newSchemaSubject) {
177174
return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject));
178175
}
176+
177+
private Compatibility selectCompatibilityFormat(KafkaCluster cluster, CompatibilityConfig compatibilityConfig) {
178+
if (cluster.getOriginalProperties().getSchemaRegistryAuth() != null
179+
&& Objects.equals(cluster.getOriginalProperties().getSchemaRegistryAuth().getBearerAuthCustomProviderClass(),
180+
GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) {
181+
return compatibilityConfig.getCompatibility();
182+
} else {
183+
return compatibilityConfig.getCompatibilityLevel();
184+
}
185+
}
179186
}
187+
188+
189+

api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
import java.security.KeyStore;
1515
import java.time.Duration;
1616
import java.util.Collections;
17+
import java.util.Objects;
1718
import java.util.function.Consumer;
1819
import javax.annotation.Nullable;
1920
import javax.net.ssl.KeyManagerFactory;
2021
import javax.net.ssl.TrustManagerFactory;
22+
import javax.validation.constraints.Null;
2123
import lombok.SneakyThrows;
2224
import org.openapitools.jackson.nullable.JsonNullableModule;
2325
import org.springframework.http.MediaType;
@@ -36,6 +38,9 @@
3638

3739
public class WebClientConfigurator {
3840

41+
private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
42+
"com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";
43+
3944
private final WebClient.Builder builder = WebClient.builder();
4045
private HttpClient httpClient = HttpClient
4146
.create()
@@ -52,9 +57,10 @@ private static ObjectMapper defaultOM() {
5257
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
5358
}
5459

55-
public WebClientConfigurator configureGcpBearerAuth(boolean enabled) {
56-
if (enabled) {
57-
System.out.println("Configuring GCP Bearer Auth");
60+
public WebClientConfigurator configureBearerTokenAuth(@Nullable String bearerAuthCustomProviderClass) {
61+
//System.out.println("Configuring GCP Bearer Auth in web client");
62+
//System.out.println("Bearer Auth Custom Provider Class: " + bearerAuthCustomProviderClass);
63+
if (Objects.equals(bearerAuthCustomProviderClass, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) {
5864
builder.filter(createGcpBearerAuthFilter());
5965
}
6066
return this;

api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest {
3535
@BeforeEach
3636
void init() {
3737
serde = new SchemaRegistrySerde();
38-
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true);
38+
serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true);
3939
}
4040

4141
@ParameterizedTest
@@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck {
135135

136136
@BeforeEach
137137
void init() {
138-
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", false);
138+
serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", false);
139139
}
140140

141141
@Test
@@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck {
151151

152152
@BeforeEach
153153
void init() {
154-
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true);
154+
serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true);
155155
}
156156

157157
@Test

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4334,15 +4334,15 @@ components:
43344334
type: string
43354335
password:
43364336
type: string
4337+
bearerAuthCustomProviderClass:
4338+
type: string
43374339
schemaRegistrySsl:
43384340
type: object
43394341
properties:
43404342
keystoreLocation:
43414343
type: string
43424344
keystorePassword:
43434345
type: string
4344-
gcpSchemaRegistry:
4345-
type: boolean
43464346
ksqldbServer:
43474347
type: string
43484348
ksqldbServerSsl:

0 commit comments

Comments
 (0)