Skip to content

Commit 872cb6d

Browse files
xiuzhu9527tiezhu
authored andcommitted
[hotfix] fix running failure in k8s appliation mode
1 parent da3c487 commit 872cb6d

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

flinkx-core/src/main/java/com/dtstack/flinkx/Main.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@
7171
import org.slf4j.LoggerFactory;
7272

7373
import java.net.URL;
74-
import java.net.URLDecoder;
75-
import java.nio.charset.StandardCharsets;
7674
import java.util.Arrays;
7775
import java.util.List;
7876
import java.util.Optional;
@@ -95,7 +93,7 @@ public static void main(String[] args) throws Exception {
9593
LOG.info("-------------------------------------------");
9694

9795
Options options = new OptionParser(args).getOptions();
98-
String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name());
96+
String job = options.getJob();
9997
Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
10098
StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options);
10199
StreamTableEnvironment tEnv =
@@ -315,7 +313,7 @@ private static void configStreamExecutionEnvironment(
315313
factoryHelper.setRemotePluginPath(options.getRemoteFlinkxDistDir());
316314
factoryHelper.setPluginLoadMode(options.getPluginLoadMode());
317315
factoryHelper.setEnv(env);
318-
factoryHelper.setExecutionMode(options.getMode());
316+
factoryHelper.setOptions(options);
319317

320318
DirtyConf dirtyConf = DirtyConfUtil.parse(options);
321319
factoryHelper.registerCachedFile(

flinkx-core/src/main/java/com/dtstack/flinkx/util/FactoryHelper.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.dtstack.flinkx.util;
1919

2020
import com.dtstack.flinkx.constants.ConstantValue;
21+
import com.dtstack.flinkx.options.Options;
2122

2223
import org.apache.flink.configuration.ConfigOption;
2324
import org.apache.flink.configuration.ConfigOptions;
@@ -60,8 +61,8 @@ public class FactoryHelper {
6061
protected List<URL> classPathSet = new ArrayList<>();
6162
/** shipfile需要的jar的classPath index */
6263
protected int classFileNameIndex = 0;
63-
/** 任务执行模式 */
64-
protected String executionMode;
64+
/** 任务启动参数 */
65+
protected Options options;
6566

6667
public FactoryHelper() {}
6768

@@ -93,7 +94,7 @@ public void registerCachedFile(
9394
this.classFileNameIndex++;
9495
}
9596
}
96-
PluginUtil.setPipelineOptionsToEnvConfig(this.env, urlList, executionMode);
97+
PluginUtil.setPipelineOptionsToEnvConfig(this.env, urlList, options);
9798
} catch (Exception e) {
9899
LOG.warn("can't add jar in {} to cachedFile, e = {}", urlSet, e.getMessage());
99100
}
@@ -115,11 +116,11 @@ public void setEnv(StreamExecutionEnvironment env) {
115116
this.env = env;
116117
}
117118

118-
public String getExecutionMode() {
119-
return executionMode;
119+
public Options getOptions() {
120+
return options;
120121
}
121122

122-
public void setExecutionMode(String executionMode) {
123-
this.executionMode = executionMode;
123+
public void setOptions(Options options) {
124+
this.options = options;
124125
}
125126
}

flinkx-core/src/main/java/com/dtstack/flinkx/util/PluginUtil.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ public static void registerPluginUrlToCachedFile(
346346
e);
347347
}
348348

349-
config.setSyncJarList(setPipelineOptionsToEnvConfig(env, urlList, options.getMode()));
349+
config.setSyncJarList(setPipelineOptionsToEnvConfig(env, urlList, options));
350350
}
351351

352352
/**
@@ -358,7 +358,7 @@ public static void registerPluginUrlToCachedFile(
358358
*/
359359
@SuppressWarnings("all")
360360
public static List<String> setPipelineOptionsToEnvConfig(
361-
StreamExecutionEnvironment env, List<String> urlList, String executionMode) {
361+
StreamExecutionEnvironment env, List<String> urlList, Options options) {
362362
try {
363363
Configuration configuration =
364364
(Configuration)
@@ -370,6 +370,7 @@ public static List<String> setPipelineOptionsToEnvConfig(
370370
jarList.addAll(urlList);
371371

372372
List<String> pipelineJars = new ArrayList();
373+
String executionMode = options.getMode();
373374
LOG.info("Flinkx executionMode: " + executionMode);
374375
if (ClusterMode.getByName(executionMode) == ClusterMode.kubernetesApplication) {
375376
for (String jarUrl : jarList) {

0 commit comments

Comments
 (0)