Skip to content

Commit 73c7684

Browse files
committed
Flink 2 support prerequisites
* Honor getUseDataStreamForBatch pipeline option for Flink portable runner * Refactor gradle scripts in preparation for Flink 2 support
1 parent d7cd81a commit 73c7684

File tree

7 files changed

+50
-22
lines changed

7 files changed

+50
-22
lines changed

runners/flink/flink_runner.gradle

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import groovy.json.JsonOutput
2828
def base_path = ".."
2929

3030
def overrides(versions, type, base_path) {
31-
versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
31+
// order is important
32+
["${base_path}/src/${type}/java"] + versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
3233
}
3334

3435
def all_versions = flink_versions.split(",")
@@ -49,7 +50,8 @@ applyJavaNature(
4950
automaticModuleName: 'org.apache.beam.runners.flink',
5051
archivesBaseName: archivesBaseName,
5152
// flink runner jars are in same package name. Publish javadoc once.
52-
exportJavadoc: project.ext.flink_version.startsWith(all_versions.first())
53+
exportJavadoc: project.ext.flink_version.startsWith(all_versions.first()),
54+
requireJavaVersion: project.ext.flink_major.startsWith('2') ? JavaVersion.VERSION_11 : null
5355
)
5456

5557
description = "Apache Beam :: Runners :: Flink $flink_version"
@@ -68,10 +70,16 @@ evaluationDependsOn(":examples:java")
6870
*/
6971
def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get()
7072

71-
def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
72-
it.from main_source_overrides
73-
it.into "${sourceOverridesBase}/main/java"
74-
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
73+
def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask ->
74+
copyTask.from main_source_overrides
75+
copyTask.into "${sourceOverridesBase}/main/java"
76+
copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
77+
78+
if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('main')) {
79+
project.ext.excluded_files.main.each { file ->
80+
copyTask.exclude "**/${file}"
81+
}
82+
}
7583
}
7684

7785
def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
@@ -119,15 +127,15 @@ def sourceBase = "${project.projectDir}/../src"
119127
sourceSets {
120128
main {
121129
java {
122-
srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"]
130+
srcDirs = ["${sourceOverridesBase}/main/java"]
123131
}
124132
resources {
125133
srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"]
126134
}
127135
}
128136
test {
129137
java {
130-
srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"]
138+
srcDirs = ["${sourceOverridesBase}/test/java"]
131139
}
132140
resources {
133141
srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"]
@@ -196,7 +204,10 @@ dependencies {
196204

197205
implementation "org.apache.flink:flink-core:$flink_version"
198206
implementation "org.apache.flink:flink-metrics-core:$flink_version"
199-
implementation "org.apache.flink:flink-java:$flink_version"
207+
if (project.ext.flink_major.startsWith('1')) {
208+
// FLINK-36336: dataset API removed in Flink 2
209+
implementation "org.apache.flink:flink-java:$flink_version"
210+
}
200211

201212
implementation "org.apache.flink:flink-runtime:$flink_version"
202213
implementation "org.apache.flink:flink-metrics-core:$flink_version"

runners/flink/job-server-container/flink_job_server_container.gradle

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,19 @@ task copyDockerfileDependencies(type: Copy) {
5353
}
5454

5555
def pushContainers = project.rootProject.hasProperty(["isRelease"]) || project.rootProject.hasProperty("push-containers")
56+
def containerName = project.parent.name.startsWith("2") ? "flink_job_server" : "flink${project.parent.name}_job_server"
57+
def containerTag = project.rootProject.hasProperty(["docker-tag"]) ? project.rootProject["docker-tag"] : project.sdk_version
58+
if (project.parent.name.startsWith("2")) {
59+
containerTag += "-flink" + ${project.parent.name}
60+
}
5661

5762
docker {
5863
name containerImageName(
59-
name: project.docker_image_default_repo_prefix + "flink${project.parent.name}_job_server",
64+
name: project.docker_image_default_repo_prefix + containerName,
6065
root: project.rootProject.hasProperty(["docker-repository-root"]) ?
6166
project.rootProject["docker-repository-root"] :
6267
project.docker_image_default_repo_root,
63-
tag: project.rootProject.hasProperty(["docker-tag"]) ?
64-
project.rootProject["docker-tag"] : project.sdk_version)
68+
tag: containerTag)
6569
// tags used by dockerTag task
6670
tags containerImageTags()
6771
files "./build/"

runners/flink/job-server/flink_job_server.gradle

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ apply plugin: 'application'
2929
// we need to set mainClassName before applying shadow plugin
3030
mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"
3131

32+
// Resolve the Flink project name (and version) the job-server is based on
33+
def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
34+
evaluationDependsOn(flinkRunnerProject)
35+
boolean isFlink2 = project(flinkRunnerProject).ext.flink_major.startsWith('2')
36+
3237
applyJavaNature(
3338
automaticModuleName: 'org.apache.beam.runners.flink.jobserver',
3439
archivesBaseName: project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName,
@@ -37,11 +42,9 @@ applyJavaNature(
3742
shadowClosure: {
3843
append "reference.conf"
3944
},
45+
requireJavaVersion: isFlink2 ? JavaVersion.VERSION_11 : null
4046
)
4147

42-
// Resolve the Flink project name (and version) the job-server is based on
43-
def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
44-
4548
description = project(flinkRunnerProject).description + " :: Job Server"
4649

4750
/*
@@ -126,18 +129,22 @@ runShadow {
126129
jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
127130
}
128131

129-
def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
132+
def portableValidatesRunnerTask(String name, String mode, boolean checkpointing, boolean docker) {
130133
def pipelineOptions = [
131134
// Limit resource consumption via parallelism
132135
"--parallelism=2",
133136
]
137+
boolean streaming = (mode == "streaming")
134138
if (streaming) {
135139
pipelineOptions += "--streaming"
136140
if (checkpointing) {
137141
pipelineOptions += "--checkpointingInterval=3000"
138142
pipelineOptions += "--shutdownSourcesAfterIdleMs=60000"
139143
}
140144
}
145+
if (mode == "batch-datastream") {
146+
pipelineOptions += "--useDataStreamForBatch=true"
147+
}
141148
createPortableValidatesRunnerTask(
142149
name: "validatesPortableRunner${name}",
143150
jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver",
@@ -214,14 +221,18 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi
214221
)
215222
}
216223

217-
project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", false, false, true)
218-
project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false, false, false)
219-
project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true, false, false)
220-
project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", true, true, false)
224+
project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", "batch", false, true)
225+
project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", "batch", false, false)
226+
project.ext.validatesPortableRunnerBatchDataStream = portableValidatesRunnerTask("Batch", "batch-datastream", false, false)
227+
project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", "streaming", false, false)
228+
project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", "streaming", true, false)
221229

222230
tasks.register("validatesPortableRunner") {
223231
dependsOn validatesPortableRunnerDocker
224-
dependsOn validatesPortableRunnerBatch
232+
if (!isFlink2) {
233+
dependsOn validatesPortableRunnerBatch
234+
}
235+
dependsOn validatesPortableRunnerBatchDataStream
225236
dependsOn validatesPortableRunnerStreaming
226237
dependsOn validatesPortableRunnerStreamingCheckpoint
227238
}

runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ public PortablePipelineResult run(final Pipeline pipeline, JobInfo jobInfo) thro
8484
SdkHarnessOptions.getConfiguredLoggerFromOptions(pipelineOptions.as(SdkHarnessOptions.class));
8585

8686
FlinkPortablePipelineTranslator<?> translator;
87-
if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) {
87+
if (!pipelineOptions.getUseDataStreamForBatch()
88+
&& !pipelineOptions.isStreaming()
89+
&& !hasUnboundedPCollections(pipeline)) {
8890
// TODO: Do we need to inspect for unbounded sources before fusing?
8991
translator = FlinkBatchPortablePipelineTranslator.createTranslator();
9092
} else {

runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java renamed to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java

File renamed without changes.

runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java renamed to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java

File renamed without changes.

runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java renamed to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java

File renamed without changes.

0 commit comments

Comments
 (0)