@@ -22,9 +22,9 @@ import org.apache.tools.ant.taskdefs.condition.Os
 
 def pythonRootDir = "${rootDir}/sdks/python"
 def pythonVersionSuffix = project.ext.pythonVersion.replace('.', '')
-// TODO(https://github.com/apache/beam/issues/36947): Remove when dropping Flink 1.x support
 def latestFlinkVersion = project.ext.latestFlinkVersion
-def latestFlink2Version = '2.0'
+// TODO(https://github.com/apache/beam/issues/36947): Remove when dropping Flink 1.x support
+def latestFlink1Version = '1.20'
 def currentJavaVersion = project.ext.currentJavaVersion
 
 ext {
@@ -37,8 +37,8 @@ def createFlinkRunnerTestTask(String workerType, String flinkVersion) {
   // project(":runners:flink:${flinkVersion}:job-server").shadowJar.archivePath is not resolvable until runtime, so hard-code it here.
   def jobServerJar = "${rootDir}/runners/flink/${flinkVersion}/job-server/build/libs/beam-runners-flink-${flinkVersion}-job-server-${version}.jar"
   def options = "--flink_job_server_jar=${jobServerJar} --environment_type=${workerType}"
-  if (flinkVersion.startsWith('2')) {
-    taskName = "flink2CompatibilityMatrix${workerType}"
+  if (flinkVersion.startsWith('1')) {
+    taskName = "flink1CompatibilityMatrix${workerType}"
   } else {
     taskName = "flinkCompatibilityMatrix${workerType}"
   }
@@ -64,9 +64,9 @@ def createFlinkRunnerTestTask(String workerType, String flinkVersion) {
 createFlinkRunnerTestTask('DOCKER', latestFlinkVersion)
 createFlinkRunnerTestTask('PROCESS', latestFlinkVersion)
 createFlinkRunnerTestTask('LOOPBACK', latestFlinkVersion)
-createFlinkRunnerTestTask('DOCKER', latestFlink2Version)
-createFlinkRunnerTestTask('PROCESS', latestFlink2Version)
-createFlinkRunnerTestTask('LOOPBACK', latestFlink2Version)
+createFlinkRunnerTestTask('DOCKER', latestFlink1Version)
+createFlinkRunnerTestTask('PROCESS', latestFlink1Version)
+createFlinkRunnerTestTask('LOOPBACK', latestFlink1Version)
 
 task flinkValidatesRunner() {
   dependsOn 'flinkCompatibilityMatrixLOOPBACK'
@@ -253,7 +253,7 @@ tasks.register("prismTriggerTranscript") {
 
 project.tasks.register("preCommitPy${pythonVersionSuffix}") {
   dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker",
-      ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
+      ":runners:flink:${latestFlink1Version}:job-server:shadowJar",
       'portableWordCountFlinkRunnerBatch',
       'portableWordCountFlinkRunnerStreaming']
 }
@@ -272,7 +272,7 @@ project.tasks.register("flinkExamples") {
   dependsOn = [
     'setupVirtualenv',
     'installGcpTest',
-    ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
+    ":runners:flink:${latestFlink1Version}:job-server:shadowJar"
   ]
   doLast {
     def testOpts = [
@@ -284,7 +284,7 @@ project.tasks.register("flinkExamples") {
       "--project=apache-beam-testing",
       "--environment_type=LOOPBACK",
       "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
-      "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+      "--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}",
       "--flink_conf_dir=${flink_conf_dir}",
       '--sdk_harness_log_level_overrides=' +
           // suppress info level flink.runtime log flood
@@ -384,7 +384,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
   dependsOn = [
     'setupVirtualenv',
     'installGcpTest',
-    ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
+    ":runners:flink:${latestFlink1Version}:job-server:shadowJar",
     ":sdks:java:container:${fork_java_version}:docker",
     ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar',
     ':sdks:java:io:expansion-service:shadowJar',
@@ -408,7 +408,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
       "--project=apache-beam-testing",
       "--environment_type=LOOPBACK",
       "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
-      "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+      "--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}",
       "--flink_conf_dir=${flink_conf_dir}",
       '--sdk_harness_log_level_overrides=' +
           // suppress info level flink.runtime log flood
@@ -440,7 +440,7 @@ project.tasks.register("xlangSpannerIOIT") {
   dependsOn = [
     'setupVirtualenv',
     'installGcpTest',
-    ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
+    ":runners:flink:${latestFlink1Version}:job-server:shadowJar",
     ":sdks:java:container:${currentJavaVersion}:docker",
     ':sdks:java:io:expansion-service:shadowJar',
     ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
@@ -459,7 +459,7 @@ project.tasks.register("xlangSpannerIOIT") {
       "--project=apache-beam-testing",
       "--environment_type=LOOPBACK",
       "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
-      "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+      "--flink_job_server_jar=${project(":runners:flink:${latestFlink1Version}:job-server").shadowJar.archivePath}",
       '--sdk_harness_log_level_overrides=' +
           // suppress info level flink.runtime log flood
           '{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
@@ -504,20 +504,20 @@ def addTestJavaJarCreator(String runner, Task jobServerJarTask) {
 }
 
 // TODO(BEAM-11333) Update and test multiple Flink versions.
-addTestJavaJarCreator("FlinkRunner", tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar"))
+addTestJavaJarCreator("FlinkRunner", tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:shadowJar"))
 addTestJavaJarCreator("SparkRunner", tasks.getByPath(":runners:spark:3:job-server:shadowJar"))
 
 def addTestFlinkUberJar(boolean saveMainSession) {
   project.tasks.register("testUberJarFlinkRunner${saveMainSession ? 'SaveMainSession' : ''}") {
-    dependsOn ":runners:flink:${latestFlinkVersion}:job-server:shadowJar"
-    dependsOn ":runners:flink:${latestFlinkVersion}:job-server:miniCluster"
+    dependsOn ":runners:flink:${latestFlink1Version}:job-server:shadowJar"
+    dependsOn ":runners:flink:${latestFlink1Version}:job-server:miniCluster"
     dependsOn pythonContainerTask
     doLast {
       exec {
        executable "sh"
        def options = [
-          "--flink_job_server_jar ${tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:shadowJar").archivePath}",
-          "--flink_mini_cluster_jar ${tasks.getByPath(":runners:flink:${latestFlinkVersion}:job-server:miniCluster").archivePath}",
+          "--flink_job_server_jar ${tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:shadowJar").archivePath}",
+          "--flink_mini_cluster_jar ${tasks.getByPath(":runners:flink:${latestFlink1Version}:job-server:miniCluster").archivePath}",
           "--env_dir ${project.rootProject.buildDir}/gradleenv/${project.path.hashCode()}",
          "--python_root_dir ${project.rootDir}/sdks/python",
          "--python_version ${project.ext.pythonVersion}",