Skip to content

Commit e9d5a7d

Browse files
committed
Add all changes to connect with GCP Schema Registries
1 parent 7f97f71 commit e9d5a7d

File tree

9 files changed

+103
-23
lines changed

9 files changed

+103
-23
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ public static class Cluster {
7070
List<@Valid Masking> masking;
7171

7272
AuditProperties audit;
73+
74+
boolean gcpSchemaRegistry = false;
75+
7376
}
7477

7578
@Data

api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@
1111
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
1212
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
1313
import io.kafbat.ui.util.jsonschema.JsonAvroConversion;
14+
import java.util.HashMap;
1415
import java.util.Map;
1516
import lombok.SneakyThrows;
1617

1718
interface MessageFormatter {
1819

1920
String format(String topic, byte[] value);
2021

21-
static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient) {
22+
static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient,
23+
boolean gcpSchemaRegistry) {
2224
return Map.of(
23-
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient),
25+
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, gcpSchemaRegistry),
2426
SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient),
2527
SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
2628
);
@@ -29,17 +31,23 @@ SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
2931
class AvroMessageFormatter implements MessageFormatter {
3032
private final KafkaAvroDeserializer avroDeserializer;
3133

32-
AvroMessageFormatter(SchemaRegistryClient client) {
34+
AvroMessageFormatter(SchemaRegistryClient client, boolean gcpSchemaRegistry) {
3335
this.avroDeserializer = new KafkaAvroDeserializer(client);
34-
this.avroDeserializer.configure(
35-
Map.of(
36-
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused",
37-
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false,
38-
KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false,
39-
KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true
40-
),
41-
false
42-
);
36+
37+
final Map<String, Object> avroProps = new HashMap<>();
38+
avroProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused");
39+
avroProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
40+
avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false);
41+
avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);
42+
43+
if (gcpSchemaRegistry) {
44+
avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM");
45+
avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS,
46+
"class com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider");
47+
}
48+
49+
this.avroDeserializer.configure(avroProps, false);
50+
4351
}
4452

4553
@Override

api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class SchemaRegistrySerde implements BuiltInSerde {
4242

4343
private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
4444
private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
45+
private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM";
46+
private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
47+
"com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";
4548

4649
public static String name() {
4750
return "SchemaRegistry";
@@ -80,8 +83,10 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties,
8083
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null),
8184
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null),
8285
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
83-
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
86+
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null),
87+
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false)
8488
),
89+
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false),
8590
kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"),
8691
kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"),
8792
kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class)
@@ -106,8 +111,10 @@ public void configure(PropertyResolver serdeProperties,
106111
serdeProperties.getProperty("keystoreLocation", String.class).orElse(null),
107112
serdeProperties.getProperty("keystorePassword", String.class).orElse(null),
108113
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
109-
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
114+
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null),
115+
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false)
110116
),
117+
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false),
111118
serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"),
112119
serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"),
113120
serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class)
@@ -119,14 +126,15 @@ public void configure(PropertyResolver serdeProperties,
119126
void configure(
120127
List<String> schemaRegistryUrls,
121128
SchemaRegistryClient schemaRegistryClient,
129+
boolean gcpSchemaRegistry,
122130
String keySchemaNameTemplate,
123131
String valueSchemaNameTemplate,
124132
boolean checkTopicSchemaExistenceForDeserialize) {
125133
this.schemaRegistryUrls = schemaRegistryUrls;
126134
this.schemaRegistryClient = schemaRegistryClient;
127135
this.keySchemaNameTemplate = keySchemaNameTemplate;
128136
this.valueSchemaNameTemplate = valueSchemaNameTemplate;
129-
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient);
137+
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, gcpSchemaRegistry);
130138
this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize;
131139
}
132140

@@ -136,7 +144,8 @@ private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls
136144
@Nullable String keyStoreLocation,
137145
@Nullable String keyStorePassword,
138146
@Nullable String trustStoreLocation,
139-
@Nullable String trustStorePassword) {
147+
@Nullable String trustStorePassword,
148+
boolean gcpSchemaRegistry) {
140149
Map<String, String> configs = new HashMap<>();
141150
if (username != null && password != null) {
142151
configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
@@ -166,6 +175,11 @@ private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls
166175
keyStorePassword);
167176
}
168177

178+
if (gcpSchemaRegistry) {
179+
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE);
180+
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS);
181+
}
182+
169183
return new CachedSchemaRegistryClient(
170184
urls,
171185
1_000,

api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,13 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope
167167
}
168168

169169
private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperties.Cluster clusterProperties) {
170+
170171
var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth())
171172
.orElse(new ClustersProperties.SchemaRegistryAuth());
172173
WebClient webClient = new WebClientConfigurator()
173174
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
174175
.configureBasicAuth(auth.getUsername(), auth.getPassword())
176+
.configureGcpBearerAuth(clusterProperties.isGcpSchemaRegistry())
175177
.configureBufferSize(webClientMaxBuffSize)
176178
.build();
177179
return ReactiveFailover.create(

api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,21 @@ public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
148148
String schemaName) {
149149
return api(cluster)
150150
.mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
151-
.map(CompatibilityConfig::getCompatibilityLevel)
151+
.map(compatibilityConfig ->
152+
cluster.getOriginalProperties().isGcpSchemaRegistry()
153+
? compatibilityConfig.getCompatibility()
154+
: compatibilityConfig.getCompatibilityLevel())
152155
.onErrorResume(error -> Mono.empty());
153156
}
154157

155158
public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
156159
return api(cluster)
157160
.mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
158-
.map(CompatibilityConfig::getCompatibilityLevel);
161+
.map(compatibilityConfig ->
162+
cluster.getOriginalProperties().isGcpSchemaRegistry()
163+
? compatibilityConfig.getCompatibility()
164+
: compatibilityConfig.getCompatibilityLevel()
165+
);
159166
}
160167

161168
private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,

api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@
33
import com.fasterxml.jackson.databind.DeserializationFeature;
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
6+
import com.google.auth.oauth2.GoogleCredentials;
67
import io.kafbat.ui.config.ClustersProperties;
78
import io.kafbat.ui.exception.ValidationException;
89
import io.netty.handler.ssl.SslContext;
910
import io.netty.handler.ssl.SslContextBuilder;
1011
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
1112
import java.io.FileInputStream;
13+
import java.io.IOException;
1214
import java.security.KeyStore;
1315
import java.time.Duration;
16+
import java.util.Collections;
1417
import java.util.function.Consumer;
1518
import javax.annotation.Nullable;
1619
import javax.net.ssl.KeyManagerFactory;
@@ -24,9 +27,13 @@
2427
import org.springframework.http.codec.json.Jackson2JsonEncoder;
2528
import org.springframework.util.ResourceUtils;
2629
import org.springframework.util.unit.DataSize;
30+
import org.springframework.web.reactive.function.client.ClientRequest;
31+
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
2732
import org.springframework.web.reactive.function.client.WebClient;
33+
import reactor.core.publisher.Mono;
2834
import reactor.netty.http.client.HttpClient;
2935

36+
3037
public class WebClientConfigurator {
3138

3239
private final WebClient.Builder builder = WebClient.builder();
@@ -45,6 +52,38 @@ private static ObjectMapper defaultOM() {
4552
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
4653
}
4754

55+
public WebClientConfigurator configureGcpBearerAuth(boolean enabled) {
56+
if (enabled) {
57+
System.out.println("Configuring GCP Bearer Auth");
58+
builder.filter(createGcpBearerAuthFilter());
59+
}
60+
return this;
61+
}
62+
63+
private ExchangeFilterFunction createGcpBearerAuthFilter() {
64+
return (request, next) -> {
65+
return Mono.fromCallable(() -> {
66+
try {
67+
// Get credentials using Application Default Credentials (from the GKE service account)
68+
GoogleCredentials credentials = GoogleCredentials.getApplicationDefault()
69+
.createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
70+
71+
credentials.refreshIfExpired();
72+
return credentials.getAccessToken().getTokenValue();
73+
} catch (IOException e) {
74+
throw new RuntimeException("Failed to get GCP access token", e);
75+
}
76+
})
77+
.flatMap(token -> {
78+
ClientRequest newRequest = ClientRequest.from(request)
79+
// Add the Authorization header
80+
.headers(headers -> headers.setBearerAuth(token))
81+
.build();
82+
return next.exchange(newRequest);
83+
});
84+
};
85+
}
86+
4887
public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
4988
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
5089
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {

api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest {
3535
@BeforeEach
3636
void init() {
3737
serde = new SchemaRegistrySerde();
38-
serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true);
38+
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true);
3939
}
4040

4141
@ParameterizedTest
@@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck {
135135

136136
@BeforeEach
137137
void init() {
138-
serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", false);
138+
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", false);
139139
}
140140

141141
@Test
@@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck {
151151

152152
@BeforeEach
153153
void init() {
154-
serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true);
154+
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true);
155155
}
156156

157157
@Test

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4341,6 +4341,8 @@ components:
43414341
type: string
43424342
keystorePassword:
43434343
type: string
4344+
gcpSchemaRegistry:
4345+
type: boolean
43444346
ksqldbServer:
43454347
type: string
43464348
ksqldbServerSsl:

contract/src/main/resources/swagger/kafka-sr-api.yaml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,13 @@ components:
381381
properties:
382382
compatibilityLevel:
383383
$ref: '#/components/schemas/Compatibility'
384-
required:
385-
- compatibilityLevel
384+
# GCP Managed Kafka Schema registries specific fields
385+
alias:
386+
type: string
387+
compatibility:
388+
$ref: '#/components/schemas/Compatibility'
389+
normalize:
390+
type: boolean
386391

387392
CompatibilityLevelChange:
388393
type: object

0 commit comments

Comments
 (0)