Skip to content

Commit 323b2bc

Browse files
lvyanquanxiaoxiong.duan@zznode.com
andauthored
[FLINK-38729] Add support for Flink 2.2.0 (apache#4294)
Co-authored-by: xiaoxiong.duan@zznode.com <xiaoxiong.duan@zznode.com>
1 parent db595e4 commit 323b2bc

File tree

88 files changed

+1620
-83
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+1620
-83
lines changed

.github/workflows/flink_cdc_base.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ jobs:
108108
109109
build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }${{ inputs.custom-maven-parameter }}"
110110
111-
mvn --no-snapshot-updates -B -DskipTests -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} -Duser.timezone=$jvm_timezone verify
111+
mvn --no-snapshot-updates -B -DskipTests ${{ inputs.custom-maven-parameter }} -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} -Duser.timezone=$jvm_timezone verify
112112
113113
- name: Print JVM thread dumps when cancelled
114114
if: ${{ failure() }}

.github/workflows/flink_cdc_ci.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ jobs:
6363
with:
6464
java-versions: "[11]"
6565
modules: "['core']"
66+
common_2_x:
67+
name: Common Unit Tests 2.x
68+
uses: ./.github/workflows/flink_cdc_base.yml
69+
with:
70+
java-versions: "[11]"
71+
flink-versions: "['2.2.0']"
72+
custom-maven-parameter: "-Pflink2"
73+
modules: "['core_2.x']"
6674
pipeline-ut:
6775
name: Pipeline Unit Tests
6876
uses: ./.github/workflows/flink_cdc_base.yml
@@ -88,6 +96,20 @@ jobs:
8896
flink-versions: "['1.19.3', '1.20.3']"
8997
modules: "['pipeline_e2e']"
9098
parallelism: ${{ matrix.parallelism }}
99+
pipeline_e2e_2_x:
100+
strategy:
101+
max-parallel: 2
102+
fail-fast: false
103+
matrix:
104+
parallelism: [ 1, 4 ]
105+
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
106+
uses: ./.github/workflows/flink_cdc_base.yml
107+
with:
108+
java-versions: "[11]"
109+
flink-versions: "['2.2.0']"
110+
custom-maven-parameter: "-Pflink2"
111+
modules: "['pipeline_e2e']"
112+
parallelism: ${{ matrix.parallelism }}
91113
source_e2e:
92114
name: Source E2E Tests
93115
uses: ./.github/workflows/flink_cdc_base.yml

.github/workflows/flink_cdc_ci_nightly.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ jobs:
5454
with:
5555
java-versions: "[17]"
5656
modules: "['core']"
57+
common_2_x:
58+
if: github.repository == 'apache/flink-cdc'
59+
name: Common Unit Tests 2.x
60+
uses: ./.github/workflows/flink_cdc_base.yml
61+
with:
62+
java-versions: "[17]"
63+
flink-versions: "['2.2.0']"
64+
custom-maven-parameter: "-Pflink2"
65+
modules: "['core_2.x']"
5766
pipeline-ut:
5867
if: github.repository == 'apache/flink-cdc'
5968
name: Pipeline Unit Tests
@@ -82,6 +91,21 @@ jobs:
8291
flink-versions: "['1.19.3', '1.20.3']"
8392
modules: "['pipeline_e2e']"
8493
parallelism: ${{ matrix.parallelism }}
94+
pipeline_e2e_2_x:
95+
if: github.repository == 'apache/flink-cdc'
96+
strategy:
97+
max-parallel: 2
98+
fail-fast: false
99+
matrix:
100+
parallelism: [ 1, 4 ]
101+
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
102+
uses: ./.github/workflows/flink_cdc_base.yml
103+
with:
104+
java-versions: "[17]"
105+
custom-maven-parameter: "-Pflink2"
106+
flink-versions: "['2.2.0']"
107+
modules: "['pipeline_e2e_2.x']"
108+
parallelism: ${{ matrix.parallelism }}
85109
source_e2e:
86110
if: github.repository == 'apache/flink-cdc'
87111
name: Source E2E Tests

.github/workflows/modules.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values"
1818
]
1919

20+
MODULES_CORE_2_X = [
21+
"flink-cdc-cli",
22+
"flink-cdc-common",
23+
"flink-cdc-composer",
24+
"flink-cdc-runtime",
25+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values"
26+
]
27+
2028
MODULES_PIPELINE_CONNECTORS = [
2129
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris",
2230
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch",
@@ -142,6 +150,7 @@
142150

143151
ALL_MODULES = set(
144152
MODULES_CORE +
153+
MODULES_CORE_2_X +
145154
MODULES_PIPELINE_CONNECTORS +
146155
MODULES_MYSQL_SOURCE +
147156
MODULES_MYSQL_PIPELINE +
@@ -173,7 +182,7 @@
173182
compile_modules = set()
174183

175184
for module in INPUT_MODULES.split(', '):
176-
module_list = set(globals()['MODULES_' + module.upper().replace('-', '_')])
185+
module_list = set(globals()['MODULES_' + module.upper().replace('-', '_').replace('.', '_')])
177186
test_modules |= module_list
178187
if module == 'source_e2e' or module == 'pipeline_e2e':
179188
compile_modules |= ALL_MODULES

flink-cdc-cli/pom.xml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,33 @@ limitations under the License.
8282

8383
</dependencies>
8484

85+
<profiles>
86+
<profile>
87+
<id>flink2</id>
88+
<properties>
89+
<flink.version>${flink.2.x.version}</flink.version>
90+
</properties>
91+
<dependencies>
92+
<dependency>
93+
<groupId>org.apache.flink</groupId>
94+
<artifactId>flink-cdc-flink2-compat</artifactId>
95+
<version>${project.version}</version>
96+
<scope>provided</scope>
97+
</dependency>
98+
</dependencies>
99+
<build>
100+
<plugins>
101+
<plugin>
102+
<groupId>org.apache.maven.plugins</groupId>
103+
<artifactId>maven-dependency-plugin</artifactId>
104+
</plugin>
105+
<plugin>
106+
<groupId>org.apache.maven.plugins</groupId>
107+
<artifactId>maven-surefire-plugin</artifactId>
108+
</plugin>
109+
</plugins>
110+
</build>
111+
</profile>
112+
</profiles>
113+
85114
</project>

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.cdc.composer.PipelineExecution;
2828
import org.apache.flink.configuration.DeploymentOptions;
2929
import org.apache.flink.core.fs.Path;
30-
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
3130
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
3231

3332
import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
@@ -173,7 +172,13 @@ private static SavepointRestoreSettings createSavepointRestoreSettings(
173172
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
174173
ConfigurationUtils.getClaimModeClass());
175174
} else {
176-
restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue();
175+
try {
176+
restoreMode =
177+
ConfigurationUtils.getClaimModeClass().getField("DEFAULT").get(null);
178+
} catch (NoSuchFieldException | IllegalAccessException e) {
179+
throw new RuntimeException(
180+
"Failed to get DEFAULT value from RestoreMode class.", e);
181+
}
177182
}
178183
// allowNonRestoredState is always false because all operators are predefined.
179184

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,16 @@ private static Map<String, String> flattenConfigMap(
6767

6868
public static Class<?> getClaimModeClass() {
6969
try {
70-
return Class.forName("org.apache.flink.core.execution.RestoreMode");
71-
} catch (ClassNotFoundException ignored) {
70+
return Class.forName("org.apache.flink.core.execution.RecoveryClaimMode");
71+
} catch (ClassNotFoundException classNotFoundException) {
7272
try {
73-
return Class.forName("org.apache.flink.runtime.jobgraph.RestoreMode");
74-
} catch (ClassNotFoundException e) {
75-
throw new RuntimeException(e);
73+
return Class.forName("org.apache.flink.core.execution.RestoreMode");
74+
} catch (ClassNotFoundException ignored) {
75+
try {
76+
return Class.forName("org.apache.flink.runtime.jobgraph.RestoreMode");
77+
} catch (ClassNotFoundException e) {
78+
throw new RuntimeException(e);
79+
}
7680
}
7781
}
7882
}

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ void testSavePointConfiguration() throws Exception {
107107
"-n");
108108
assertThat(executor.getFlinkConfig().get(SAVEPOINT_PATH))
109109
.isEqualTo(flinkHome() + "/savepoints/savepoint-1");
110-
assertThat(executor.getFlinkConfig().get(RESTORE_MODE)).isEqualTo(RestoreMode.NO_CLAIM);
110+
assertThat(executor.getFlinkConfig().get(RESTORE_MODE).toString())
111+
.isEqualTo(RestoreMode.NO_CLAIM.toString());
111112
assertThat(executor.getFlinkConfig().get(SAVEPOINT_IGNORE_UNCLAIMED_STATE)).isTrue();
112113
}
113114

flink-cdc-common/pom.xml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,33 @@ limitations under the License.
4444
</plugins>
4545
</build>
4646

47+
<profiles>
48+
<profile>
49+
<id>flink2</id>
50+
<properties>
51+
<flink.version>${flink.2.x.version}</flink.version>
52+
</properties>
53+
<build>
54+
<plugins>
55+
<plugin>
56+
<groupId>org.apache.maven.plugins</groupId>
57+
<artifactId>maven-dependency-plugin</artifactId>
58+
</plugin>
59+
<plugin>
60+
<groupId>org.apache.maven.plugins</groupId>
61+
<artifactId>maven-surefire-plugin</artifactId>
62+
</plugin>
63+
</plugins>
64+
</build>
65+
<dependencies>
66+
<dependency>
67+
<groupId>org.apache.flink</groupId>
68+
<artifactId>flink-cdc-flink2-compat</artifactId>
69+
<version>${project.version}</version>
70+
<scope>provided</scope>
71+
</dependency>
72+
</dependencies>
73+
</profile>
74+
</profiles>
75+
4776
</project>

flink-cdc-composer/pom.xml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,47 @@ limitations under the License.
9696
</dependency>
9797
</dependencies>
9898

99+
<profiles>
100+
<profile>
101+
<id>flink2</id>
102+
<properties>
103+
<flink.version>${flink.2.x.version}</flink.version>
104+
</properties>
105+
<build>
106+
<plugins>
107+
<plugin>
108+
<groupId>org.apache.maven.plugins</groupId>
109+
<artifactId>maven-dependency-plugin</artifactId>
110+
</plugin>
111+
<plugin>
112+
<groupId>org.apache.maven.plugins</groupId>
113+
<artifactId>maven-surefire-plugin</artifactId>
114+
</plugin>
115+
</plugins>
116+
</build>
117+
<dependencies>
118+
<dependency>
119+
<groupId>org.apache.flink</groupId>
120+
<artifactId>flink-cdc-flink2-compat</artifactId>
121+
<version>${project.version}</version>
122+
<scope>provided</scope>
123+
</dependency>
124+
</dependencies>
125+
</profile>
126+
<profile>
127+
<id>flink1</id>
128+
<activation>
129+
<activeByDefault>true</activeByDefault>
130+
</activation>
131+
<dependencies>
132+
<dependency>
133+
<groupId>org.apache.flink</groupId>
134+
<artifactId>flink-cdc-flink1-compat</artifactId>
135+
<version>${project.version}</version>
136+
<scope>provided</scope>
137+
</dependency>
138+
</dependencies>
139+
</profile>
140+
</profiles>
141+
99142
</project>

0 commit comments

Comments
 (0)