Skip to content

Commit cd0e854

Browse files
committed
Merge branch '1.8.0_dev_feature_pluginjar' into 'v1.8.0_dev'
可选的插件加载方式 See merge request !98
2 parents 7937ecb + 236312b commit cd0e854

File tree

7 files changed

+201
-44
lines changed

7 files changed

+201
-44
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2626
import com.dtstack.flink.sql.enums.ClusterMode;
2727
import com.dtstack.flink.sql.enums.ECacheType;
28+
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2829
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
2930
import com.dtstack.flink.sql.exec.FlinkSQLExec;
3031
import com.dtstack.flink.sql.option.OptionParser;
@@ -45,10 +46,10 @@
4546
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
4647
import com.dtstack.flink.sql.util.FlinkUtil;
4748
import com.dtstack.flink.sql.util.PluginUtil;
48-
import org.apache.calcite.config.Lex;
4949
import org.apache.calcite.sql.SqlInsert;
5050
import org.apache.calcite.sql.SqlNode;
5151
import org.apache.commons.io.Charsets;
52+
import org.apache.commons.lang3.StringUtils;
5253
import org.apache.flink.api.common.ExecutionConfig;
5354
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
5455
import org.apache.flink.api.common.time.Time;
@@ -84,7 +85,6 @@
8485
import java.util.Set;
8586
import java.util.concurrent.TimeUnit;
8687
import com.dtstack.flink.sql.option.Options;
87-
import org.apache.calcite.sql.parser.SqlParser.Config;
8888

8989
/**
9090
* Date: 2018/6/26
@@ -110,8 +110,10 @@ public static void main(String[] args) throws Exception {
110110
String addJarListStr = options.getAddjar();
111111
String localSqlPluginPath = options.getLocalSqlPluginPath();
112112
String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
113+
String pluginLoadMode = options.getPluginLoadMode();
113114
String deployMode = options.getMode();
114115
String confProp = options.getConfProp();
116+
115117
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
116118
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
117119

@@ -141,9 +143,9 @@ public static void main(String[] args) throws Exception {
141143
//register udf
142144
registerUDF(sqlTree, jarURList, tableEnv);
143145
//register table schema
144-
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, sideTableMap, registerTableCache);
146+
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
145147

146-
sqlTranslation(options,tableEnv,sqlTree,sideTableMap,registerTableCache);
148+
sqlTranslation(localSqlPluginPath, tableEnv,sqlTree,sideTableMap,registerTableCache);
147149

148150
if(env instanceof MyLocalStreamEnvironment) {
149151
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -152,9 +154,9 @@ public static void main(String[] args) throws Exception {
152154
env.execute(name);
153155
}
154156

155-
private static void sqlTranslation(Options options,StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache) throws Exception {
157+
private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache) throws Exception {
156158
SideSqlExec sideSqlExec = new SideSqlExec();
157-
sideSqlExec.setLocalSqlPluginPath(options.getLocalSqlPluginPath());
159+
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
158160
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
159161
sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
160162
}
@@ -230,9 +232,8 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTabl
230232
}
231233

232234

233-
private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv,
234-
String localSqlPluginPath, String remoteSqlPluginPath,
235-
Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
235+
private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
236+
String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
236237
Set<URL> classPathSet = Sets.newHashSet();
237238
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
238239
for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
@@ -267,18 +268,18 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
267268
LOG.info("registe table {} success.", tableInfo.getName());
268269
}
269270
registerTableCache.put(tableInfo.getName(), regTable);
270-
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
271+
classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
271272
} else if (tableInfo instanceof TargetTableInfo) {
272273

273274
TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
274275
TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
275276
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
276-
classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
277+
classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
277278
} else if(tableInfo instanceof SideTableInfo){
278279

279280
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
280281
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
281-
classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
282+
classPathSet.add(buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
282283
}else {
283284
throw new RuntimeException("not support table type:" + tableInfo.getType());
284285
}
@@ -294,6 +295,20 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
294295
}
295296
}
296297

298+
private static URL buildSourceAndSinkPathByLoadMode(String type, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
299+
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
300+
return PluginUtil.getRemoteJarFilePath(type, suffix, remoteSqlPluginPath, localSqlPluginPath);
301+
}
302+
return PluginUtil.getLocalJarFilePath(type, suffix, localSqlPluginPath);
303+
}
304+
305+
private static URL buildSidePathByLoadMode(String type, String operator, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
306+
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
307+
return PluginUtil.getRemoteSideJarFilePath(type, operator, suffix, remoteSqlPluginPath, localSqlPluginPath);
308+
}
309+
return PluginUtil.getLocalSideJarFilePath(type, operator, suffix, localSqlPluginPath);
310+
}
311+
297312
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
298313
confProperties = PropertiesUtils.propertiesTrim(confProperties);
299314

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.enums;
20+
21+
/**
22+
*
23+
* CLASSPATH: plugin jar depends on each machine node.
24+
* SHIPFILE: plugin jar only depends on the client submitted by the task.
25+
*
26+
*/
27+
/**
 * How plugin jars are made available to a running job.
 *
 * <p>CLASSPATH: the plugin jars must already exist on every cluster node and
 * are referenced through the job classpath.
 * <p>SHIPFILE: the plugin jars exist only on the submitting client and are
 * shipped to the cluster together with the job.
 */
public enum EPluginLoadMode {

    CLASSPATH(0),
    SHIPFILE(1);

    // Numeric code of the mode; final because enum constants are immutable.
    private final int type;

    EPluginLoadMode(int type) {
        this.type = type;
    }

    public int getType() {
        return this.type;
    }
}

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.option;
2020

2121
import com.dtstack.flink.sql.enums.ClusterMode;
22+
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2223

2324

2425
/**
@@ -71,6 +72,9 @@ public class Options {
7172
@OptionRequired(description = "yarn session configuration,such as yid")
7273
private String yarnSessionConf = "{}";
7374

75+
@OptionRequired(description = "plugin load mode, by classpath or shipfile")
76+
private String pluginLoadMode = EPluginLoadMode.CLASSPATH.name();
77+
7478
public String getMode() {
7579
return mode;
7680
}
@@ -182,4 +186,12 @@ public String getYarnSessionConf() {
182186
public void setYarnSessionConf(String yarnSessionConf) {
183187
this.yarnSessionConf = yarnSessionConf;
184188
}
189+
190+
public String getPluginLoadMode() {
191+
return pluginLoadMode;
192+
}
193+
194+
public void setPluginLoadMode(String pluginLoadMode) {
195+
this.pluginLoadMode = pluginLoadMode;
196+
}
185197
}

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,37 @@ public static Properties stringToProperties(String str) throws IOException{
110110
}
111111

112112
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
113+
return buildFinalJarFilePath(pluginType, tableType, remoteSqlRootDir, localSqlPluginPath);
114+
}
115+
116+
public static URL getLocalJarFilePath(String pluginType, String tableType, String localSqlPluginPath) throws Exception {
117+
return buildFinalJarFilePath(pluginType, tableType, null, localSqlPluginPath);
118+
}
119+
120+
public static URL buildFinalJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
113121
String dirName = pluginType + tableType.toLowerCase();
114122
String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());
115123
String jarPath = localSqlPluginPath + SP + dirName;
116124
String jarName = getCoreJarFileName(jarPath, prefix);
117-
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
125+
String sqlRootDir = remoteSqlRootDir == null ? localSqlPluginPath : remoteSqlRootDir;
126+
return new URL("file:" + sqlRootDir + SP + dirName + SP + jarName);
118127
}
119128

120129
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
130+
return buildFinalSideJarFilePath(pluginType, sideOperator, tableType, remoteSqlRootDir, localSqlPluginPath);
131+
}
132+
133+
public static URL getLocalSideJarFilePath(String pluginType, String sideOperator, String tableType, String localSqlPluginPath) throws Exception {
134+
return buildFinalSideJarFilePath(pluginType, sideOperator, tableType, null, localSqlPluginPath);
135+
}
136+
137+
public static URL buildFinalSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
121138
String dirName = pluginType + sideOperator + tableType.toLowerCase();
122139
String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
123140
String jarPath = localSqlPluginPath + SP + dirName;
124141
String jarName = getCoreJarFileName(jarPath, prefix);
125-
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
142+
String sqlRootDir = remoteSqlRootDir == null ? localSqlPluginPath : remoteSqlRootDir;
143+
return new URL("file:" + sqlRootDir + SP + dirName + SP + jarName);
126144
}
127145

128146
public static String upperCaseFirstChar(String str){

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818

1919
package com.dtstack.flink.sql.launcher.perjob;
2020

21+
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2122
import com.dtstack.flink.sql.launcher.YarnConfLoader;
23+
import com.dtstack.flink.sql.option.Options;
2224
import org.apache.commons.lang3.StringUtils;
25+
import org.apache.flink.api.common.cache.DistributedCache;
2326
import org.apache.flink.configuration.Configuration;
2427
import org.apache.flink.hadoop.shaded.com.google.common.base.Strings;
28+
import org.apache.flink.runtime.jobgraph.JobGraph;
2529
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
2630
import org.apache.flink.yarn.YarnClusterDescriptor;
2731
import org.apache.hadoop.fs.Path;
@@ -30,8 +34,10 @@
3034

3135
import java.io.File;
3236
import java.net.MalformedURLException;
37+
import java.net.URL;
3338
import java.util.ArrayList;
3439
import java.util.List;
40+
import java.util.Map;
3541
import java.util.Properties;
3642

3743
/**
@@ -42,7 +48,6 @@
4248
*/
4349

4450
public class PerJobClusterClientBuilder {
45-
4651
private YarnClient yarnClient;
4752

4853
private YarnConfiguration yarnConf;
@@ -60,43 +65,71 @@ public void init(String yarnConfDir){
6065
System.out.println("----init yarn success ----");
6166
}
6267

63-
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, String queue) throws MalformedURLException {
68+
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties confProp, String flinkJarPath, Options launcherOptions, JobGraph jobGraph) throws MalformedURLException {
6469
Configuration newConf = new Configuration();
65-
confProp.forEach((key, val) -> newConf.setString(key.toString(), val.toString()) );
70+
confProp.forEach((key, val) -> newConf.setString(key.toString(), val.toString()));
6671

6772
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(newConf, yarnConf, ".");
6873

6974
if (StringUtils.isNotBlank(flinkJarPath)) {
70-
7175
if (!new File(flinkJarPath).exists()) {
7276
throw new RuntimeException("The Flink jar path is not exist");
7377
}
74-
7578
}
7679

77-
List<File> shipFiles = new ArrayList<>();
80+
List<File> shipFiles = new ArrayList<>();
7881
if (flinkJarPath != null) {
7982
File[] jars = new File(flinkJarPath).listFiles();
80-
81-
for (File file : jars){
82-
if (file.toURI().toURL().toString().contains("flink-dist")){
83+
for (File file : jars) {
84+
if (file.toURI().toURL().toString().contains("flink-dist")) {
8385
clusterDescriptor.setLocalJarPath(new Path(file.toURI().toURL().toString()));
8486
} else {
8587
shipFiles.add(file);
8688
}
8789
}
88-
8990
} else {
9091
throw new RuntimeException("The Flink jar path is null");
9192
}
92-
clusterDescriptor.addShipFiles(shipFiles);
93+
// classpath , all node need contain plugin jar
94+
String pluginLoadMode = launcherOptions.getPluginLoadMode();
95+
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
96+
fillJobGraphClassPath(jobGraph);
97+
} else if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.SHIPFILE.name())) {
98+
List<File> pluginPaths = getPluginPathToShipFiles(jobGraph);
99+
shipFiles.addAll(pluginPaths);
100+
} else {
101+
throw new IllegalArgumentException("Unsupported plugin loading mode " + pluginLoadMode
102+
+ " Currently only classpath and shipfile are supported.");
103+
}
93104

94-
if(!Strings.isNullOrEmpty(queue)){
105+
clusterDescriptor.addShipFiles(shipFiles);
106+
String queue = launcherOptions.getQueue();
107+
if (!Strings.isNullOrEmpty(queue)) {
95108
clusterDescriptor.setQueue(queue);
96109
}
97110
return clusterDescriptor;
98111
}
99112

113+
private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
114+
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
115+
for(Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()){
116+
if(tmp.getKey().startsWith("class_path")){
117+
jobGraph.getClasspaths().add(new URL("file:" + tmp.getValue().filePath));
118+
}
119+
}
120+
}
121+
122+
private List<File> getPluginPathToShipFiles(JobGraph jobGraph) {
123+
List<File> shipFiles = new ArrayList<>();
124+
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
125+
for(Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()){
126+
if(tmp.getKey().startsWith("class_path")){
127+
shipFiles.add(new File(tmp.getValue().filePath));
128+
}
129+
}
130+
return shipFiles;
131+
}
132+
100133
private AbstractYarnClusterDescriptor getClusterDescriptor(
101134
Configuration configuration,
102135
YarnConfiguration yarnConfiguration,

0 commit comments

Comments
 (0)