Skip to content

Commit bf0a388

Browse files
yarnper submit bugfix
1 parent 4540349 commit bf0a388

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public static ClusterClient createClusterClient(LauncherOptions launcherOptions)
6363
String mode = launcherOptions.getMode();
6464
if(mode.equals(ClusterMode.standalone.name())) {
6565
return createStandaloneClient(launcherOptions);
66-
} else if(mode.equals(ClusterMode.yarn.name()) || mode.equals(ClusterMode.yarnPer.name())) {
66+
} else if(mode.equals(ClusterMode.yarn.name())) {
6767
return createYarnClient(launcherOptions,mode);
6868
}
6969
throw new IllegalArgumentException("Unsupported cluster client type: ");

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020

2121
import com.dtstack.flink.sql.options.LauncherOptions;
2222
import com.dtstack.flink.sql.util.PluginUtil;
23+
import org.apache.flink.api.common.cache.DistributedCache;
2324
import org.apache.flink.client.deployment.ClusterSpecification;
2425
import org.apache.flink.client.program.ClusterClient;
2526
import org.apache.flink.runtime.jobgraph.JobGraph;
2627
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
2728
import org.apache.hadoop.yarn.api.records.ApplicationId;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
30-
31+
import java.net.MalformedURLException;
32+
import java.net.URL;
33+
import java.util.Map;
3134
import java.util.Properties;
3235

3336
/**
@@ -39,28 +42,33 @@
3942

4043
public class PerJobSubmitter {
4144

45+
private final static String CLASS_FILE_NAME_PRESTR = "class_path";
46+
4247
private static final Logger LOG = LoggerFactory.getLogger(PerJobSubmitter.class);
4348

4449
public static String submit(LauncherOptions launcherOptions, JobGraph jobGraph) throws Exception {
45-
4650
Properties confProperties = PluginUtil.jsonStrToObject(launcherOptions.getConfProp(), Properties.class);
4751
ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil.createClusterSpecification(confProperties);
48-
4952
PerJobClusterClientBuilder perJobClusterClientBuilder = new PerJobClusterClientBuilder();
5053
perJobClusterClientBuilder.init(launcherOptions.getYarnconf());
51-
5254
String flinkJarPath = launcherOptions.getFlinkJarPath();
53-
5455
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(confProperties, flinkJarPath, launcherOptions.getQueue());
55-
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph,true);
56-
56+
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification,fillJobGraphClassPath(jobGraph),true);
5757
String applicationId = clusterClient.getClusterId().toString();
5858
String flinkJobId = jobGraph.getJobID().toString();
59-
6059
String tips = String.format("deploy per_job with appId: %s, jobId: %s", applicationId, flinkJobId);
6160
System.out.println(tips);
6261
LOG.info(tips);
63-
6462
return applicationId;
6563
}
64+
65+
private static JobGraph fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
66+
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
67+
for(Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()){
68+
if(tmp.getKey().startsWith(CLASS_FILE_NAME_PRESTR)){
69+
jobGraph.getClasspaths().add(new URL("file:" + tmp.getValue().filePath));
70+
}
71+
}
72+
return jobGraph;
73+
}
6674
}

0 commit comments

Comments
 (0)