Skip to content

Commit 6c7bbde

Browse files
authored
Correctly override apache/beam containers for RC on Dataflow runner job submission (#36199)
1 parent 7cffae0 commit 6c7bbde

File tree

2 files changed

+53
-65
lines changed

2 files changed

+53
-65
lines changed

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1040,9 +1040,12 @@ protected RunnerApi.Pipeline applySdkEnvironmentOverrides(
10401040
&& !updated
10411041
// don't update if the container image is already configured by DataflowRunner
10421042
&& !containerImage.equals(getContainerImageForJob(options))) {
1043+
String imageAndTag =
1044+
normalizeDataflowImageAndTag(
1045+
containerImage.substring(containerImage.lastIndexOf("/")));
10431046
containerImage =
10441047
DataflowRunnerInfo.getDataflowRunnerInfo().getContainerImageBaseRepository()
1045-
+ containerImage.substring(containerImage.lastIndexOf("/"));
1048+
+ imageAndTag;
10461049
}
10471050
environmentBuilder.setPayload(
10481051
RunnerApi.DockerPayload.newBuilder()
@@ -1055,6 +1058,23 @@ protected RunnerApi.Pipeline applySdkEnvironmentOverrides(
10551058
return pipelineBuilder.build();
10561059
}
10571060

1061+
static String normalizeDataflowImageAndTag(String imageAndTag) {
1062+
if (imageAndTag.startsWith("/beam_java")
1063+
|| imageAndTag.startsWith("/beam_python")
1064+
|| imageAndTag.startsWith("/beam_go_")) {
1065+
int tagIdx = imageAndTag.lastIndexOf(":");
1066+
if (tagIdx > 0) {
1067+
// For release candidates, apache/beam_ images has rc tag while Dataflow does not
1068+
String tag = imageAndTag.substring(tagIdx); // e,g, ":2.xx.0rc1"
1069+
int mayRc = tag.toLowerCase().lastIndexOf("rc");
1070+
if (mayRc > 0) {
1071+
imageAndTag = imageAndTag.substring(0, tagIdx) + tag.substring(0, mayRc);
1072+
}
1073+
}
1074+
}
1075+
return imageAndTag;
1076+
}
1077+
10581078
@VisibleForTesting
10591079
protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) {
10601080
RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder();

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java

Lines changed: 32 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,45 +1224,32 @@ public void testNoStagingLocationAndNoTempLocationFails() {
12241224
DataflowRunner.fromOptions(options);
12251225
}
12261226

1227+
private static RunnerApi.Pipeline containerUrlToPipeline(String url) {
1228+
return RunnerApi.Pipeline.newBuilder()
1229+
.setComponents(
1230+
RunnerApi.Components.newBuilder()
1231+
.putEnvironments(
1232+
"env",
1233+
RunnerApi.Environment.newBuilder()
1234+
.setUrn(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
1235+
.setPayload(
1236+
RunnerApi.DockerPayload.newBuilder()
1237+
.setContainerImage(url)
1238+
.build()
1239+
.toByteString())
1240+
.build()))
1241+
.build();
1242+
}
1243+
12271244
@Test
12281245
public void testApplySdkEnvironmentOverrides() throws IOException {
12291246
DataflowPipelineOptions options = buildPipelineOptions();
12301247
String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:latest";
12311248
String gcrPythonContainerUrl = "gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest";
12321249
options.setSdkHarnessContainerImageOverrides(".*python.*," + gcrPythonContainerUrl);
12331250
DataflowRunner runner = DataflowRunner.fromOptions(options);
1234-
RunnerApi.Pipeline pipeline =
1235-
RunnerApi.Pipeline.newBuilder()
1236-
.setComponents(
1237-
RunnerApi.Components.newBuilder()
1238-
.putEnvironments(
1239-
"env",
1240-
RunnerApi.Environment.newBuilder()
1241-
.setUrn(
1242-
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
1243-
.setPayload(
1244-
RunnerApi.DockerPayload.newBuilder()
1245-
.setContainerImage(dockerHubPythonContainerUrl)
1246-
.build()
1247-
.toByteString())
1248-
.build()))
1249-
.build();
1250-
RunnerApi.Pipeline expectedPipeline =
1251-
RunnerApi.Pipeline.newBuilder()
1252-
.setComponents(
1253-
RunnerApi.Components.newBuilder()
1254-
.putEnvironments(
1255-
"env",
1256-
RunnerApi.Environment.newBuilder()
1257-
.setUrn(
1258-
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
1259-
.setPayload(
1260-
RunnerApi.DockerPayload.newBuilder()
1261-
.setContainerImage(gcrPythonContainerUrl)
1262-
.build()
1263-
.toByteString())
1264-
.build()))
1265-
.build();
1251+
RunnerApi.Pipeline pipeline = containerUrlToPipeline(dockerHubPythonContainerUrl);
1252+
RunnerApi.Pipeline expectedPipeline = containerUrlToPipeline(gcrPythonContainerUrl);
12661253
assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), equalTo(expectedPipeline));
12671254
}
12681255

@@ -1272,38 +1259,19 @@ public void testApplySdkEnvironmentOverridesByDefault() throws IOException {
12721259
String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:latest";
12731260
String gcrPythonContainerUrl = "gcr.io/cloud-dataflow/v1beta3/beam_python3.9_sdk:latest";
12741261
DataflowRunner runner = DataflowRunner.fromOptions(options);
1275-
RunnerApi.Pipeline pipeline =
1276-
RunnerApi.Pipeline.newBuilder()
1277-
.setComponents(
1278-
RunnerApi.Components.newBuilder()
1279-
.putEnvironments(
1280-
"env",
1281-
RunnerApi.Environment.newBuilder()
1282-
.setUrn(
1283-
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
1284-
.setPayload(
1285-
RunnerApi.DockerPayload.newBuilder()
1286-
.setContainerImage(dockerHubPythonContainerUrl)
1287-
.build()
1288-
.toByteString())
1289-
.build()))
1290-
.build();
1291-
RunnerApi.Pipeline expectedPipeline =
1292-
RunnerApi.Pipeline.newBuilder()
1293-
.setComponents(
1294-
RunnerApi.Components.newBuilder()
1295-
.putEnvironments(
1296-
"env",
1297-
RunnerApi.Environment.newBuilder()
1298-
.setUrn(
1299-
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
1300-
.setPayload(
1301-
RunnerApi.DockerPayload.newBuilder()
1302-
.setContainerImage(gcrPythonContainerUrl)
1303-
.build()
1304-
.toByteString())
1305-
.build()))
1306-
.build();
1262+
RunnerApi.Pipeline pipeline = containerUrlToPipeline(dockerHubPythonContainerUrl);
1263+
RunnerApi.Pipeline expectedPipeline = containerUrlToPipeline(gcrPythonContainerUrl);
1264+
assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), equalTo(expectedPipeline));
1265+
}
1266+
1267+
@Test
1268+
public void testApplySdkEnvironmentOverridesRcByDefault() throws IOException {
1269+
DataflowPipelineOptions options = buildPipelineOptions();
1270+
String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:2.68.0rc2";
1271+
String gcrPythonContainerUrl = "gcr.io/cloud-dataflow/v1beta3/beam_python3.9_sdk:2.68.0";
1272+
DataflowRunner runner = DataflowRunner.fromOptions(options);
1273+
RunnerApi.Pipeline pipeline = containerUrlToPipeline(dockerHubPythonContainerUrl);
1274+
RunnerApi.Pipeline expectedPipeline = containerUrlToPipeline(gcrPythonContainerUrl);
13071275
assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), equalTo(expectedPipeline));
13081276
}
13091277

0 commit comments

Comments
 (0)