
Commit f29e3d3

Exercise Flink 2.0 Python Validates Runner tests
1 parent 9ea7380 commit f29e3d3

File tree

9 files changed: +81 −44 lines changed

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+{}

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+{}

.github/workflows/beam_PostCommit_Python_Portable_Flink.yml

Lines changed: 4 additions & 3 deletions

@@ -63,8 +63,9 @@ jobs:
         job_name: ["beam_PostCommit_Python_Portable_Flink"]
         job_phrase: ["Run Python Portable Flink"]
         # TODO: Enable PROCESS https://github.com/apache/beam/issues/35702
-        # environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
-        environment_type: ['DOCKER', 'LOOPBACK']
+        # all environment_type: ['DOCKER', 'LOOPBACK', 'PROCESS']
+        # Run modes not covered by PreCommit_Python_PVR_Flink (i.e. other than 'LOOPBACK')
+        environment_type: ['DOCKER']
     steps:
       - uses: actions/checkout@v4
       - name: Setup repository
@@ -83,7 +84,7 @@ jobs:
           CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :sdks:python:test-suites:portable:py310:flinkCompatibilityMatrix${{ matrix.environment_type }}
+          gradle-command: :sdks:python:test-suites:portable:py310:flink2CompatibilityMatrix${{ matrix.environment_type }}
           arguments: |
             -PpythonVersion=3.10 \
       - name: Archive Python Test Results

.github/workflows/beam_PreCommit_Python_PVR_Flink.yml

Lines changed: 2 additions & 1 deletion

@@ -106,7 +106,8 @@ jobs:
         env:
           CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
         with:
-          gradle-command: :sdks:python:test-suites:portable:py313:flinkValidatesRunner
+          # Run Flink 2 tests. Flink 1.20 is covered by PostCommit_Python_ValidatesRunner_Flink
+          gradle-command: :sdks:python:test-suites:portable:py313:flink2ValidatesRunner
           arguments: |
             -PpythonVersion=3.13 \
       - name: Archive Python Test Results

runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java

Lines changed: 16 additions & 5 deletions

@@ -100,6 +100,7 @@
 import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -987,8 +988,11 @@ private <T> void translateTestStream(
       // stage
       String sideInputTag = sideInputId.getLocalName();
       String collectionId =
-          components
-              .getTransformsOrThrow(sideInputId.getTransformId())
+          MoreObjects.firstNonNull(
+                  components.getTransformsOrDefault(sideInputId.getTransformId(), null),
+                  // In the case of optimized pipeline, side input transform may not be found in
+                  // component proto
+                  stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
               .getInputsOrThrow(sideInputId.getLocalName());
       RunnerApi.WindowingStrategy windowingStrategyProto =
           components.getWindowingStrategiesOrThrow(
@@ -1045,8 +1049,11 @@ private TransformedSideInputs transformSideInputs(
       tagToIntMapping.put(tag, count);
       count++;
       String collectionId =
-          components
-              .getTransformsOrThrow(sideInput.getKey().getTransformId())
+          MoreObjects.firstNonNull(
+                  components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+                  stagePayload
+                      .getComponents()
+                      .getTransformsOrThrow(sideInput.getKey().getTransformId()))
               .getInputsOrThrow(sideInput.getKey().getLocalName());
       DataStream<Object> sideInputStream = context.getDataStreamOrThrow(collectionId);
       TypeInformation<Object> tpe = sideInputStream.getType();
@@ -1078,7 +1085,11 @@ private TransformedSideInputs transformSideInputs(
       TupleTag<?> tag = sideInput.getValue().getTagInternal();
       final int intTag = tagToIntMapping.get(tag);
       RunnerApi.PTransform pTransform =
-          components.getTransformsOrThrow(sideInput.getKey().getTransformId());
+          MoreObjects.firstNonNull(
+              components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+              stagePayload
+                  .getComponents()
+                  .getTransformsOrThrow(sideInput.getKey().getTransformId()));
       String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
       DataStream<WindowedValue<?>> sideInputStream = context.getDataStreamOrThrow(collectionId);

runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java

Lines changed: 16 additions & 5 deletions

@@ -100,6 +100,7 @@
 import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultiset;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -986,8 +987,11 @@ private <T> void translateTestStream(
       // stage
       String sideInputTag = sideInputId.getLocalName();
       String collectionId =
-          components
-              .getTransformsOrThrow(sideInputId.getTransformId())
+          MoreObjects.firstNonNull(
+                  components.getTransformsOrDefault(sideInputId.getTransformId(), null),
+                  // In the case of optimized pipeline, side input transform may not be found in
+                  // component proto
+                  stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()))
               .getInputsOrThrow(sideInputId.getLocalName());
       RunnerApi.WindowingStrategy windowingStrategyProto =
           components.getWindowingStrategiesOrThrow(
@@ -1044,8 +1048,11 @@ private TransformedSideInputs transformSideInputs(
       tagToIntMapping.put(tag, count);
       count++;
       String collectionId =
-          components
-              .getTransformsOrThrow(sideInput.getKey().getTransformId())
+          MoreObjects.firstNonNull(
+                  components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+                  stagePayload
+                      .getComponents()
+                      .getTransformsOrThrow(sideInput.getKey().getTransformId()))
               .getInputsOrThrow(sideInput.getKey().getLocalName());
       DataStream<Object> sideInputStream = context.getDataStreamOrThrow(collectionId);
       TypeInformation<Object> tpe = sideInputStream.getType();
@@ -1077,7 +1084,11 @@ private TransformedSideInputs transformSideInputs(
       TupleTag<?> tag = sideInput.getValue().getTagInternal();
       final int intTag = tagToIntMapping.get(tag);
       RunnerApi.PTransform pTransform =
-          components.getTransformsOrThrow(sideInput.getKey().getTransformId());
+          MoreObjects.firstNonNull(
+              components.getTransformsOrDefault(sideInput.getKey().getTransformId(), null),
+              stagePayload
+                  .getComponents()
+                  .getTransformsOrThrow(sideInput.getKey().getTransformId()));
       String collectionId = pTransform.getInputsOrThrow(sideInput.getKey().getLocalName());
       DataStream<WindowedValue<?>> sideInputStream = context.getDataStreamOrThrow(collectionId);

sdks/java/extensions/sql/expansion-service/build.gradle

Lines changed: 0 additions & 1 deletion

@@ -56,5 +56,4 @@ shadowJar {
   manifest {
     attributes(["Multi-Release": true])
   }
-  outputs.upToDateWhen { false }
 }

sdks/python/apache_beam/runners/portability/flink_runner_test.py

Lines changed: 20 additions & 4 deletions

@@ -18,6 +18,7 @@

 import argparse
 import logging
+import platform
 import shlex
 import typing
 import unittest
@@ -139,6 +140,7 @@ def _create_conf_dir(cls):
     cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')

     # path to write Flink configuration to
+    # Flink 1.x conf:
     conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
     file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
     with open(conf_path, 'w') as f:
@@ -149,6 +151,19 @@ def _create_conf_dir(cls):
           'metrics.reporter.file.path: %s' % cls.test_metrics_path,
           'metrics.scope.operator: <operator_name>',
       ]))
+    # Flink 2.x conf:
+    conf_path_2 = path.join(cls.conf_dir, 'config.yaml')
+    with open(conf_path_2, 'w') as f:
+      f.write(
+          '''metrics:
+  reporters: file
+  reporter:
+    file:
+      class: %s
+      path: %s
+  scope:
+    operator: <operator_name>
+''' % (file_reporter, cls.test_metrics_path))

   @classmethod
   def _subprocess_command(cls, job_port, expansion_port):
@@ -158,11 +173,12 @@ def _subprocess_command(cls, job_port, expansion_port):

     cls._create_conf_dir()
     cls.expansion_port = expansion_port
-
+    platform_specific_opts = []
+    if platform.system() == 'Linux':
+      # UseContainerSupport is supported in Linux and turned on by default
+      platform_specific_opts.append('-XX:-UseContainerSupport')
     try:
-      return [
-          'java',
-          '-XX:-UseContainerSupport',
+      return ['java'] + platform_specific_opts + [
           '--add-opens=java.base/java.lang=ALL-UNNAMED',
           '--add-opens=java.base/java.nio=ALL-UNNAMED',
           '--add-opens=java.base/java.util=ALL-UNNAMED',

sdks/python/test-suites/portable/common.gradle

Lines changed: 21 additions & 25 deletions

@@ -22,25 +22,33 @@ import org.apache.tools.ant.taskdefs.condition.Os

 def pythonRootDir = "${rootDir}/sdks/python"
 def pythonVersionSuffix = project.ext.pythonVersion.replace('.', '')
+// TODO(https://github.com/apache/beam/issues/36947): Remove when dropping Flink 1.x support
 def latestFlinkVersion = project.ext.latestFlinkVersion
+def latestFlink2Version = '2.0'
 def currentJavaVersion = project.ext.currentJavaVersion

 ext {
   pythonContainerTask = ":sdks:python:container:py${pythonVersionSuffix}:docker"
 }

-def createFlinkRunnerTestTask(String workerType) {
-  def taskName = "flinkCompatibilityMatrix${workerType}"
-  // project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here.
-  def jobServerJar = "${rootDir}/runners/flink/${latestFlinkVersion}/job-server/build/libs/beam-runners-flink-${latestFlinkVersion}-job-server-${version}.jar"
+def createFlinkRunnerTestTask(String workerType, String flinkVersion) {
+  String taskName
+
+  // project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here.
+  def jobServerJar = "${rootDir}/runners/flink/${flinkVersion}/job-server/build/libs/beam-runners-flink-${flinkVersion}-job-server-${version}.jar"
   def options = "--flink_job_server_jar=${jobServerJar} --environment_type=${workerType}"
+  if (flinkVersion.startsWith('2')) {
+    taskName = "flink2CompatibilityMatrix${workerType}"
+  } else {
+    taskName = "flinkCompatibilityMatrix${workerType}"
+  }
   if (workerType == 'PROCESS') {
     options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
   }
   def task = toxTask(taskName, 'flink-runner-test', options)
   // Through the Flink job server, we transitively add dependencies on the expansion services needed in tests.
   task.configure {
-    dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
+    dependsOn ":runners:flink:${flinkVersion}:job-server:shadowJar"
     // The Java SDK worker is required to execute external transforms.
     def suffix = getSupportedJavaVersion()
     dependsOn ":sdks:java:container:${suffix}:docker"
@@ -53,31 +61,19 @@ def createFlinkRunnerTestTask(String workerType) {
   return task
 }

-createFlinkRunnerTestTask('DOCKER')
-createFlinkRunnerTestTask('PROCESS')
-createFlinkRunnerTestTask('LOOPBACK')
+createFlinkRunnerTestTask('DOCKER', latestFlinkVersion)
+createFlinkRunnerTestTask('PROCESS', latestFlinkVersion)
+createFlinkRunnerTestTask('LOOPBACK', latestFlinkVersion)
+createFlinkRunnerTestTask('DOCKER', latestFlink2Version)
+createFlinkRunnerTestTask('PROCESS', latestFlink2Version)
+createFlinkRunnerTestTask('LOOPBACK', latestFlink2Version)

 task flinkValidatesRunner() {
   dependsOn 'flinkCompatibilityMatrixLOOPBACK'
 }

-// TODO(https://github.com/apache/beam/issues/19962): Enable on pre-commit.
-tasks.register("flinkTriggerTranscript") {
-  dependsOn 'setupVirtualenv'
-  dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
-  doLast {
-    exec {
-      executable 'sh'
-      args '-c', """
-      . ${envdir}/bin/activate \\
-      && cd ${pythonRootDir} \\
-      && pip install -e .[test] \\
-      && pytest \\
-      apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\
-      --test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server:").shadowJar.archivePath}'
-      """
-    }
-  }
+task flink2ValidatesRunner() {
+  dependsOn 'flink2CompatibilityMatrixLOOPBACK'
 }

 // Verifies BEAM-10702.
