2424import org .apache .commons .lang3 .StringUtils ;
2525import org .apache .flink .client .deployment .ClusterSpecification ;
2626import org .apache .flink .client .program .ClusterClient ;
27+ import org .apache .flink .core .fs .Path ;
2728import org .apache .flink .runtime .jobgraph .JobGraph ;
2829import org .apache .flink .shaded .guava18 .com .google .common .base .Strings ;
2930import org .apache .flink .shaded .guava18 .com .google .common .collect .Sets ;
3536import java .net .MalformedURLException ;
3637import java .net .URL ;
3738import java .net .URLDecoder ;
38- import java .util .Map ;
39- import java .util .Properties ;
40- import java .util .Set ;
39+ import java .util .*;
4140
4241/**
4342 * per job mode submitter
@@ -52,9 +51,18 @@ public class PerJobSubmitter {
5251
5352 public static String submit (LauncherOptions launcherOptions , JobGraph jobGraph ) throws Exception {
5453
55- fillJobGraphClassPath (jobGraph );
54+ fillJobGraphClassPath (jobGraph );
5655
57- String confProp = launcherOptions .getConfProp ();
56+ String addjarPath = URLDecoder .decode (launcherOptions .getAddjar (), Charsets .UTF_8 .toString ());
57+ if (StringUtils .isNotBlank (addjarPath ) ){
58+ List <String > paths = getJarPaths (addjarPath );
59+ paths .forEach ( path ->{
60+ jobGraph .addJar (new Path ("file://" + path ));
61+ });
62+
63+ }
64+
65+ String confProp = launcherOptions .getConfProp ();
5866 confProp = URLDecoder .decode (confProp , Charsets .UTF_8 .toString ());
5967 Properties confProperties = PluginUtil .jsonStrToObject (confProp , Properties .class );
6068 ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil .createClusterSpecification (confProperties );
@@ -77,32 +85,40 @@ public static String submit(LauncherOptions launcherOptions, JobGraph jobGraph)
7785 return applicationId ;
7886 }
7987
80- private static void fillJobGraphClassPath (JobGraph jobGraph ) throws MalformedURLException {
81- Map <String , String > jobCacheFileConfig = jobGraph .getJobConfiguration ().toMap ();
82- Set <String > classPathKeySet = Sets .newHashSet ();
83-
84- for (Map .Entry <String , String > tmp : jobCacheFileConfig .entrySet ()){
85- if (Strings .isNullOrEmpty (tmp .getValue ())){
86- continue ;
87- }
88-
89- if (tmp .getValue ().startsWith ("class_path" )){
90- //DISTRIBUTED_CACHE_FILE_NAME_1
91- //DISTRIBUTED_CACHE_FILE_PATH_1
92- String key = tmp .getKey ();
93- String [] array = key .split ("_" );
94- if (array .length < 5 ){
95- continue ;
96- }
97-
98- array [3 ] = "PATH" ;
99- classPathKeySet .add (StringUtils .join (array , "_" ));
100- }
101- }
102-
103- for (String key : classPathKeySet ){
104- String pathStr = jobCacheFileConfig .get (key );
105- jobGraph .getClasspaths ().add (new URL ("file:" + pathStr ));
106- }
107- }
88+ private static List <String > getJarPaths (String addjarPath ) {
89+ if (addjarPath .length () > 2 ) {
90+ addjarPath = addjarPath .substring (1 ,addjarPath .length ()-1 ).replace ("\" " ,"" );
91+ }
92+ List <String > paths = Arrays .asList (addjarPath .split ("," ));
93+ return paths ;
94+ }
95+
96+ private static void fillJobGraphClassPath (JobGraph jobGraph ) throws MalformedURLException {
97+ Map <String , String > jobCacheFileConfig = jobGraph .getJobConfiguration ().toMap ();
98+ Set <String > classPathKeySet = Sets .newHashSet ();
99+
100+ for (Map .Entry <String , String > tmp : jobCacheFileConfig .entrySet ()){
101+ if (Strings .isNullOrEmpty (tmp .getValue ())){
102+ continue ;
103+ }
104+
105+ if (tmp .getValue ().startsWith ("class_path" )){
106+ //DISTRIBUTED_CACHE_FILE_NAME_1
107+ //DISTRIBUTED_CACHE_FILE_PATH_1
108+ String key = tmp .getKey ();
109+ String [] array = key .split ("_" );
110+ if (array .length < 5 ){
111+ continue ;
112+ }
113+
114+ array [3 ] = "PATH" ;
115+ classPathKeySet .add (StringUtils .join (array , "_" ));
116+ }
117+ }
118+
119+ for (String key : classPathKeySet ){
120+ String pathStr = jobCacheFileConfig .get (key );
121+ jobGraph .getClasspaths ().add (new URL ("file:" + pathStr ));
122+ }
123+ }
108124}
0 commit comments