Skip to content

Commit 1763e7b

Browse files
authored
Fix loopback (#34678)
* added the tests * updated the test * fixed the comments
1 parent 9fd7d09 commit 1763e7b

File tree

3 files changed

+29
-11
lines changed

3 files changed

+29
-11
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -219,20 +219,17 @@ public static Environment createDockerEnvironment(String dockerImageUrl) {
219219
}
220220

221221
private static Environment createExternalEnvironment(String externalServiceAddress) {
222-
if (externalServiceAddress.isEmpty()) {
223-
throw new IllegalArgumentException(
224-
String.format(
225-
"External service address must not be empty (set it using '--environmentOptions=%s=...'?).",
226-
externalServiceAddressOption));
222+
// Create the payload builder. If the address is empty, the payload will be empty,
223+
// acting as a placeholder for late binding. For example, in the LOOPBACK case,
224+
// the address is populated by PortableRunner#run before this method is called.
225+
ExternalPayload.Builder payloadBuilder = ExternalPayload.newBuilder();
226+
if (!externalServiceAddress.isEmpty()) {
227+
payloadBuilder.setEndpoint(
228+
ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build());
227229
}
228230
return Environment.newBuilder()
229231
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL))
230-
.setPayload(
231-
ExternalPayload.newBuilder()
232-
.setEndpoint(
233-
ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build())
234-
.build()
235-
.toByteString())
232+
.setPayload(payloadBuilder.build().toByteString())
236233
.build();
237234
}
238235

sdks/java/extensions/python/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dependencies {
3333
testImplementation library.java.junit
3434
testImplementation library.java.hamcrest
3535
testImplementation project(":sdks:java:core").sourceSets.test.output
36+
testImplementation project(path: ":runners:direct-java", configuration: "shadowTest")
3637
testRuntimeOnly library.java.slf4j_simple
3738
}
3839

sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
2828
import org.apache.beam.sdk.Pipeline;
2929
import org.apache.beam.sdk.coders.StringUtf8Coder;
30+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
31+
import org.apache.beam.sdk.options.PortablePipelineOptions;
3032
import org.apache.beam.sdk.schemas.Schema;
3133
import org.apache.beam.sdk.schemas.SchemaTranslation;
3234
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
@@ -371,4 +373,22 @@ public void generateKwargsFromMap() {
371373
assertEquals(12L, (long) receivedRow.getInt64("longField"));
372374
assertEquals(15.6, (double) receivedRow.getDouble("doubleField"), 0);
373375
}
376+
377+
@Test
378+
public void testLoopbackEnvironmentWithPythonExternalTransform() {
379+
PortablePipelineOptions options =
380+
PipelineOptionsFactory.create().as(PortablePipelineOptions.class);
381+
options.setDefaultEnvironmentType("LOOPBACK");
382+
383+
Pipeline p = Pipeline.create(options);
384+
385+
PCollection<String> output =
386+
p.apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z")))
387+
.apply(
388+
PythonExternalTransform
389+
.<PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>>
390+
from("apache_beam.GroupByKey"))
391+
.apply(Keys.create());
392+
PAssert.that(output).containsInAnyOrder("A", "B");
393+
}
374394
}

0 commit comments

Comments
 (0)