Skip to content

Commit bc39433

Browse files
committed
add plugin load mode: classpath or shipfile
1 parent 7c3d531 commit bc39433

File tree

6 files changed

+112
-35
lines changed

6 files changed

+112
-35
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2626
import com.dtstack.flink.sql.enums.ClusterMode;
2727
import com.dtstack.flink.sql.enums.ECacheType;
28+
import com.dtstack.flink.sql.enums.PluginLoadMode;
2829
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
2930
import com.dtstack.flink.sql.exec.FlinkSQLExec;
3031
import com.dtstack.flink.sql.option.OptionParser;
@@ -45,10 +46,10 @@
4546
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
4647
import com.dtstack.flink.sql.util.FlinkUtil;
4748
import com.dtstack.flink.sql.util.PluginUtil;
48-
import org.apache.calcite.config.Lex;
4949
import org.apache.calcite.sql.SqlInsert;
5050
import org.apache.calcite.sql.SqlNode;
5151
import org.apache.commons.io.Charsets;
52+
import org.apache.commons.lang3.StringUtils;
5253
import org.apache.flink.api.common.ExecutionConfig;
5354
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
5455
import org.apache.flink.api.common.time.Time;
@@ -84,7 +85,6 @@
8485
import java.util.Set;
8586
import java.util.concurrent.TimeUnit;
8687
import com.dtstack.flink.sql.option.Options;
87-
import org.apache.calcite.sql.parser.SqlParser.Config;
8888

8989
/**
9090
* Date: 2018/6/26
@@ -109,9 +109,9 @@ public static void main(String[] args) throws Exception {
109109
String name = options.getName();
110110
String addJarListStr = options.getAddjar();
111111
String localSqlPluginPath = options.getLocalSqlPluginPath();
112-
String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
113112
String deployMode = options.getMode();
114113
String confProp = options.getConfProp();
114+
115115
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
116116
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
117117

@@ -141,7 +141,7 @@ public static void main(String[] args) throws Exception {
141141
//register udf
142142
registerUDF(sqlTree, jarURList, tableEnv);
143143
//register table schema
144-
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, sideTableMap, registerTableCache);
144+
registerTable(sqlTree, env, tableEnv, options, sideTableMap, registerTableCache);
145145

146146
sqlTranslation(options,tableEnv,sqlTree,sideTableMap,registerTableCache);
147147

@@ -230,8 +230,7 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTabl
230230
}
231231

232232

233-
private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv,
234-
String localSqlPluginPath, String remoteSqlPluginPath,
233+
private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, Options options,
235234
Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
236235
Set<URL> classPathSet = Sets.newHashSet();
237236
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
@@ -240,7 +239,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
240239
if (tableInfo instanceof SourceTableInfo) {
241240

242241
SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
243-
Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
242+
Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, options.getLocalSqlPluginPath());
244243
tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
245244
//Note --- parameter conversion function can not be used inside a function of the type of polymerization
246245
//Create table in which the function is arranged only need adaptation sql
@@ -267,18 +266,18 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
267266
LOG.info("registe table {} success.", tableInfo.getName());
268267
}
269268
registerTableCache.put(tableInfo.getName(), regTable);
270-
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
269+
classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, options));
271270
} else if (tableInfo instanceof TargetTableInfo) {
272271

273-
TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
272+
TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, options.getLocalSqlPluginPath());
274273
TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
275274
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
276-
classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
275+
classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, options));
277276
} else if(tableInfo instanceof SideTableInfo){
278277

279278
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
280279
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
281-
classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
280+
classPathSet.add(buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, options));
282281
}else {
283282
throw new RuntimeException("not support table type:" + tableInfo.getType());
284283
}
@@ -294,6 +293,22 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
294293
}
295294
}
296295

296+
private static URL buildSourceAndSinkPathByLoadMode(String type, String suffix, Options options) throws Exception {
297+
String pluginLoadMode = options.getPluginLoadMode();
298+
if (StringUtils.equalsIgnoreCase(pluginLoadMode, PluginLoadMode.classpath.name())) {
299+
return PluginUtil.getRemoteJarFilePath(type, suffix, options.getRemoteSqlPluginPath(), options.getLocalSqlPluginPath());
300+
}
301+
return PluginUtil.getLocalJarFilePath(type, suffix, options.getLocalSqlPluginPath());
302+
}
303+
304+
private static URL buildSidePathByLoadMode(String type, String operator, String suffix, Options options) throws Exception {
305+
String pluginLoadMode = options.getPluginLoadMode();
306+
if (StringUtils.equalsIgnoreCase(pluginLoadMode, PluginLoadMode.classpath.name())) {
307+
return PluginUtil.getRemoteSideJarFilePath(type, operator, suffix, options.getRemoteSqlPluginPath(), options.getLocalSqlPluginPath());
308+
}
309+
return PluginUtil.getLocalSideJarFilePath(type, operator, suffix, options.getLocalSqlPluginPath());
310+
}
311+
297312
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
298313
confProperties = PropertiesUtils.propertiesTrim(confProperties);
299314

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.dtstack.flink.sql.enums;
2+
3+
/**
 * How plugin jars are delivered to the cluster.
 */
public enum PluginLoadMode {
    /** Resolve plugin jars via the classpath; cluster nodes must already contain them. */
    classpath(0),
    /** Ship plugin jars to the cluster alongside the job. */
    shipfile(1);

    /** Numeric code of this load mode. Immutable after construction. */
    private final int type;

    PluginLoadMode(int type) {
        this.type = type;
    }

    public int getType() {
        return this.type;
    }
}

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.option;
2020

2121
import com.dtstack.flink.sql.enums.ClusterMode;
22+
import com.dtstack.flink.sql.enums.PluginLoadMode;
2223

2324

2425
/**
@@ -71,6 +72,9 @@ public class Options {
7172
@OptionRequired(description = "yarn session configuration,such as yid")
7273
private String yarnSessionConf = "{}";
7374

75+
@OptionRequired(description = "plugin load mode, by classpath or shipfile")
76+
private String pluginLoadMode = PluginLoadMode.classpath.name();
77+
7478
public String getMode() {
7579
return mode;
7680
}
@@ -182,4 +186,12 @@ public String getYarnSessionConf() {
182186
public void setYarnSessionConf(String yarnSessionConf) {
183187
this.yarnSessionConf = yarnSessionConf;
184188
}
189+
190+
public String getPluginLoadMode() {
191+
return pluginLoadMode;
192+
}
193+
194+
public void setPluginLoadMode(String pluginLoadMode) {
195+
this.pluginLoadMode = pluginLoadMode;
196+
}
185197
}

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,14 @@ public static URL getRemoteJarFilePath(String pluginType, String tableType, Stri
117117
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
118118
}
119119

120+
public static URL getLocalJarFilePath(String pluginType, String tableType, String localSqlPluginPath) throws Exception {
121+
String dirName = pluginType + tableType.toLowerCase();
122+
String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());
123+
String jarPath = localSqlPluginPath + SP + dirName;
124+
String jarName = getCoreJarFileName(jarPath, prefix);
125+
return new URL("file:" + jarPath + SP + jarName);
126+
}
127+
120128
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
121129
String dirName = pluginType + sideOperator + tableType.toLowerCase();
122130
String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
@@ -125,6 +133,14 @@ public static URL getRemoteSideJarFilePath(String pluginType, String sideOperato
125133
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
126134
}
127135

136+
public static URL getLocalSideJarFilePath(String pluginType, String sideOperator, String tableType, String localSqlPluginPath) throws Exception {
137+
String dirName = pluginType + sideOperator + tableType.toLowerCase();
138+
String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
139+
String jarPath = localSqlPluginPath + SP + dirName;
140+
String jarName = getCoreJarFileName(jarPath, prefix);
141+
return new URL("file:" + jarPath + SP + jarName);
142+
}
143+
128144
public static String upperCaseFirstChar(String str){
129145
return str.substring(0, 1).toUpperCase() + str.substring(1);
130146
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
package com.dtstack.flink.sql.launcher.perjob;
2020

2121
import com.dtstack.flink.sql.launcher.YarnConfLoader;
22+
import com.dtstack.flink.sql.option.Options;
2223
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.flink.api.common.cache.DistributedCache;
2325
import org.apache.flink.configuration.Configuration;
2426
import org.apache.flink.hadoop.shaded.com.google.common.base.Strings;
27+
import org.apache.flink.runtime.jobgraph.JobGraph;
2528
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
2629
import org.apache.flink.yarn.YarnClusterDescriptor;
2730
import org.apache.hadoop.fs.Path;
@@ -30,8 +33,10 @@
3033

3134
import java.io.File;
3235
import java.net.MalformedURLException;
36+
import java.net.URL;
3337
import java.util.ArrayList;
3438
import java.util.List;
39+
import java.util.Map;
3540
import java.util.Properties;
3641

3742
/**
@@ -42,6 +47,7 @@
4247
*/
4348

4449
public class PerJobClusterClientBuilder {
50+
private static final String FLINK_PLUGIN_CLASSPATH_LOAD = "classpath";
4551

4652
private YarnClient yarnClient;
4753

@@ -60,43 +66,68 @@ public void init(String yarnConfDir){
6066
System.out.println("----init yarn success ----");
6167
}
6268

63-
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, String queue) throws MalformedURLException {
69+
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, Options launcherOptions, JobGraph jobGraph) throws MalformedURLException {
6470
Configuration newConf = new Configuration();
65-
confProp.forEach((key, val) -> newConf.setString(key.toString(), val.toString()) );
71+
confProp.forEach((key, val) -> newConf.setString(key.toString(), val.toString()));
6672

6773
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(newConf, yarnConf, ".");
6874

6975
if (StringUtils.isNotBlank(flinkJarPath)) {
70-
7176
if (!new File(flinkJarPath).exists()) {
7277
throw new RuntimeException("The Flink jar path is not exist");
7378
}
74-
7579
}
7680

77-
List<File> shipFiles = new ArrayList<>();
81+
List<File> shipFiles = new ArrayList<>();
7882
if (flinkJarPath != null) {
7983
File[] jars = new File(flinkJarPath).listFiles();
80-
81-
for (File file : jars){
82-
if (file.toURI().toURL().toString().contains("flink-dist")){
84+
for (File file : jars) {
85+
if (file.toURI().toURL().toString().contains("flink-dist")) {
8386
clusterDescriptor.setLocalJarPath(new Path(file.toURI().toURL().toString()));
8487
} else {
8588
shipFiles.add(file);
8689
}
8790
}
88-
8991
} else {
9092
throw new RuntimeException("The Flink jar path is null");
9193
}
92-
clusterDescriptor.addShipFiles(shipFiles);
94+
// classpath , all node need contain plugin jar
95+
String pluginLoadMode = launcherOptions.getPluginLoadMode();
96+
if (StringUtils.equalsIgnoreCase(pluginLoadMode, FLINK_PLUGIN_CLASSPATH_LOAD)){
97+
fillJobGraphClassPath(jobGraph);
98+
} else {
99+
List<File> pluginPaths = getPluginPathToShipFiles(jobGraph);
100+
shipFiles.addAll(pluginPaths);
101+
}
93102

94-
if(!Strings.isNullOrEmpty(queue)){
103+
clusterDescriptor.addShipFiles(shipFiles);
104+
String queue = launcherOptions.getQueue();
105+
if (!Strings.isNullOrEmpty(queue)) {
95106
clusterDescriptor.setQueue(queue);
96107
}
97108
return clusterDescriptor;
98109
}
99110

111+
private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
112+
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
113+
for(Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()){
114+
if(tmp.getKey().startsWith("class_path")){
115+
jobGraph.getClasspaths().add(new URL("file:" + tmp.getValue().filePath));
116+
}
117+
}
118+
}
119+
120+
private List<File> getPluginPathToShipFiles(JobGraph jobGraph) {
121+
List<File> shipFiles = new ArrayList<>();
122+
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
123+
for(Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()){
124+
if(tmp.getKey().startsWith("class_path")){
125+
shipFiles.add(new File(tmp.getValue().filePath));
126+
}
127+
}
128+
return shipFiles;
129+
}
130+
100131
private AbstractYarnClusterDescriptor getClusterDescriptor(
101132
Configuration configuration,
102133
YarnConfiguration yarnConfiguration,

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ public class PerJobSubmitter {
5252
private static final Logger LOG = LoggerFactory.getLogger(PerJobSubmitter.class);
5353

5454
public static String submit(Options launcherOptions, JobGraph jobGraph) throws Exception {
55-
56-
fillJobGraphClassPath(jobGraph);
5755
if (!StringUtils.isBlank(launcherOptions.getAddjar())) {
5856
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
5957
List<String> paths = getJarPaths(addjarPath);
@@ -62,8 +60,6 @@ public static String submit(Options launcherOptions, JobGraph jobGraph) throws E
6260
});
6361
}
6462

65-
66-
6763
String confProp = launcherOptions.getConfProp();
6864
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
6965
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
@@ -74,7 +70,7 @@ public static String submit(Options launcherOptions, JobGraph jobGraph) throws E
7470

7571
String flinkJarPath = launcherOptions.getFlinkJarPath();
7672

77-
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(confProperties, flinkJarPath, launcherOptions.getQueue());
73+
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(confProperties, flinkJarPath, launcherOptions, jobGraph);
7874
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph,true);
7975

8076
String applicationId = clusterClient.getClusterId().toString();
@@ -95,12 +91,4 @@ private static List<String> getJarPaths(String addjarPath) {
9591
return paths;
9692
}
9793

98-
private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
99-
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
100-
for(Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()){
101-
if(tmp.getKey().startsWith("class_path")){
102-
jobGraph.getClasspaths().add(new URL("file:" + tmp.getValue().filePath));
103-
}
104-
}
105-
}
10694
}

0 commit comments

Comments
 (0)