Skip to content

Commit 4b478a7

Browse files
authored
Exercise Flink PVR tests on Flink 2.0 (#37538)
* Exercise Flink PVR tests * Fix avro compat test dep
1 parent f52cf93 commit 4b478a7

File tree

9 files changed

+28
-18
lines changed

9 files changed

+28
-18
lines changed

.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,13 @@ env:
6363

6464
jobs:
6565
beam_PostCommit_Java_PVR_Flink_Batch:
66-
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
66+
name: ${{ matrix.job_name }} (${{ matrix.flink_version }})
6767
strategy:
6868
matrix:
6969
job_name: ["beam_PostCommit_Java_PVR_Flink_Batch"]
7070
job_phrase: ["Run Java_PVR_Flink_Batch PostCommit"]
71+
# every major version
72+
flink_version: ['1.20', '2.0']
7173
timeout-minutes: 240
7274
runs-on: [self-hosted, ubuntu-20.04, highmem]
7375
if: |
@@ -83,13 +85,21 @@ jobs:
8385
with:
8486
comment_phrase: ${{ matrix.job_phrase }}
8587
github_token: ${{ secrets.GITHUB_TOKEN }}
86-
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
88+
github_job: ${{ matrix.job_name }} (${{ matrix.flink_version }})
8789
- name: Setup environment
8890
uses: ./.github/actions/setup-environment-action
91+
- name: run validatesPortableRunnerBatchDataSet script
92+
uses: ./.github/actions/gradle-command-self-hosted-action
93+
if: startsWith(matrix.flink_version, '1')
94+
with:
95+
gradle-command: :runners:flink:${{ matrix.flink_version }}:job-server:validatesPortableRunnerBatchDataSet
96+
env:
97+
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
8998
- name: run validatesPortableRunnerBatch script
9099
uses: ./.github/actions/gradle-command-self-hosted-action
100+
if: startsWith(matrix.flink_version, '2')
91101
with:
92-
gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerBatchDataSet
102+
gradle-command: :runners:flink:${{ matrix.flink_version }}:job-server:validatesPortableRunnerBatch
93103
env:
94104
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
95105
- name: Archive JUnit Test Results
@@ -98,9 +108,4 @@ jobs:
98108
with:
99109
name: JUnit Test Results
100110
path: "**/build/reports/tests/"
101-
- name: Upload test report
102-
uses: actions/upload-artifact@v4
103-
with:
104-
name: java-code-coverage-report
105-
path: "**/build/test-results/**/*.xml"
106111
# TODO: Investigate 'Max retries exceeded' issue with EnricoMi/publish-unit-test-result-action@v2.

.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,15 @@ env:
5252

5353
jobs:
5454
beam_PostCommit_Java_PVR_Flink_Streaming:
55-
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
55+
name: ${{ matrix.job_name }} (${{ matrix.flink_version }})
5656
runs-on: [self-hosted, ubuntu-20.04, main]
5757
timeout-minutes: 120
5858
strategy:
5959
matrix:
6060
job_name: [beam_PostCommit_Java_PVR_Flink_Streaming]
6161
job_phrase: [Run Java Flink PortableValidatesRunner Streaming]
62+
# every major version
63+
flink_version: [ '1.20', '2.0' ]
6264
if: |
6365
github.event_name == 'workflow_dispatch' ||
6466
github.event_name == 'pull_request_target' ||
@@ -71,13 +73,13 @@ jobs:
7173
with:
7274
comment_phrase: ${{ matrix.job_phrase }}
7375
github_token: ${{ secrets.GITHUB_TOKEN }}
74-
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
76+
github_job: ${{ matrix.job_name }} (${{ matrix.flink_version }})
7577
- name: Setup environment
7678
uses: ./.github/actions/setup-environment-action
7779
- name: run PostCommit Java Flink PortableValidatesRunner Streaming script
7880
uses: ./.github/actions/gradle-command-self-hosted-action
7981
with:
80-
gradle-command: runners:flink:1.20:job-server:validatesPortableRunnerStreaming
82+
gradle-command: runners:flink:${{ matrix.flink_version }}:job-server:validatesPortableRunnerStreaming
8183
- name: Archive JUnit Test Results
8284
uses: actions/upload-artifact@v4
8385
if: ${{ !success() }}

CHANGES.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@
6161

6262
* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
6363
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
64-
* Flink 2.0 support for Java classic Flink runner ([#36947](https://github.com/apache/beam/issues/36947)).
65-
Also added intial, experimental support for Portable Flink runner since this Beam version.
64+
* Flink 2.0 support for Java Classic and Portable Flink Runners ([#36947](https://github.com/apache/beam/issues/36947)),
65+
experimental support for other SDK languages including Python.
6666

6767

6868
## I/Os

runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -696,8 +696,9 @@ private void translateImpulse(
696696
public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform {
697697
@Override
698698
public boolean test(RunnerApi.PTransform pTransform) {
699-
return STREAMING_IMPULSE_TRANSFORM_URN.equals(
700-
PTransformTranslation.urnForTransformOrNull(pTransform));
699+
String urn = PTransformTranslation.urnForTransformOrNull(pTransform);
700+
return STREAMING_IMPULSE_TRANSFORM_URN.equals(urn)
701+
|| PTransformTranslation.RESHUFFLE_URN.equals(urn);
701702
}
702703
}
703704

sdks/java/core/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ dependencies {
125125
shadowTest library.java.guava_testlib
126126
shadowTest library.java.mockito_core
127127
shadowTest library.java.hamcrest
128-
shadowTest "com.esotericsoftware.kryo:kryo:2.21"
129128
shadowTest library.java.quickcheck_core
130129
shadowTest library.java.quickcheck_generators
131130
shadowTest library.java.zstd_jni

sdks/java/extensions/avro/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ dependencies {
7575
testImplementation project(path: ":sdks:java:extensions:avro:vendored-test", configuration: "shadowTest")
7676
testImplementation library.java.junit
7777
testImplementation "org.tukaani:xz:1.9" // marked as optional in avro
78+
testImplementation "com.esotericsoftware:kryo:5.6.2" // Used by Avro coder test
7879
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
7980
testRuntimeOnly library.java.slf4j_jdk14
8081
avroVersions.each { k,v ->
@@ -86,6 +87,7 @@ dependencies {
8687
"avroVersion$k" project(path: ":runners:direct-java", configuration: "shadow")
8788
"avroVersion$k" library.java.slf4j_jdk14
8889
"avroVersion$k" "org.tukaani:xz:1.9" // marked as optional in avro
90+
"avroVersion$k" "com.esotericsoftware:kryo:5.6.2" // Used by Avro coder test
8991
"avroVersion$k" library.java.zstd_jni // marked as optional in avro
9092
"avroVersion$k" "org.apache.avro:avro:$v:tests"
9193
"avroVersion${k}Generate" "org.apache.avro:avro-tools:$v"

sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ public void testKryoSerialization() throws Exception {
286286

287287
// Kryo instantiation
288288
Kryo kryo = new Kryo();
289+
kryo.setRegistrationRequired(false);
289290
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
290291
kryo.addDefaultSerializer(AvroCoder.SerializableSchemaSupplier.class, JavaSerializer.class);
291292

sdks/java/extensions/euphoria/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ dependencies {
3030
implementation library.java.joda_time
3131
implementation library.java.slf4j_api
3232
implementation library.java.vendored_guava_32_1_2_jre
33-
testImplementation project(":sdks:java:extensions:kryo")
33+
testImplementation project(path: ":sdks:java:extensions:kryo", configuration: "shadow")
3434
testImplementation library.java.slf4j_api
3535
testImplementation library.java.hamcrest
3636
testImplementation library.java.mockito_core

sdks/java/extensions/kryo/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
plugins { id 'org.apache.beam.module' }
2020

2121
ext {
22-
kryoVersion = '5.5.0'
22+
kryoVersion = '5.6.2'
2323
}
2424

2525
applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.extensions.kryo',

0 commit comments

Comments
 (0)