Skip to content

Commit 54c39a0

Browse files
committed
[Draft] Flink 2.0 support
1 parent 63e5f2c commit 54c39a0

File tree

65 files changed

+16670
-47
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+16670
-47
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@ docker_image_default_repo_root=apache
3939
docker_image_default_repo_prefix=beam_
4040

4141
# supported flink versions
42-
flink_versions=1.17,1.18,1.19,1.20
42+
flink_versions=1.17,1.18,1.19,1.20,2.0
4343
# supported python versions
4444
python_versions=3.10,3.11,3.12,3.13

runners/flink/2.0/build.gradle

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* License); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an AS IS BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
project.ext {
20+
flink_major = '2.0'
21+
flink_version = '2.0.1'
22+
excluded_files = [
23+
'main': [
24+
// Used by DataSet API only
25+
"org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java",
26+
"org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java",
27+
"org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java",
28+
"org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java",
29+
"org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java",
30+
// Moved to org.apache.flink.runtime.state.StateBackendFactory
31+
"org/apache/beam/runners/flink/FlinkStateBackendFactory.java",
32+
],
33+
'test': [
34+
// Used by DataSet API only
35+
"org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapterTest.java",
36+
"org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java",
37+
"org/apache/beam/runners/flink/batch/ReshuffleTest.java",
38+
]
39+
]
40+
}
41+
42+
// Load the main build script which contains all build logic.
43+
apply from: "../flink_runner.gradle"
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* License); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an AS IS BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
def basePath = '../../job-server-container'
20+
21+
project.ext {
22+
resource_path = basePath
23+
}
24+
25+
// Load the main build script which contains all build logic.
26+
apply from: "$basePath/flink_job_server_container.gradle"
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* License); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an AS IS BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
def basePath = '../../job-server'
20+
21+
project.ext {
22+
// Look for the source code in the parent module
23+
main_source_dirs = ["$basePath/src/main/java"]
24+
test_source_dirs = ["$basePath/src/test/java"]
25+
main_resources_dirs = ["$basePath/src/main/resources"]
26+
test_resources_dirs = ["$basePath/src/test/resources"]
27+
archives_base_name = 'beam-runners-flink-2.0-job-server'
28+
}
29+
30+
// Load the main build script which contains all build logic.
31+
apply from: "$basePath/flink_job_server.gradle"
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.flink;
19+
20+
import java.util.Map;
21+
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
22+
import org.apache.beam.runners.flink.translation.utils.CountingPipelineVisitor;
23+
import org.apache.beam.runners.flink.translation.utils.LookupPipelineVisitor;
24+
import org.apache.beam.sdk.Pipeline;
25+
import org.apache.beam.sdk.coders.Coder;
26+
import org.apache.beam.sdk.options.PipelineOptions;
27+
import org.apache.beam.sdk.runners.AppliedPTransform;
28+
import org.apache.beam.sdk.transforms.PTransform;
29+
import org.apache.beam.sdk.values.PCollection;
30+
import org.apache.beam.sdk.values.PValue;
31+
import org.apache.beam.sdk.values.TupleTag;
32+
import org.apache.beam.sdk.values.WindowedValue;
33+
import org.apache.beam.sdk.values.WindowedValues;
34+
import org.apache.beam.sdk.values.WindowingStrategy;
35+
import org.apache.flink.api.common.typeinfo.TypeInformation;
36+
37+
/**
38+
* Helper for {@link FlinkBatchPipelineTranslator} and translators in {@link
39+
* FlinkBatchTransformTranslators}.
40+
*/
41+
@SuppressWarnings({
42+
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
43+
})
44+
class FlinkBatchTranslationContext {
45+
private final PipelineOptions options;
46+
47+
private AppliedPTransform<?, ?, ?> currentTransform;
48+
49+
private final CountingPipelineVisitor countingPipelineVisitor = new CountingPipelineVisitor();
50+
private final LookupPipelineVisitor lookupPipelineVisitor = new LookupPipelineVisitor();
51+
52+
// ------------------------------------------------------------------------
53+
54+
FlinkBatchTranslationContext(PipelineOptions options) {
55+
this.options = options;
56+
}
57+
58+
void init(Pipeline pipeline) {
59+
pipeline.traverseTopologically(countingPipelineVisitor);
60+
pipeline.traverseTopologically(lookupPipelineVisitor);
61+
}
62+
63+
public PipelineOptions getPipelineOptions() {
64+
return options;
65+
}
66+
67+
/**
68+
* Sets the AppliedPTransform which carries input/output.
69+
*
70+
* @param currentTransform Current transformation.
71+
*/
72+
void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
73+
this.currentTransform = currentTransform;
74+
}
75+
76+
AppliedPTransform<?, ?, ?> getCurrentTransform() {
77+
return currentTransform;
78+
}
79+
80+
Map<TupleTag<?>, Coder<?>> getOutputCoders(PTransform<?, ?> transform) {
81+
return lookupPipelineVisitor.getOutputCoders(transform);
82+
}
83+
84+
<T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
85+
return getTypeInfo(collection.getCoder(), collection.getWindowingStrategy());
86+
}
87+
88+
<T> TypeInformation<WindowedValue<T>> getTypeInfo(
89+
Coder<T> coder, WindowingStrategy<?, ?> windowingStrategy) {
90+
WindowedValues.FullWindowedValueCoder<T> windowedValueCoder =
91+
WindowedValues.getFullCoder(coder, windowingStrategy.getWindowFn().windowCoder());
92+
93+
return new CoderTypeInformation<>(windowedValueCoder, options);
94+
}
95+
96+
Map<TupleTag<?>, PCollection<?>> getInputs(PTransform<?, ?> transform) {
97+
return lookupPipelineVisitor.getInputs(transform);
98+
}
99+
100+
<T extends PValue> T getInput(PTransform<T, ?> transform) {
101+
return lookupPipelineVisitor.getInput(transform);
102+
}
103+
104+
Map<TupleTag<?>, PCollection<?>> getOutputs(PTransform<?, ?> transform) {
105+
return lookupPipelineVisitor.getOutputs(transform);
106+
}
107+
108+
<T extends PValue> T getOutput(PTransform<?, T> transform) {
109+
return lookupPipelineVisitor.getOutput(transform);
110+
}
111+
112+
/** {@link CountingPipelineVisitor#getNumConsumers(PValue)}. */
113+
int getNumConsumers(PValue value) {
114+
return countingPipelineVisitor.getNumConsumers(value);
115+
}
116+
}

0 commit comments

Comments
 (0)