Skip to content

Commit a4aa6f1

Browse files
committed
Allow configuration to be serialized
1 parent 3f01f70 commit a4aa6f1

File tree

1 file changed

+24
-11
lines changed

1 file changed

+24
-11
lines changed

sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private abstract static class CrossLanguageConfiguration {
6262
String awsAccessKey;
6363
String awsSecretKey;
6464
Region region;
65-
@Nullable URI serviceEndpoint;
65+
@Nullable String serviceEndpoint;
6666

6767
public void setStreamName(String streamName) {
6868
this.streamName = streamName;
@@ -81,14 +81,7 @@ public void setRegion(String region) {
8181
}
8282

8383
public void setServiceEndpoint(@Nullable String serviceEndpoint) {
84-
if (serviceEndpoint != null) {
85-
try {
86-
this.serviceEndpoint = new URI(serviceEndpoint);
87-
} catch (URISyntaxException ex) {
88-
throw new RuntimeException(
89-
String.format("Service endpoint must be URI format, got: %s", serviceEndpoint));
90-
}
91-
}
84+
this.serviceEndpoint = serviceEndpoint;
9285
}
9386
}
9487

@@ -111,14 +104,24 @@ public PTransform<PCollection<byte[]>, KinesisIO.Write.Result> buildExternal(
111104
AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey);
112105
StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds);
113106
SerializableFunction<byte[], byte[]> serializer = v -> v;
107+
@Nullable URI endpoint;
108+
if (configuration.serviceEndpoint != null) {
109+
try {
110+
endpoint = configuration.serviceEndpoint;
111+
}
112+
catch (URISyntaxException ex) {
113+
throw new RuntimeException(
114+
String.format("Service endpoint must be URI format, got: %s", serviceEndpoint));
115+
}
116+
}
114117
KinesisIO.Write<byte[]> writeTransform =
115118
KinesisIO.<byte[]>write()
116119
.withStreamName(configuration.streamName)
117120
.withClientConfiguration(
118121
ClientConfiguration.builder()
119122
.credentialsProvider(provider)
120123
.region(configuration.region)
121-
.endpoint(configuration.serviceEndpoint)
124+
.endpoint(endpoint)
122125
.build())
123126
.withPartitioner(p -> configuration.partitionKey)
124127
.withSerializer(serializer);
@@ -211,14 +214,24 @@ public PTransform<PBegin, PCollection<byte[]>> buildExternal(
211214
AwsBasicCredentials creds =
212215
AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey);
213216
StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds);
217+
@Nullable URI endpoint;
218+
if (configuration.serviceEndpoint != null) {
219+
try {
220+
endpoint = configuration.serviceEndpoint;
221+
}
222+
catch (URISyntaxException ex) {
223+
throw new RuntimeException(
224+
String.format("Service endpoint must be URI format, got: %s", serviceEndpoint));
225+
}
226+
}
214227
KinesisIO.Read readTransform =
215228
KinesisIO.read()
216229
.withStreamName(configuration.streamName)
217230
.withClientConfiguration(
218231
ClientConfiguration.builder()
219232
.credentialsProvider(provider)
220233
.region(configuration.region)
221-
.endpoint(configuration.serviceEndpoint)
234+
.endpoint(endpoint)
222235
.build());
223236

224237
if (configuration.maxNumRecords != null) {

0 commit comments

Comments
 (0)