2121import static org .apache .beam .sdk .util .construction .BeamUrns .getUrn ;
2222
2323import com .google .auto .service .AutoService ;
24+ import io .confluent .kafka .serializers .KafkaAvroDeserializerConfig ;
2425import java .io .FileOutputStream ;
2526import java .io .IOException ;
2627import java .nio .ByteBuffer ;
@@ -132,6 +133,13 @@ public List<String> outputCollectionNames() {
132133
133134 static class KafkaReadSchemaTransform extends SchemaTransform {
134135 private final KafkaReadSchemaTransformConfiguration configuration ;
    // URL prefix identifying Google Cloud's Managed Kafka schema registry. Used below to decide
    // whether provider-specific (GCP bearer-auth) consumer configs must be applied when building
    // the Avro value deserializer.
    private static final String googleManagedSchemaRegistryPrefix =
        "https://managedkafka.googleapis.com/";

    // Schema-registry providers this transform knows how to configure authentication for.
    enum SchemaRegistryProvider {
      // No recognized managed provider; the default Confluent deserializer settings are used.
      UNSPECIFIED,
      // Google Cloud Managed Schema Registry; requires custom bearer-auth credential configs
      // (see the GOOGLE_MANAGED branch in expand()).
      GOOGLE_MANAGED
    }
135143
136144 KafkaReadSchemaTransform (KafkaReadSchemaTransformConfiguration configuration ) {
137145 this .configuration = configuration ;
@@ -151,6 +159,13 @@ Row getConfigurationRow() {
151159 }
152160 }
153161
162+ private SchemaRegistryProvider getSchemaRegistryProvider (String confluentSchemaRegUrl ) {
163+ if (confluentSchemaRegUrl .contains (googleManagedSchemaRegistryPrefix )) {
164+ return SchemaRegistryProvider .GOOGLE_MANAGED ;
165+ }
166+ return SchemaRegistryProvider .UNSPECIFIED ;
167+ }
168+
154169 @ Override
155170 public PCollectionRowTuple expand (PCollectionRowTuple input ) {
156171 configuration .validate ();
@@ -178,16 +193,41 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
178193 if (confluentSchemaRegUrl != null ) {
179194 final String confluentSchemaRegSubject =
180195 checkArgumentNotNull (configuration .getConfluentSchemaRegistrySubject ());
181- KafkaIO .Read <byte [], GenericRecord > kafkaRead =
196+ KafkaIO .Read <byte [], GenericRecord > kafkaRead ;
197+
198+ kafkaRead =
182199 KafkaIO .<byte [], GenericRecord >read ()
183200 .withTopic (configuration .getTopic ())
184201 .withConsumerFactoryFn (new ConsumerFactoryWithGcsTrustStores ())
185202 .withBootstrapServers (configuration .getBootstrapServers ())
186203 .withConsumerConfigUpdates (consumerConfigs )
187- .withKeyDeserializer (ByteArrayDeserializer .class )
188- .withValueDeserializer (
204+ .withKeyDeserializer (ByteArrayDeserializer .class );
205+
206+ SchemaRegistryProvider provider = getSchemaRegistryProvider (confluentSchemaRegUrl );
207+ switch (provider ) {
208+ case GOOGLE_MANAGED :
209+ // Custom configs to authenticate with Google's Managed Schema Registry
210+ Map <String , Object > configs = new HashMap <>();
211+ configs .put (
212+ KafkaAvroDeserializerConfig .SCHEMA_REGISTRY_URL_CONFIG , confluentSchemaRegUrl );
213+ configs .put (KafkaAvroDeserializerConfig .BEARER_AUTH_CREDENTIALS_SOURCE , "CUSTOM" );
214+ configs .put (
215+ "bearer.auth.custom.provider.class" ,
216+ "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider" );
217+
218+ LOG .info ("Constructing read transform with Google Managed Schema Registry URL." );
219+ kafkaRead =
220+ kafkaRead .withValueDeserializer (
221+ ConfluentSchemaRegistryDeserializerProvider .of (
222+ confluentSchemaRegUrl , confluentSchemaRegSubject , null , configs ));
223+ break ;
224+ case UNSPECIFIED :
225+ kafkaRead =
226+ kafkaRead .withValueDeserializer (
189227 ConfluentSchemaRegistryDeserializerProvider .of (
190228 confluentSchemaRegUrl , confluentSchemaRegSubject ));
229+ }
230+
191231 Integer maxReadTimeSeconds = configuration .getMaxReadTimeSeconds ();
192232 if (maxReadTimeSeconds != null ) {
193233 kafkaRead = kafkaRead .withMaxReadTime (Duration .standardSeconds (maxReadTimeSeconds ));
0 commit comments