1717 */
1818package org .apache .beam .sdk .io .aws2 .kinesis ;
1919
20- import software .amazon .awssdk .auth .credentials .AwsBasicCredentials ;
21- import software .amazon .awssdk .auth .credentials .StaticCredentialsProvider ;
22- import software .amazon .awssdk .regions .Region ;
23- import software .amazon .kinesis .common .InitialPositionInStream ;
2420import com .google .auto .service .AutoService ;
2521import java .net .URI ;
22+ import java .net .URISyntaxException ;
2623import java .util .Map ;
27- import java .util .Properties ;
2824import org .apache .beam .sdk .expansion .ExternalTransformRegistrar ;
2925import org .apache .beam .sdk .io .aws2 .common .ClientConfiguration ;
30- import org .apache .beam .sdk .io . aws2 . kinesis . KinesisIO ;
26+ import org .apache .beam .sdk .transforms . DoFn ;
3127import org .apache .beam .sdk .transforms .ExternalTransformBuilder ;
3228import org .apache .beam .sdk .transforms .PTransform ;
29+ import org .apache .beam .sdk .transforms .ParDo ;
3330import org .apache .beam .sdk .values .PBegin ;
3431import org .apache .beam .sdk .values .PCollection ;
3532import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableMap ;
3633import org .checkerframework .checker .nullness .qual .Nullable ;
3734import org .joda .time .Duration ;
3835import org .joda .time .Instant ;
36+ import software .amazon .awssdk .auth .credentials .AwsBasicCredentials ;
37+ import software .amazon .awssdk .auth .credentials .StaticCredentialsProvider ;
38+ import software .amazon .awssdk .regions .Region ;
39+ import software .amazon .kinesis .common .InitialPositionInStream ;
3940
4041/**
41- * Exposes {@link KinesisIO.Write} and {@link KinesisIO.Read} as an external transform for
42- * cross-language usage.
42+ * Exposes {@link org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Write} and {@link
43+ * org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read} as an external transform for cross-language
44+ * usage.
4345 */
4446@ AutoService (ExternalTransformRegistrar .class )
4547@ SuppressWarnings ({
@@ -79,13 +81,19 @@ public void setRegion(String region) {
7981
8082 public void setServiceEndpoint (@ Nullable String serviceEndpoint ) {
8183 if (serviceEndpoint != null ) {
82- this .serviceEndpoint = new URI (serviceEndpoint );
84+ try {
85+ this .serviceEndpoint = new URI (serviceEndpoint );
86+ } catch (URISyntaxException ex ) {
87+ throw new RuntimeException (
88+ String .format ("Service endpoint must be URI format, got: %s" , serviceEndpoint ));
89+ }
8390 }
8491 }
8592 }
8693
8794 public static class WriteBuilder
88- implements ExternalTransformBuilder <WriteBuilder .Configuration , PCollection <byte []>, KinesisIO .Write .Result > {
95+ implements ExternalTransformBuilder <
96+ WriteBuilder .Configuration , PCollection <byte []>, KinesisIO .Write .Result > {
8997
9098 public static class Configuration extends CrossLanguageConfiguration {
9199 private String partitionKey ;
@@ -96,18 +104,20 @@ public void setPartitionKey(String partitionKey) {
96104 }
97105
98106 @ Override
99- public PTransform <PCollection <byte []>, KinesisIO .Write .Result > buildExternal (Configuration configuration ) {
100- AwsBasicCredentials creds = AwsBasicCredentials .create (configuration .awsAccessKey , configuration .awsSecretKey );
107+ public PTransform <PCollection <byte []>, KinesisIO .Write .Result > buildExternal (
108+ Configuration configuration ) {
109+ AwsBasicCredentials creds =
110+ AwsBasicCredentials .create (configuration .awsAccessKey , configuration .awsSecretKey );
101111 StaticCredentialsProvider provider = StaticCredentialsProvider .create (creds );
102112 KinesisIO .Write <byte []> writeTransform =
103113 KinesisIO .<byte []>write ()
104114 .withStreamName (configuration .streamName )
105115 .withClientConfiguration (
106116 ClientConfiguration .builder ()
107- .credentialsProvider (provider )
108- .region (configuration .region )
109- .endpoint (configuration .serviceEndpoint )
110- .build ())
117+ .credentialsProvider (provider )
118+ .region (configuration .region )
119+ .endpoint (configuration .serviceEndpoint )
120+ .build ())
111121 .withPartitioner (p -> configuration .partitionKey );
112122
113123 return writeTransform ;
@@ -195,17 +205,18 @@ private enum WatermarkPolicy {
195205 @ Override
196206 public PTransform <PBegin , PCollection <byte []>> buildExternal (
197207 ReadDataBuilder .Configuration configuration ) {
198- AwsBasicCredentials creds = AwsBasicCredentials .create (configuration .awsAccessKey , configuration .awsSecretKey );
208+ AwsBasicCredentials creds =
209+ AwsBasicCredentials .create (configuration .awsAccessKey , configuration .awsSecretKey );
199210 StaticCredentialsProvider provider = StaticCredentialsProvider .create (creds );
200211 KinesisIO .Read readTransform =
201212 KinesisIO .read ()
202213 .withStreamName (configuration .streamName )
203214 .withClientConfiguration (
204215 ClientConfiguration .builder ()
205- .credentialsProvider (provider )
206- .region (configuration .region )
207- .endpoint (configuration .serviceEndpoint )
208- .build ());
216+ .credentialsProvider (provider )
217+ .region (configuration .region )
218+ .endpoint (configuration .serviceEndpoint )
219+ .build ());
209220
210221 if (configuration .maxNumRecords != null ) {
211222 readTransform = readTransform .withMaxNumRecords (configuration .maxNumRecords );
@@ -252,15 +263,34 @@ public PTransform<PBegin, PCollection<byte[]>> buildExternal(
252263 readTransform =
253264 readTransform .withInitialTimestampInStream (configuration .initialTimestampInStream );
254265 }
266+
267+ return new KinesisReadToBytes (readTransform );
268+ }
269+ }
270+
271+ public static class KinesisReadToBytes extends PTransform <PBegin , PCollection <byte []>> {
272+ private KinesisIO .Read readTransform ;
273+
274+ private KinesisReadToBytes (KinesisIO .Read readTransform ) {
275+ this .readTransform = readTransform ;
276+ }
277+
278+ @ Override
279+ public PCollection <byte []> expand (PBegin input ) {
255280 // Convert back to bytes to keep consistency with previous verison:
256281 // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java
257- return readTransform .apply ("Convert to bytes" , ParDo .of (new DoFn <KiesisRecord , byte []>() {
258- @ ProcessElement
259- public void processElement (ProcessContext c ) {
260- KinesisRecord record = c .element ();
261- return record .getDataAsBytes ();
262- }
263- }));
282+ return input
283+ .apply (this .readTransform )
284+ .apply (
285+ "Convert to bytes" ,
286+ ParDo .of (
287+ new DoFn <KinesisRecord , byte []>() {
288+ @ ProcessElement
289+ public byte [] processElement (ProcessContext c ) {
290+ KinesisRecord record = c .element ();
291+ return record .getDataAsBytes ();
292+ }
293+ }));
264294 }
265295 }
266- }
296+ }
0 commit comments