Flink 2 support prerequisites

* Honor the getUseDataStreamForBatch pipeline option in the Flink portable runner (a usage sketch follows the job-server diff below)
* Refactor the Gradle scripts in preparation for Flink 2 support
* Create a PostCommit workflow that runs validates-runner tests on the legacy DataSet API

1 parent dad0287, commit f57e4db

File tree: 9 files changed (+222, -55 lines)

.github/workflows/README.md

Lines changed: 1 addition & 0 deletions
@@ -344,6 +344,7 @@ PostCommit Jobs run in a schedule against master branch and generally do not get
 | [ PostCommit Java Nexmark Direct ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml) | N/A |`beam_PostCommit_Java_Nexmark_Direct.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml?query=event%3Aschedule) |
 | [ PostCommit Java Nexmark Flink ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml) | N/A |`beam_PostCommit_Java_Nexmark_Flink.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml?query=event%3Aschedule) |
 | [ PostCommit Java Nexmark Spark ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml) | N/A |`beam_PostCommit_Java_Nexmark_Spark.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Spark.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml?query=event%3Aschedule) |
+| [ PostCommit Java PVR Flink Batch ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml) | N/A |`beam_PostCommit_Java_PVR_Flink_Batch.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml?query=event%3Aschedule) |
 | [ PostCommit Java PVR Flink Streaming ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml) | N/A |`beam_PostCommit_Java_PVR_Flink_Streaming.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml?query=event%3Aschedule) |
 | [ PostCommit Java PVR Samza ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml) | N/A |`beam_PostCommit_Java_PVR_Samza.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Samza.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml?query=event%3Aschedule) |
 | [ PostCommit Java SingleStoreIO IT ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml) | N/A |`beam_PostCommit_Java_SingleStoreIO_IT.json`| [![.github/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml?query=event%3Aschedule) |
.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml

Lines changed: 106 additions & 0 deletions

@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: PostCommit Java PVR Flink Batch
+
+on:
+  push:
+    tags: ['v*']
+    branches: ['master', 'release-*']
+    paths:
+      - 'runners/flink/**'
+      - 'runners/java-fn-execution/**'
+      - 'sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/**'
+      - '.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml'
+  pull_request_target:
+    branches: ['master', 'release-*']
+    paths:
+      - 'release/trigger_all_tests.json'
+      - '.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json'
+  schedule:
+    - cron: '15 2/6 * * *'
+  workflow_dispatch:
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+  cancel-in-progress: true
+
+# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: write
+  checks: write
+  contents: read
+  deployments: read
+  id-token: none
+  issues: write
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+env:
+  DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+  beam_PostCommit_Java_PVR_Flink_Batch:
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+    strategy:
+      matrix:
+        job_name: ["beam_PostCommit_Java_PVR_Flink_Batch"]
+        job_phrase: ["Run Java_PVR_Flink_Batch PostCommit"]
+    timeout-minutes: 240
+    runs-on: [self-hosted, ubuntu-20.04, highmem]
+    if: |
+      github.event_name == 'push' ||
+      github.event_name == 'pull_request_target' ||
+      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
+      github.event_name == 'workflow_dispatch' ||
+      github.event.comment.body == 'Run Java_PVR_Flink_Batch PostCommit'
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup repository
+        uses: ./.github/actions/setup-action
+        with:
+          comment_phrase: ${{ matrix.job_phrase }}
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+      - name: Setup environment
+        uses: ./.github/actions/setup-environment-action
+      - name: run validatesPortableRunnerBatch script
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        with:
+          gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerBatchDataSet
+        env:
+          CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
+      - name: Archive JUnit Test Results
+        uses: actions/upload-artifact@v4
+        if: ${{ !success() }}
+        with:
+          name: JUnit Test Results
+          path: "**/build/reports/tests/"
+      - name: Upload test report
+        uses: actions/upload-artifact@v4
+        with:
+          name: java-code-coverage-report
+          path: "**/build/test-results/**/*.xml"
+      # TODO: Investigate 'Max retries exceeded' issue with EnricoMi/publish-unit-test-result-action@v2.

runners/flink/flink_runner.gradle

Lines changed: 83 additions & 42 deletions
@@ -28,7 +28,8 @@ import groovy.json.JsonOutput
 def base_path = ".."
 
 def overrides(versions, type, base_path) {
-  versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
+  // order is important
+  ["${base_path}/src/${type}/java"] + versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
 }
 
 def all_versions = flink_versions.split(",")
@@ -49,7 +50,8 @@ applyJavaNature(
   automaticModuleName: 'org.apache.beam.runners.flink',
   archivesBaseName: archivesBaseName,
   // flink runner jars are in same package name. Publish javadoc once.
-  exportJavadoc: project.ext.flink_version.startsWith(all_versions.first())
+  exportJavadoc: project.ext.flink_version.startsWith(all_versions.first()),
+  requireJavaVersion: project.ext.flink_major.startsWith('2') ? JavaVersion.VERSION_11 : null
 )
 
 description = "Apache Beam :: Runners :: Flink $flink_version"
@@ -68,10 +70,16 @@ evaluationDependsOn(":examples:java")
 */
 def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get()
 
-def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
-  it.from main_source_overrides
-  it.into "${sourceOverridesBase}/main/java"
-  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
+def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask ->
+  copyTask.from main_source_overrides
+  copyTask.into "${sourceOverridesBase}/main/java"
+  copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
+
+  if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('main')) {
+    project.ext.excluded_files.main.each { file ->
+      copyTask.exclude "**/${file}"
+    }
+  }
 }
 
 def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
@@ -80,10 +88,16 @@ def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
   it.duplicatesStrategy DuplicatesStrategy.INCLUDE
 }
 
-def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) {
-  it.from test_source_overrides
-  it.into "${sourceOverridesBase}/test/java"
-  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
+def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { copyTask ->
+  copyTask.from test_source_overrides
+  copyTask.into "${sourceOverridesBase}/test/java"
+  copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
+
+  if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('test')) {
+    project.ext.excluded_files.test.each { file ->
+      copyTask.exclude "**/${file}"
+    }
+  }
 }
 
 def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
@@ -92,45 +106,69 @@ def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
   it.duplicatesStrategy DuplicatesStrategy.INCLUDE
 }
 
-// add dependency to gradle Java plugin defined tasks
-compileJava.dependsOn copySourceOverrides
-processResources.dependsOn copyResourcesOverrides
-compileTestJava.dependsOn copyTestSourceOverrides
-processTestResources.dependsOn copyTestResourcesOverrides
-
-// add dependency BeamModulePlugin defined custom tasks
-// they are defined only when certain flags are provided (e.g. -Prelease; -Ppublishing, etc)
-def sourcesJar = project.tasks.findByName('sourcesJar')
-if (sourcesJar != null) {
-  sourcesJar.dependsOn copySourceOverrides
-  sourcesJar.dependsOn copyResourcesOverrides
-}
-def testSourcesJar = project.tasks.findByName('testSourcesJar')
-if (testSourcesJar != null) {
-  testSourcesJar.dependsOn copyTestSourceOverrides
-  testSourcesJar.dependsOn copyTestResourcesOverrides
-}
+def use_override = (flink_major != all_versions.first())
+def sourceBase = "${project.projectDir}/../src"
 
-/*
+if (use_override) {
+  // Copy original+version specific sources to a tmp dir and use it as sourceSet
+  // add dependency to gradle Java plugin defined tasks
+  compileJava.dependsOn copySourceOverrides
+  processResources.dependsOn copyResourcesOverrides
+  compileTestJava.dependsOn copyTestSourceOverrides
+  processTestResources.dependsOn copyTestResourcesOverrides
+
+  // add dependency BeamModulePlugin defined custom tasks
+  // they are defined only when certain flags are provided (e.g. -Prelease; -Ppublishing, etc)
+  def sourcesJar = project.tasks.findByName('sourcesJar')
+  if (sourcesJar != null) {
+    sourcesJar.dependsOn copySourceOverrides
+    sourcesJar.dependsOn copyResourcesOverrides
+  }
+  def testSourcesJar = project.tasks.findByName('testSourcesJar')
+  if (testSourcesJar != null) {
+    testSourcesJar.dependsOn copyTestSourceOverrides
+    testSourcesJar.dependsOn copyTestResourcesOverrides
+  }
+  /*
   * We have to explicitly set all directories here to make sure each
   * version of Flink has the correct overrides set.
   */
-def sourceBase = "${project.projectDir}/../src"
-sourceSets {
-  main {
-    java {
-      srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"]
+  sourceSets {
+    main {
+      java {
+        srcDirs = ["${sourceOverridesBase}/main/java"]
+      }
+      resources {
+        srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"]
+      }
     }
-    resources {
-      srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"]
+    test {
+      java {
+        srcDirs = ["${sourceOverridesBase}/test/java"]
+      }
+      resources {
+        srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"]
+      }
     }
   }
-  test {
-    java {
-      srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"]
+} else {
+  // Use the original sources directly for the lowest supported Flink version.
+  sourceSets {
+    main {
+      java {
+        srcDirs = ["${sourceBase}/main/java"]
+      }
+      resources {
+        srcDirs = ["${sourceBase}/main/resources"]
+      }
     }
-    resources {
-      srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"]
+    test {
+      java {
+        srcDirs = ["${sourceBase}/test/java"]
+      }
+      resources {
+        srcDirs = ["${sourceBase}/test/resources"]
+      }
     }
   }
 }
@@ -196,7 +234,10 @@ dependencies {
 
   implementation "org.apache.flink:flink-core:$flink_version"
   implementation "org.apache.flink:flink-metrics-core:$flink_version"
-  implementation "org.apache.flink:flink-java:$flink_version"
+  if (project.ext.flink_major.startsWith('1')) {
+    // FLINK-36336: dataset API removed in Flink 2
+    implementation "org.apache.flink:flink-java:$flink_version"
+  }
 
   implementation "org.apache.flink:flink-runtime:$flink_version"
   implementation "org.apache.flink:flink-metrics-core:$flink_version"

runners/flink/job-server-container/flink_job_server_container.gradle

Lines changed: 7 additions & 3 deletions
@@ -53,15 +53,19 @@ task copyDockerfileDependencies(type: Copy) {
 }
 
 def pushContainers = project.rootProject.hasProperty(["isRelease"]) || project.rootProject.hasProperty("push-containers")
+def containerName = project.parent.name.startsWith("2") ? "flink_job_server" : "flink${project.parent.name}_job_server"
+def containerTag = project.rootProject.hasProperty(["docker-tag"]) ? project.rootProject["docker-tag"] : project.sdk_version
+if (project.parent.name.startsWith("2")) {
+  containerTag += "-flink${project.parent.name}"
+}
 
 docker {
   name containerImageName(
-      name: project.docker_image_default_repo_prefix + "flink${project.parent.name}_job_server",
+      name: project.docker_image_default_repo_prefix + containerName,
       root: project.rootProject.hasProperty(["docker-repository-root"]) ?
           project.rootProject["docker-repository-root"] :
           project.docker_image_default_repo_root,
-      tag: project.rootProject.hasProperty(["docker-tag"]) ?
-          project.rootProject["docker-tag"] : project.sdk_version)
+      tag: containerTag)
   // tags used by dockerTag task
   tags containerImageTags()
   files "./build/"

runners/flink/job-server/flink_job_server.gradle

Lines changed: 22 additions & 9 deletions
@@ -29,6 +29,11 @@ apply plugin: 'application'
 // we need to set mainClassName before applying shadow plugin
 mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"
 
+// Resolve the Flink project name (and version) the job-server is based on
+def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
+evaluationDependsOn(flinkRunnerProject)
+boolean isFlink2 = project(flinkRunnerProject).ext.flink_major.startsWith('2')
+
 applyJavaNature(
   automaticModuleName: 'org.apache.beam.runners.flink.jobserver',
   archivesBaseName: project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName,
@@ -37,11 +42,9 @@ applyJavaNature(
   shadowClosure: {
     append "reference.conf"
   },
+  requireJavaVersion: isFlink2 ? JavaVersion.VERSION_11 : null
 )
 
-// Resolve the Flink project name (and version) the job-server is based on
-def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
-
 description = project(flinkRunnerProject).description + " :: Job Server"
 
 /*
@@ -126,18 +129,22 @@ runShadow {
   jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
+def portableValidatesRunnerTask(String name, String mode, boolean checkpointing, boolean docker) {
   def pipelineOptions = [
     // Limit resource consumption via parallelism
     "--parallelism=2",
   ]
+  boolean streaming = (mode == "streaming")
   if (streaming) {
     pipelineOptions += "--streaming"
     if (checkpointing) {
       pipelineOptions += "--checkpointingInterval=3000"
       pipelineOptions += "--shutdownSourcesAfterIdleMs=60000"
     }
   }
+  if (mode == "batch") {
+    pipelineOptions += "--useDataStreamForBatch=true"
+  }
   createPortableValidatesRunnerTask(
     name: "validatesPortableRunner${name}",
     jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver",
@@ -186,7 +193,9 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
     excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
     return
   }
-
+  if (mode == "batch") {
+    excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
+  }
   excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
   excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
   excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
@@ -214,13 +223,17 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
   )
 }
 
-project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", false, false, true)
-project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false, false, false)
-project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true, false, false)
-project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", true, true, false)
+project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", "batch", false, true)
+project.ext.validatesPortableRunnerBatchDataSet = portableValidatesRunnerTask("BatchDataSet", "batch-dataset", false, false)
+project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", "batch", false, false)
+project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", "streaming", false, false)
+project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", "streaming", true, false)
 
 tasks.register("validatesPortableRunner") {
   dependsOn validatesPortableRunnerDocker
+  if (!isFlink2) {
+    dependsOn validatesPortableRunnerBatchDataSet
+  }
   dependsOn validatesPortableRunnerBatch
   dependsOn validatesPortableRunnerStreaming
   dependsOn validatesPortableRunnerStreamingCheckpoint
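Switching the second parameter from a `boolean streaming` to a `mode` string is what distinguishes the three translation paths: plain "batch" now opts into the DataStream-based batch translation by passing `--useDataStreamForBatch=true` (the option the commit message mentions), while "batch-dataset" adds no flag and therefore exercises the legacy DataSet translation (Flink 1.x only, hence the `!isFlink2` guard). A self-contained sketch of the option set each mode produces, mirroring the logic above; the helper function itself is illustrative, not part of the commit:

```groovy
// Self-contained sketch mirroring the option handling in
// portableValidatesRunnerTask above; flag values are taken from the diff.
def optionsFor(String mode, boolean checkpointing) {
  def opts = ["--parallelism=2"] // limit resource consumption via parallelism
  if (mode == "streaming") {
    opts << "--streaming"
    if (checkpointing) {
      opts << "--checkpointingInterval=3000"
      opts << "--shutdownSourcesAfterIdleMs=60000"
    }
  }
  if (mode == "batch") {
    // DataStream-based batch translation; "batch-dataset" omits the flag,
    // leaving the legacy DataSet translation in place.
    opts << "--useDataStreamForBatch=true"
  }
  return opts
}

assert optionsFor("batch", false) == ["--parallelism=2", "--useDataStreamForBatch=true"]
assert optionsFor("batch-dataset", false) == ["--parallelism=2"]
assert optionsFor("streaming", true).contains("--checkpointingInterval=3000")
```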
