Skip to content

Commit b732d3d

Browse files
committed
Add Flink2 support for pipeline source connector.
1 parent 1b360d5 commit b732d3d

File tree

1 file changed

+18
-1
lines changed

1 file changed

+18
-1
lines changed

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import java.util.Collection;
6767
import java.util.Collections;
6868
import java.util.List;
69+
import java.util.Objects;
6970
import java.util.concurrent.TimeUnit;
7071
import java.util.concurrent.TimeoutException;
7172
import java.util.function.Function;
@@ -161,7 +162,12 @@ private int getParallelism() {
161162
protected String flinkVersion = getFlinkVersion();
162163

163164
public static String getFlinkVersion() {
164-
return "1.20.3";
165+
String flinkVersion = System.getProperty("specifiedFlinkVersion");
166+
if (Objects.isNull(flinkVersion)) {
167+
throw new IllegalArgumentException(
168+
"No Flink version specified to run this test. Please use -DspecifiedFlinkVersion to pass one.");
169+
}
170+
return flinkVersion;
165171
}
166172

167173
protected List<String> copyJarToFlinkLib() {
@@ -398,6 +404,17 @@ public void waitUntilJobState(Duration timeout, JobStatus expectedStatus) {
398404
}
399405

400406
protected String getFlinkDockerImageTag() {
407+
String customImage = System.getProperty("flink.docker.image");
408+
if (customImage != null && !customImage.isEmpty()) {
409+
return customImage;
410+
}
411+
String registry = System.getProperty("flink.docker.registry", "");
412+
if (!registry.isEmpty()) {
413+
if (System.getProperty("java.specification.version").equals("17")) {
414+
return String.format("%s/flink:%s-scala_2.12-java17", registry, flinkVersion);
415+
}
416+
return String.format("%s/flink:%s-scala_2.12-java11", registry, flinkVersion);
417+
}
401418
if (System.getProperty("java.specification.version").equals("17")) {
402419
return String.format("flink:%s-scala_2.12-java17", flinkVersion);
403420
}

0 commit comments

Comments
 (0)