Skip to content

Commit 8c44b4d

Browse files
committed
Merge branch 'v1.5.0_dev_addqueue' into 'v1.5.0_dev'
add queue parameter See merge request !15
2 parents 606c041 + 9ec682f commit 8c44b4d

File tree

3 files changed

+71
-4
lines changed

3 files changed

+71
-4
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public class LauncherMain {
6060

6161
private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception {
6262
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR);
63-
return jarPath;
63+
String corePath = localSqlRootJar + SP + jarPath;
64+
return corePath;
6465
}
6566

6667
public static void main(String[] args) throws Exception {

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class LauncherOptionParser {
6767

6868
public static final String OPTION_FLINK_JAR_PATH = "flinkJarPath";
6969

70+
public static final String OPTION_QUEUE = "queue";
71+
7072
private Options options = new Options();
7173

7274
private BasicParser parser = new BasicParser();
@@ -87,6 +89,7 @@ public LauncherOptionParser(String[] args) {
8789
options.addOption(OPTION_SAVE_POINT_PATH, true, "Savepoint restore path");
8890
options.addOption(OPTION_ALLOW_NON_RESTORED_STATE, true, "Flag indicating whether non restored state is allowed if the savepoint");
8991
options.addOption(OPTION_FLINK_JAR_PATH, true, "flink jar path for submit of perjob mode");
92+
options.addOption(OPTION_QUEUE, true, "flink runing yarn queue");
9093

9194
try {
9295
CommandLine cl = parser.parse(options, args);
@@ -145,6 +148,10 @@ public LauncherOptionParser(String[] args) {
145148
properties.setFlinkJarPath(flinkJarPath);
146149
}
147150

151+
String queue = cl.getOptionValue(OPTION_QUEUE);
152+
if(StringUtils.isNotBlank(queue)){
153+
properties.setQueue(queue);
154+
}
148155
} catch (Exception e) {
149156
throw new RuntimeException(e);
150157
}
@@ -161,7 +168,8 @@ public List<String> getProgramExeArgList() throws Exception {
161168
String key = one.getKey();
162169
if(OPTION_FLINK_CONF_DIR.equalsIgnoreCase(key)
163170
|| OPTION_YARN_CONF_DIR.equalsIgnoreCase(key)
164-
|| OPTION_FLINK_JAR_PATH.equalsIgnoreCase(key)){
171+
|| OPTION_FLINK_JAR_PATH.equalsIgnoreCase(key)
172+
|| OPTION_QUEUE.equalsIgnoreCase(key)){
165173
continue;
166174
}
167175

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,23 @@
2020

2121
import com.dtstack.flink.sql.launcher.LauncherOptions;
2222
import com.dtstack.flink.sql.util.PluginUtil;
23+
import org.apache.commons.io.Charsets;
24+
import org.apache.commons.lang3.StringUtils;
2325
import org.apache.flink.client.deployment.ClusterSpecification;
2426
import org.apache.flink.client.program.ClusterClient;
27+
import org.apache.flink.core.fs.Path;
2528
import org.apache.flink.runtime.jobgraph.JobGraph;
29+
import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
30+
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
2631
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
2732
import org.apache.hadoop.yarn.api.records.ApplicationId;
2833
import org.slf4j.Logger;
2934
import org.slf4j.LoggerFactory;
3035

31-
import java.util.Properties;
36+
import java.net.MalformedURLException;
37+
import java.net.URL;
38+
import java.net.URLDecoder;
39+
import java.util.*;
3240

3341
/**
3442
* per job mode submitter
@@ -43,7 +51,20 @@ public class PerJobSubmitter {
4351

4452
public static String submit(LauncherOptions launcherOptions, JobGraph jobGraph) throws Exception {
4553

46-
Properties confProperties = PluginUtil.jsonStrToObject(launcherOptions.getConfProp(), Properties.class);
54+
fillJobGraphClassPath(jobGraph);
55+
56+
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
57+
if (StringUtils.isNotBlank(addjarPath) ){
58+
List<String> paths = getJarPaths(addjarPath);
59+
paths.forEach( path ->{
60+
jobGraph.addJar(new Path("file://" + path));
61+
});
62+
63+
}
64+
65+
String confProp = launcherOptions.getConfProp();
66+
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
67+
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
4768
ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil.createClusterSpecification(confProperties);
4869

4970
PerJobClusterClientBuilder perJobClusterClientBuilder = new PerJobClusterClientBuilder();
@@ -63,4 +84,41 @@ public static String submit(LauncherOptions launcherOptions, JobGraph jobGraph)
6384

6485
return applicationId;
6586
}
87+
88+
private static List<String> getJarPaths(String addjarPath) {
89+
if (addjarPath.length() > 2) {
90+
addjarPath = addjarPath.substring(1,addjarPath.length()-1).replace("\"","");
91+
}
92+
List<String> paths = Arrays.asList(addjarPath.split(","));
93+
return paths;
94+
}
95+
96+
private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
97+
Map<String, String> jobCacheFileConfig = jobGraph.getJobConfiguration().toMap();
98+
Set<String> classPathKeySet = Sets.newHashSet();
99+
100+
for(Map.Entry<String, String> tmp : jobCacheFileConfig.entrySet()){
101+
if(Strings.isNullOrEmpty(tmp.getValue())){
102+
continue;
103+
}
104+
105+
if(tmp.getValue().startsWith("class_path")){
106+
//DISTRIBUTED_CACHE_FILE_NAME_1
107+
//DISTRIBUTED_CACHE_FILE_PATH_1
108+
String key = tmp.getKey();
109+
String[] array = key.split("_");
110+
if(array.length < 5){
111+
continue;
112+
}
113+
114+
array[3] = "PATH";
115+
classPathKeySet.add(StringUtils.join(array, "_"));
116+
}
117+
}
118+
119+
for(String key : classPathKeySet){
120+
String pathStr = jobCacheFileConfig.get(key);
121+
jobGraph.getClasspaths().add(new URL("file:" + pathStr));
122+
}
123+
}
66124
}

0 commit comments

Comments
 (0)