@@ -22,25 +22,43 @@ import org.apache.tools.ant.taskdefs.condition.Os
2222
2323def pythonRootDir = " ${ rootDir} /sdks/python"
2424def pythonVersionSuffix = project. ext. pythonVersion. replace(' .' , ' ' )
25+ // TODO(https://github.com/apache/beam/issues/36947): Remove when dropping Flink 1.x support
2526def latestFlinkVersion = project. ext. latestFlinkVersion
27+ def latestFlink2Version = ' 2.0'
2628def currentJavaVersion = project. ext. currentJavaVersion
2729
2830ext {
2931 pythonContainerTask = " :sdks:python:container:py${ pythonVersionSuffix} :docker"
3032}
3133
32- def createFlinkRunnerTestTask (String workerType ) {
33- def taskName = " flinkCompatibilityMatrix${ workerType} "
34- // project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here.
35- def jobServerJar = " ${ rootDir} /runners/flink/${ latestFlinkVersion} /job-server/build/libs/beam-runners-flink-${ latestFlinkVersion} -job-server-${ version} .jar"
34+ def createFlink2ConfigDir = tasks. register(" createFlink2ConfigDir" , Copy ) {
35+ from file(" ${ project.rootDir} /runners/flink/2.0/src/test/resources/flink-test-config.yaml" )
36+ into file(" ${ project.buildDir} /flinkConf" )
37+ rename { String fileName -> ' config.yaml' }
38+ }
39+
40+ def createFlinkRunnerTestTask (String workerType , String flinkVersion ) {
41+ String taskName
42+
43+ // project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here.
44+ def jobServerJar = " ${ rootDir} /runners/flink/${ flinkVersion} /job-server/build/libs/beam-runners-flink-${ flinkVersion} -job-server-${ version} .jar"
3645 def options = " --flink_job_server_jar=${ jobServerJar} --environment_type=${ workerType} "
46+ if flinkVersion. startsWith(' 2' ) {
47+ taskName = " flink2CompatibilityMatrix${ workerType} "
48+ options + = " --flink_conf_dir=${ buildDir.absolutePath} /flinkConf"
49+ } else {
50+ taskName = " flinkCompatibilityMatrix${ workerType} "
51+ }
3752 if (workerType == ' PROCESS' ) {
3853 options + = " --environment_options=process_command=${ buildDir.absolutePath} /sdk_worker.sh"
3954 }
4055 def task = toxTask(taskName, ' flink-runner-test' , options)
4156 // Through the Flink job server, we transitively add dependencies on the expansion services needed in tests.
4257 task. configure {
43- dependsOn " :runners:flink:${ latestFlinkVersion} :job-server:shadowJar"
58+ if flinkVersion. startsWith(' 2' ) {
59+ dependsOn createFlink2ConfigDir
60+ }
61+ dependsOn " :runners:flink:${ flinkVersion} :job-server:shadowJar"
4462 // The Java SDK worker is required to execute external transforms.
4563 def suffix = getSupportedJavaVersion()
4664 dependsOn " :sdks:java:container:${ suffix} :docker"
@@ -53,31 +71,19 @@ def createFlinkRunnerTestTask(String workerType) {
5371 return task
5472}
5573
56- createFlinkRunnerTestTask(' DOCKER' )
57- createFlinkRunnerTestTask(' PROCESS' )
58- createFlinkRunnerTestTask(' LOOPBACK' )
74+ createFlinkRunnerTestTask(' DOCKER' , latestFlinkVersion)
75+ createFlinkRunnerTestTask(' PROCESS' , latestFlinkVersion)
76+ createFlinkRunnerTestTask(' LOOPBACK' , latestFlinkVersion)
77+ createFlinkRunnerTestTask(' DOCKER' , latestFlink2Version)
78+ createFlinkRunnerTestTask(' PROCESS' , latestFlink2Version)
79+ createFlinkRunnerTestTask(' LOOPBACK' , latestFlink2Version)
5980
6081task flinkValidatesRunner () {
6182 dependsOn ' flinkCompatibilityMatrixLOOPBACK'
6283}
6384
64- // TODO(https://github.com/apache/beam/issues/19962): Enable on pre-commit.
65- tasks. register(" flinkTriggerTranscript" ) {
66- dependsOn ' setupVirtualenv'
67- dependsOn " :runners:flink:${ latestFlinkVersion} :job-server:shadowJar"
68- doLast {
69- exec {
70- executable ' sh'
71- args ' -c' , """
72- . ${ envdir} /bin/activate \\
73- && cd ${ pythonRootDir} \\
74- && pip install -e .[test] \\
75- && pytest \\
76- apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\
77- --test-pipeline-options='--runner=FlinkRunner --environment_type=LOOPBACK --flink_job_server_jar=${ project(":runners:flink:${latestFlinkVersion}:job-server:").shadowJar.archivePath} '
78- """
79- }
80- }
85+ task flink2ValidatesRunner () {
86+ dependsOn ' flink2CompatibilityMatrixLOOPBACK'
8187}
8288
8389// Verifies BEAM-10702.
0 commit comments