Skip to content

Commit b683234

Browse files
optimize parameters
1 parent 5517bd8 commit b683234

File tree

9 files changed

+204
-231
lines changed

9 files changed

+204
-231
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.dtstack.flink.sql.enums.ECacheType;
2525
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
2626
import com.dtstack.flink.sql.exec.FlinkSQLExec;
27+
import com.dtstack.flink.sql.option.OptionParser;
2728
import com.dtstack.flink.sql.parser.CreateFuncParser;
2829
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
2930
import com.dtstack.flink.sql.parser.InsertSqlParser;
@@ -43,10 +44,6 @@
4344
import org.apache.calcite.config.Lex;
4445
import org.apache.calcite.sql.SqlInsert;
4546
import org.apache.calcite.sql.SqlNode;
46-
import org.apache.commons.cli.CommandLine;
47-
import org.apache.commons.cli.CommandLineParser;
48-
import org.apache.commons.cli.DefaultParser;
49-
import org.apache.commons.cli.Options;
5047
import org.apache.commons.io.Charsets;
5148
import org.apache.flink.api.common.ExecutionConfig;
5249
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -56,24 +53,20 @@
5653
import org.apache.flink.api.java.typeutils.RowTypeInfo;
5754
import org.apache.flink.client.program.ContextEnvironment;
5855
import org.apache.flink.configuration.Configuration;
59-
import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
6056
import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
6157
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
6258
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
6359
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
6460
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
6561
import org.apache.flink.streaming.api.datastream.DataStream;
6662
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
67-
6863
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
6964
import org.apache.flink.table.api.Table;
7065
import org.apache.flink.table.api.java.StreamTableEnvironment;
71-
7266
import org.apache.flink.table.sinks.TableSink;
7367
import org.apache.flink.types.Row;
7468
import org.slf4j.Logger;
7569
import org.slf4j.LoggerFactory;
76-
7770
import java.io.File;
7871
import java.io.IOException;
7972
import java.lang.reflect.Field;
@@ -112,32 +105,15 @@ public class Main {
112105

113106
public static void main(String[] args) throws Exception {
114107

115-
Options options = new Options();
116-
options.addOption("sql", true, "sql config");
117-
options.addOption("name", true, "job name");
118-
options.addOption("addjar", true, "add jar");
119-
options.addOption("localSqlPluginPath", true, "local sql plugin path");
120-
options.addOption("remoteSqlPluginPath", true, "remote sql plugin path");
121-
options.addOption("confProp", true, "env properties");
122-
options.addOption("mode", true, "deploy mode");
123-
124-
options.addOption("savePointPath", true, "Savepoint restore path");
125-
options.addOption("allowNonRestoredState", true, "Flag indicating whether non restored state is allowed if the savepoint");
126-
127-
CommandLineParser parser = new DefaultParser();
128-
CommandLine cl = parser.parse(options, args);
129-
String sql = cl.getOptionValue("sql");
130-
String name = cl.getOptionValue("name");
131-
String addJarListStr = cl.getOptionValue("addjar");
132-
String localSqlPluginPath = cl.getOptionValue("localSqlPluginPath");
133-
String remoteSqlPluginPath = cl.getOptionValue("remoteSqlPluginPath");
134-
String deployMode = cl.getOptionValue("mode");
135-
String confProp = cl.getOptionValue("confProp");
136-
137-
Preconditions.checkNotNull(sql, "parameters of sql is required");
138-
Preconditions.checkNotNull(name, "parameters of name is required");
139-
Preconditions.checkNotNull(localSqlPluginPath, "parameters of localSqlPluginPath is required");
140-
108+
OptionParser optionParser = new OptionParser(args);
109+
com.dtstack.flink.sql.option.Options options = optionParser.getOptions();
110+
String sql = options.getSql();
111+
String name = options.getName();
112+
String addJarListStr = options.getAddjar();
113+
String localSqlPluginPath = options.getLocalSqlPluginPath();
114+
String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
115+
String deployMode = options.getMode();
116+
String confProp = options.getConfProp();
141117
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
142118
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
143119

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.option;
20+
21+
import avro.shaded.com.google.common.collect.Lists;
22+
import com.dtstack.flink.sql.util.PluginUtil;
23+
import org.apache.commons.cli.BasicParser;
24+
import org.apache.commons.cli.CommandLine;
25+
import org.apache.commons.cli.ParseException;
26+
import org.apache.commons.lang.StringUtils;
27+
import java.lang.reflect.InvocationTargetException;
28+
import java.lang.reflect.Field;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.io.File;
32+
import java.io.FileInputStream;
33+
import java.net.URLEncoder;
34+
import org.apache.commons.codec.Charsets;
35+
36+
37+
/**
38+
* The Parser of Launcher commandline options
39+
*
40+
* Company: www.dtstack.com
41+
* @author sishu.yss
42+
*/
43+
public class OptionParser {
44+
45+
public static final String OPTION_SQL = "sql";
46+
47+
private org.apache.commons.cli.Options options = new org.apache.commons.cli.Options();
48+
49+
private BasicParser parser = new BasicParser();
50+
51+
private Options properties = new Options();
52+
53+
public OptionParser(String[] args) throws Exception {
54+
initOptions(addOptions(args));
55+
}
56+
57+
private CommandLine addOptions(String[] args) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ParseException {
58+
Class cla = properties.getClass();
59+
Field[] fields = cla.getDeclaredFields();
60+
for(Field field:fields){
61+
String name = field.getName();
62+
OptionRequired optionRequired = field.getAnnotation(OptionRequired.class);
63+
if(optionRequired != null){
64+
options.addOption(name,optionRequired.hasArg(),optionRequired.description());
65+
}
66+
}
67+
CommandLine cl = parser.parse(options, args);
68+
return cl;
69+
}
70+
71+
private void initOptions(CommandLine cl) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ParseException {
72+
Class cla = properties.getClass();
73+
Field[] fields = cla.getDeclaredFields();
74+
for(Field field:fields){
75+
String name = field.getName();
76+
String value = cl.getOptionValue(name);
77+
OptionRequired optionRequired = field.getAnnotation(OptionRequired.class);
78+
if(optionRequired != null){
79+
if(optionRequired.required()&&StringUtils.isBlank(value)){
80+
throw new RuntimeException(String.format("parameters of %s is required",name));
81+
}
82+
}
83+
if(StringUtils.isNotBlank(value)){
84+
field.setAccessible(true);
85+
field.set(properties,value);
86+
}
87+
}
88+
}
89+
90+
public Options getOptions(){
91+
return properties;
92+
}
93+
94+
public List<String> getProgramExeArgList() throws Exception {
95+
Map<String,Object> mapConf = PluginUtil.ObjectToMap(properties);
96+
List<String> args = Lists.newArrayList();
97+
for(Map.Entry<String, Object> one : mapConf.entrySet()){
98+
String key = one.getKey();
99+
Object value = one.getValue();
100+
if(value == null){
101+
continue;
102+
}else if(OPTION_SQL.equalsIgnoreCase(key)){
103+
File file = new File(value.toString());
104+
FileInputStream in = new FileInputStream(file);
105+
byte[] filecontent = new byte[(int) file.length()];
106+
in.read(filecontent);
107+
String content = new String(filecontent, Charsets.UTF_8.name());
108+
value = URLEncoder.encode(content, Charsets.UTF_8.name());
109+
}
110+
args.add("-" + key);
111+
args.add(value.toString());
112+
}
113+
return args;
114+
}
115+
116+
public static void main(String[] args) throws Exception {
117+
OptionParser OptionParser = new OptionParser(args);
118+
System.out.println(OptionParser.getOptions());
119+
}
120+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.dtstack.flink.sql.option;
19+
20+
import java.lang.annotation.ElementType;
21+
import java.lang.annotation.Retention;
22+
import java.lang.annotation.RetentionPolicy;
23+
import java.lang.annotation.Target;
24+
25+
/**
26+
*
27+
* Reason: TODO ADD REASON(可选)
28+
* Date: 2019年9月16日 下午1:24:39
29+
* Company: www.dtstack.com
30+
* @author sishu.yss
31+
*
32+
*/
33+
@Target({ElementType.FIELD})
34+
@Retention(RetentionPolicy.RUNTIME)
35+
public @interface OptionRequired {
36+
37+
boolean required() default false;
38+
39+
boolean hasArg() default true;
40+
41+
String description() default "";
42+
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptions.java renamed to core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,44 +16,57 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.dtstack.flink.sql.launcher;
19+
package com.dtstack.flink.sql.option;
2020

2121
import com.dtstack.flink.sql.ClusterMode;
2222

23+
2324
/**
2425
* This class define commandline options for the Launcher program
2526
*
2627
* Company: www.dtstack.com
2728
2829
*/
29-
public class LauncherOptions {
30+
public class Options {
3031

32+
@OptionRequired(description = "Running mode")
3133
private String mode = ClusterMode.local.name();
3234

35+
@OptionRequired(required = true,description = "Job name")
3336
private String name;
3437

38+
@OptionRequired(required = true,description = "Job sql file")
3539
private String sql;
3640

41+
@OptionRequired(description = "Flink configuration directory")
3742
private String flinkconf;
3843

44+
@OptionRequired(description = "Yarn and Hadoop configuration directory")
3945
private String yarnconf;
4046

47+
@OptionRequired(required = true,description = "Sql local plugin root")
4148
private String localSqlPluginPath;
4249

50+
@OptionRequired(required = true,description = "Sql remote plugin root")
4351
private String remoteSqlPluginPath ;
4452

53+
@OptionRequired(description = "sql ext jar,eg udf jar")
4554
private String addjar;
4655

47-
private String confProp;
56+
@OptionRequired(description = "sql ref prop,eg specify event time")
57+
private String confProp = "{}";
4858

59+
@OptionRequired(description = "Savepoint restore path")
4960
private String savePointPath;
5061

62+
@OptionRequired(description = "Flag indicating whether non restored state is allowed if the savepoint")
5163
private String allowNonRestoredState = "false";
5264

53-
//just use for per_job mode
65+
@OptionRequired(description = "flink jar path for submit of perjob mode")
5466
private String flinkJarPath;
5567

56-
private String queue;
68+
@OptionRequired(description = "yarn queue")
69+
private String queue = "default";
5770

5871
public String getMode() {
5972
return mode;

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,4 +247,8 @@ public static Object parse(String str,Class clazz){
247247
}
248248
return object;
249249
}
250+
251+
public static String firstUpperCase(String str) {
252+
return str.substring(0, 1).toUpperCase() + str.substring(1);
253+
}
250254
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.launcher;
2020

21+
import com.dtstack.flink.sql.option.Options;
2122
import com.dtstack.flink.sql.util.PluginUtil;
2223
import com.dtstack.flink.yarn.JobParameter;
2324
import com.dtstack.flink.yarn.YarnClusterConfiguration;
@@ -60,7 +61,7 @@
6061
*/
6162
public class ClusterClientFactory {
6263

63-
public static ClusterClient createClusterClient(LauncherOptions launcherOptions) throws Exception {
64+
public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
6465
String mode = launcherOptions.getMode();
6566
if(mode.equals(ClusterMode.standalone.name())) {
6667
return createStandaloneClient(launcherOptions);
@@ -70,7 +71,7 @@ public static ClusterClient createClusterClient(LauncherOptions launcherOptions)
7071
throw new IllegalArgumentException("Unsupported cluster client type: ");
7172
}
7273

73-
public static ClusterClient createStandaloneClient(LauncherOptions launcherOptions) throws Exception {
74+
public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception {
7475
String flinkConfDir = launcherOptions.getFlinkconf();
7576
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
7677
MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
@@ -85,7 +86,7 @@ public static ClusterClient createStandaloneClient(LauncherOptions launcherOptio
8586
return clusterClient;
8687
}
8788

88-
public static ClusterClient createYarnClient(LauncherOptions launcherOptions,String mode) {
89+
public static ClusterClient createYarnClient(Options launcherOptions, String mode) {
8990
String flinkConfDir = launcherOptions.getFlinkconf();
9091
Configuration flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir);
9192
String yarnConfDir = launcherOptions.getYarnconf();

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.dtstack.flink.sql.ClusterMode;
2727
import com.dtstack.flink.sql.Main;
2828
import com.dtstack.flink.sql.launcher.perjob.PerJobSubmitter;
29+
import com.dtstack.flink.sql.option.OptionParser;
30+
import com.dtstack.flink.sql.option.Options;
2931
import com.dtstack.flink.sql.util.PluginUtil;
3032
import org.apache.commons.lang.BooleanUtils;
3133
import org.apache.commons.lang.StringUtils;
@@ -68,9 +70,8 @@ public static void main(String[] args) throws Exception {
6870
if (args.length == 1 && args[0].endsWith(".json")){
6971
args = parseJson(args);
7072
}
71-
72-
LauncherOptionParser optionParser = new LauncherOptionParser(args);
73-
LauncherOptions launcherOptions = optionParser.getLauncherOptions();
73+
OptionParser optionParser = new OptionParser(args);
74+
Options launcherOptions = optionParser.getOptions();
7475
String mode = launcherOptions.getMode();
7576
List<String> argList = optionParser.getProgramExeArgList();
7677

0 commit comments

Comments
 (0)