2020
2121import com .dtstack .flink .sql .launcher .LauncherOptions ;
2222import com .dtstack .flink .sql .util .PluginUtil ;
23+ import org .apache .commons .io .Charsets ;
24+ import org .apache .commons .lang3 .StringUtils ;
2325import org .apache .flink .client .deployment .ClusterSpecification ;
2426import org .apache .flink .client .program .ClusterClient ;
2527import org .apache .flink .runtime .jobgraph .JobGraph ;
28+ import org .apache .flink .shaded .guava18 .com .google .common .base .Strings ;
29+ import org .apache .flink .shaded .guava18 .com .google .common .collect .Sets ;
2630import org .apache .flink .yarn .AbstractYarnClusterDescriptor ;
2731import org .apache .hadoop .yarn .api .records .ApplicationId ;
2832import org .slf4j .Logger ;
2933import org .slf4j .LoggerFactory ;
3034
35+ import java .net .MalformedURLException ;
36+ import java .net .URL ;
37+ import java .net .URLDecoder ;
38+ import java .util .Map ;
3139import java .util .Properties ;
40+ import java .util .Set ;
3241
3342/**
3443 * per job mode submitter
@@ -43,7 +52,11 @@ public class PerJobSubmitter {
4352
4453 public static String submit (LauncherOptions launcherOptions , JobGraph jobGraph ) throws Exception {
4554
46- Properties confProperties = PluginUtil .jsonStrToObject (launcherOptions .getConfProp (), Properties .class );
55+ fillJobGraphClassPath (jobGraph );
56+
57+ String confProp = launcherOptions .getConfProp ();
58+ confProp = URLDecoder .decode (confProp , Charsets .UTF_8 .toString ());
59+ Properties confProperties = PluginUtil .jsonStrToObject (confProp , Properties .class );
4760 ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil .createClusterSpecification (confProperties );
4861
4962 PerJobClusterClientBuilder perJobClusterClientBuilder = new PerJobClusterClientBuilder ();
@@ -63,4 +76,33 @@ public static String submit(LauncherOptions launcherOptions, JobGraph jobGraph)
6376
6477 return applicationId ;
6578 }
79+
80+ private static void fillJobGraphClassPath (JobGraph jobGraph ) throws MalformedURLException {
81+ Map <String , String > jobCacheFileConfig = jobGraph .getJobConfiguration ().toMap ();
82+ Set <String > classPathKeySet = Sets .newHashSet ();
83+
84+ for (Map .Entry <String , String > tmp : jobCacheFileConfig .entrySet ()){
85+ if (Strings .isNullOrEmpty (tmp .getValue ())){
86+ continue ;
87+ }
88+
89+ if (tmp .getValue ().startsWith ("class_path" )){
90+ //DISTRIBUTED_CACHE_FILE_NAME_1
91+ //DISTRIBUTED_CACHE_FILE_PATH_1
92+ String key = tmp .getKey ();
93+ String [] array = key .split ("_" );
94+ if (array .length < 5 ){
95+ continue ;
96+ }
97+
98+ array [3 ] = "PATH" ;
99+ classPathKeySet .add (StringUtils .join (array , "_" ));
100+ }
101+ }
102+
103+ for (String key : classPathKeySet ){
104+ String pathStr = jobCacheFileConfig .get (key );
105+ jobGraph .getClasspaths ().add (new URL ("file:" + pathStr ));
106+ }
107+ }
66108}
0 commit comments