Skip to content

Commit f2aa791

Browse files
authored
Exercise Python PVR tests on Flink 2.0 (#37313)
* Exercise Flink 2.0 Python Validates Runner tests * clean up TODOs: Move Flink 2.0 as latestFlinkVersion * Fix PortableJar PostCommit * Still run Go VR on Flink 1.20
1 parent 5d6cb04 commit f2aa791

File tree

15 files changed

+118
-61
lines changed

15 files changed

+118
-61
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1,
3+
"modification": 2,
44
"https://github.com/apache/beam/pull/32440": "testing datastream optimizations",
55
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
66
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"modification": "#37313"
3+
}

.github/workflows/beam_PostCommit_Python_Portable_Flink.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ jobs:
6363
job_name: ["beam_PostCommit_Python_Portable_Flink"]
6464
job_phrase: ["Run Python Portable Flink"]
6565
# TODO: Enable PROCESS https://github.com/apache/beam/issues/35702
66-
# environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
67-
environment_type: ['DOCKER', 'LOOPBACK']
66+
# all environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
67+
# Run modes not covered by PreCommit_Python_PVR_Flink (i.e. other than 'LOOPBACK')
68+
environment_type: ['DOCKER']
6869
steps:
6970
- uses: actions/checkout@v4
7071
- name: Setup repository

.github/workflows/beam_PostCommit_Python_ValidatesRunner_Flink.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ jobs:
8888
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
8989
uses: ./.github/actions/gradle-command-self-hosted-action
9090
with:
91-
gradle-command: :sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flinkValidatesRunner
91+
gradle-command: :sdks:python:test-suites:portable:py${{steps.set_py_ver_clean.outputs.py_ver_clean}}:flink1ValidatesRunner
9292
arguments: |
9393
-PpythonVersion=${{ matrix.python_version }} \
9494
- name: Archive Python Test Results

.github/workflows/beam_PreCommit_Python_PVR_Flink.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ jobs:
106106
env:
107107
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
108108
with:
109+
# Run Flink 2 tests. Flink 1.20 is covered by PostCommit_Python_ValidatesRunner_Flink
109110
gradle-command: :sdks:python:test-suites:portable:py313:flinkValidatesRunner
110111
arguments: |
111112
-PpythonVersion=3.13 \

CHANGES.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@
6161

6262
* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
6363
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
64-
* Flink 2.0 support for Java Classic and Portable Flink Runners ([#36947](https://github.com/apache/beam/issues/36947)),
65-
experimental support for other SDK languages including Python.
64+
* Flink 2.0 support ([#36947](https://github.com/apache/beam/issues/36947)).
6665

6766

6867
## I/Os

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -552,8 +552,7 @@ class BeamModulePlugin implements Plugin<Project> {
552552
project.ext.currentJavaVersion = getSupportedJavaVersion()
553553

554554
project.ext.allFlinkVersions = project.flink_versions.split(',')
555-
// TODO(https://github.com/apache/beam/issues/36947): Move to use project.ext.allFlinkVersions.last() when Flink 2 support completed
556-
project.ext.latestFlinkVersion = '1.20'
555+
project.ext.latestFlinkVersion = project.ext.allFlinkVersions.last()
557556

558557
project.ext.nativeArchitecture = {
559558
// Best guess as to this system's normalized native architecture name.

runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
101101
import org.apache.beam.sdk.values.WindowingStrategy;
102102
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
103+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
103104
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
104105
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
105106
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -987,8 +988,11 @@ private <T> void translateTestStream(
987988
// stage
988989
String sideInputTag = sideInputId.getLocalName();
989990
String collectionId =
990-
components
991-
.getTransformsOrThrow(sideInputId.getTransformId())
991+
MoreObjects.firstNonNull(
992+
components.getTransformsOrDefault(sideInputId.getTransformId(), null),
993+
// In the case of optimized pipeline, side input transform may not be found in
994+
// component proto
995+
stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
992996
.getInputsOrThrow(sideInputId.getLocalName());
993997
RunnerApi.WindowingStrategy windowingStrategyProto =
994998
components.getWindowingStrategiesOrThrow(
@@ -1045,8 +1049,11 @@ private TransformedSideInputs transformSideInputs(
10451049
tagToIntMapping.put(tag, count);
10461050
count++;
10471051
String collectionId =
1048-
components
1049-
.getTransformsOrThrow(sideInput.getKey().getTransformId())
1052+
MoreObjects.firstNonNull(
1053+
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
1054+
stagePayload
1055+
.getComponents()
1056+
.getTransformsOrThrow(sideInput.getKey().getTransformId()))
10501057
.getInputsOrThrow(sideInput.getKey().getLocalName());
10511058
DataStream<Object> sideInputStream = context.getDataStreamOrThrow(collectionId);
10521059
TypeInformation<Object> tpe = sideInputStream.getType();
@@ -1078,7 +1085,11 @@ private TransformedSideInputs transformSideInputs(
10781085
TupleTag<?> tag = sideInput.getValue().getTagInternal();
10791086
final int intTag = tagToIntMapping.get(tag);
10801087
RunnerApi.PTransform pTransform =
1081-
components.getTransformsOrThrow(sideInput.getKey().getTransformId());
1088+
MoreObjects.firstNonNull(
1089+
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
1090+
stagePayload
1091+
.getComponents()
1092+
.getTransformsOrThrow(sideInput.getKey().getTransformId()));
10821093
String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
10831094
DataStream<WindowedValue<?>> sideInputStream = context.getDataStreamOrThrow(collectionId);
10841095

runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
101101
import org.apache.beam.sdk.values.WindowingStrategy;
102102
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
103+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
103104
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
104105
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
105106
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -986,8 +987,11 @@ private <T> void translateTestStream(
986987
// stage
987988
String sideInputTag = sideInputId.getLocalName();
988989
String collectionId =
989-
components
990-
.getTransformsOrThrow(sideInputId.getTransformId())
990+
MoreObjects.firstNonNull(
991+
components.getTransformsOrDefault(sideInputId.getTransformId(), null),
992+
// In the case of optimized pipeline, side input transform may not be found in
993+
// component proto
994+
stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
991995
.getInputsOrThrow(sideInputId.getLocalName());
992996
RunnerApi.WindowingStrategy windowingStrategyProto =
993997
components.getWindowingStrategiesOrThrow(
@@ -1044,8 +1048,11 @@ private TransformedSideInputs transformSideInputs(
10441048
tagToIntMapping.put(tag, count);
10451049
count++;
10461050
String collectionId =
1047-
components
1048-
.getTransformsOrThrow(sideInput.getKey().getTransformId())
1051+
MoreObjects.firstNonNull(
1052+
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
1053+
stagePayload
1054+
.getComponents()
1055+
.getTransformsOrThrow(sideInput.getKey().getTransformId()))
10491056
.getInputsOrThrow(sideInput.getKey().getLocalName());
10501057
DataStream<Object> sideInputStream = context.getDataStreamOrThrow(collectionId);
10511058
TypeInformation<Object> tpe = sideInputStream.getType();
@@ -1077,7 +1084,11 @@ private TransformedSideInputs transformSideInputs(
10771084
TupleTag<?> tag = sideInput.getValue().getTagInternal();
10781085
final int intTag = tagToIntMapping.get(tag);
10791086
RunnerApi.PTransform pTransform =
1080-
components.getTransformsOrThrow(sideInput.getKey().getTransformId());
1087+
MoreObjects.firstNonNull(
1088+
components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
1089+
stagePayload
1090+
.getComponents()
1091+
.getTransformsOrThrow(sideInput.getKey().getTransformId()));
10811092
String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
10821093
DataStream<WindowedValue<?>> sideInputStream = context.getDataStreamOrThrow(collectionId);
10831094

0 commit comments

Comments
 (0)