Skip to content

Commit 1ee55c3

Browse files
committed
fix flink-conf is not obtained in local mode
1 parent c9a2732 commit 1ee55c3

File tree

2 files changed

+31
-15
lines changed

2 files changed

+31
-15
lines changed

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public PipelineExecution.ExecutionInfo run() throws Exception {
7676
case YARN_APPLICATION:
7777
return deployWithApplicationComposer(new YarnApplicationDeploymentExecutor());
7878
case LOCAL:
79-
return deployWithComposer(FlinkPipelineComposer.ofMiniCluster());
79+
return deployWithComposer(
80+
FlinkPipelineComposer.ofMiniCluster(flinkConfig, additionalJars));
8081
case REMOTE:
8182
case YARN_SESSION:
8283
return deployWithComposer(

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.composer.flink;
1919

2020
import org.apache.flink.cdc.common.annotation.Internal;
21+
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2122
import org.apache.flink.cdc.common.configuration.Configuration;
2223
import org.apache.flink.cdc.common.event.Event;
2324
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -59,32 +60,28 @@ public class FlinkPipelineComposer implements PipelineComposer {
5960
public static FlinkPipelineComposer ofRemoteCluster(
6061
org.apache.flink.configuration.Configuration flinkConfig, List<Path> additionalJars) {
6162
StreamExecutionEnvironment env = new StreamExecutionEnvironment(flinkConfig);
62-
additionalJars.forEach(
63-
jarPath -> {
64-
try {
65-
FlinkEnvironmentUtils.addJar(
66-
env,
67-
jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL());
68-
} catch (Exception e) {
69-
throw new RuntimeException(
70-
String.format(
71-
"Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment",
72-
jarPath),
73-
e);
74-
}
75-
});
63+
addAdditionalJars(env, additionalJars);
7664
return new FlinkPipelineComposer(env, false);
7765
}
7866

7967
public static FlinkPipelineComposer ofApplicationCluster(StreamExecutionEnvironment env) {
8068
return new FlinkPipelineComposer(env, false);
8169
}
8270

71+
@VisibleForTesting
8372
public static FlinkPipelineComposer ofMiniCluster() {
8473
return new FlinkPipelineComposer(
8574
StreamExecutionEnvironment.getExecutionEnvironment(), true);
8675
}
8776

77+
public static FlinkPipelineComposer ofMiniCluster(
78+
org.apache.flink.configuration.Configuration flinkConfig, List<Path> additionalJars) {
79+
StreamExecutionEnvironment localEnvironment =
80+
StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
81+
addAdditionalJars(localEnvironment, additionalJars);
82+
return new FlinkPipelineComposer(localEnvironment, true);
83+
}
84+
8885
private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) {
8986
this.env = env;
9087
this.isBlocking = isBlocking;
@@ -254,4 +251,22 @@ private Optional<URL> getContainingJar(Class<?> clazz) throws Exception {
254251
}
255252
return Optional.of(container);
256253
}
254+
255+
private static void addAdditionalJars(
256+
StreamExecutionEnvironment env, List<Path> additionalJars) {
257+
additionalJars.forEach(
258+
jarPath -> {
259+
try {
260+
FlinkEnvironmentUtils.addJar(
261+
env,
262+
jarPath.makeQualified(jarPath.getFileSystem()).toUri().toURL());
263+
} catch (Exception e) {
264+
throw new RuntimeException(
265+
String.format(
266+
"Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment",
267+
jarPath),
268+
e);
269+
}
270+
});
271+
}
257272
}

0 commit comments

Comments
 (0)