@@ -67,6 +67,8 @@ public class LauncherOptionParser {
6767
6868 public static final String OPTION_FLINK_JAR_PATH = "flinkJarPath" ;
6969
70+ public static final String OPTION_QUEUE = "queue" ;
71+
7072 private Options options = new Options ();
7173
7274 private BasicParser parser = new BasicParser ();
@@ -87,6 +89,7 @@ public LauncherOptionParser(String[] args) throws Exception {
8789 options .addOption (OPTION_SAVE_POINT_PATH , true , "Savepoint restore path" );
8890 options .addOption (OPTION_ALLOW_NON_RESTORED_STATE , true , "Flag indicating whether non restored state is allowed if the savepoint" );
8991 options .addOption (OPTION_FLINK_JAR_PATH , true , "flink jar path for submit of perjob mode" );
92+ options .addOption (OPTION_QUEUE , true , "flink runing yarn queue" );
9093 CommandLine cl = parser .parse (options , args );
9194 String mode = cl .getOptionValue (OPTION_MODE , ClusterMode .local .name ());
9295 //check mode
@@ -145,6 +148,10 @@ public LauncherOptionParser(String[] args) throws Exception {
145148 if (StringUtils .isNotBlank (flinkJarPath )){
146149 properties .setFlinkJarPath (flinkJarPath );
147150 }
151+ String queue = cl .getOptionValue (OPTION_QUEUE );
152+ if (StringUtils .isNotBlank (queue )){
153+ properties .setQueue (queue );
154+ }
148155 }
149156
150157 public LauncherOptions getLauncherOptions (){
@@ -159,7 +166,8 @@ public List<String> getProgramExeArgList() throws Exception {
159166 String key = one .getKey ();
160167 if (OPTION_FLINK_CONF_DIR .equalsIgnoreCase (key )
161168 || OPTION_YARN_CONF_DIR .equalsIgnoreCase (key )
162- || OPTION_FLINK_JAR_PATH .equalsIgnoreCase (key )){
169+ || OPTION_FLINK_JAR_PATH .equalsIgnoreCase (key )
170+ || OPTION_QUEUE .equalsIgnoreCase (key )){
163171 continue ;
164172 }
165173
0 commit comments