Skip to content

Commit 7df8f46

Browse files
committed
plugin load mode code opt
1 parent 416e766 commit 7df8f46

File tree

7 files changed

+48
-41
lines changed

7 files changed

+48
-41
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2626
import com.dtstack.flink.sql.enums.ClusterMode;
2727
import com.dtstack.flink.sql.enums.ECacheType;
28-
import com.dtstack.flink.sql.enums.PluginLoadMode;
28+
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2929
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
3030
import com.dtstack.flink.sql.exec.FlinkSQLExec;
3131
import com.dtstack.flink.sql.option.OptionParser;
@@ -296,14 +296,14 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
296296
}
297297

298298
private static URL buildSourceAndSinkPathByLoadMode(String type, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
299-
if (StringUtils.equalsIgnoreCase(pluginLoadMode, PluginLoadMode.classpath.name())) {
299+
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
300300
return PluginUtil.getRemoteJarFilePath(type, suffix, remoteSqlPluginPath, localSqlPluginPath);
301301
}
302302
return PluginUtil.getLocalJarFilePath(type, suffix, localSqlPluginPath);
303303
}
304304

305305
private static URL buildSidePathByLoadMode(String type, String operator, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
306-
if (StringUtils.equalsIgnoreCase(pluginLoadMode, PluginLoadMode.classpath.name())) {
306+
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
307307
return PluginUtil.getRemoteSideJarFilePath(type, operator, suffix, remoteSqlPluginPath, localSqlPluginPath);
308308
}
309309
return PluginUtil.getLocalSideJarFilePath(type, operator, suffix, localSqlPluginPath);
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.dtstack.flink.sql.enums;
2+
3+
/**
4+
*
5+
* CLASSPATH: plugin jar depends on each machine node.
6+
* SHIPFILE: plugin jar only depends on the client submitted by the task.
7+
*
8+
*/
9+
public enum EPluginLoadMode {
10+
11+
CLASSPATH(0),
12+
SHIPFILE(1);
13+
14+
private int type;
15+
16+
EPluginLoadMode(int type){
17+
this.type = type;
18+
}
19+
20+
public int getType(){
21+
return this.type;
22+
}
23+
}

core/src/main/java/com/dtstack/flink/sql/enums/PluginLoadMode.java

Lines changed: 0 additions & 15 deletions
This file was deleted.

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package com.dtstack.flink.sql.option;
2020

2121
import com.dtstack.flink.sql.enums.ClusterMode;
22-
import com.dtstack.flink.sql.enums.PluginLoadMode;
22+
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2323

2424

2525
/**
@@ -73,7 +73,7 @@ public class Options {
7373
private String yarnSessionConf = "{}";
7474

7575
@OptionRequired(description = "plugin load mode, by classpath or shipfile")
76-
private String pluginLoadMode = PluginLoadMode.classpath.name();
76+
private String pluginLoadMode = EPluginLoadMode.CLASSPATH.name();
7777

7878
public String getMode() {
7979
return mode;

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -110,35 +110,37 @@ public static Properties stringToProperties(String str) throws IOException{
110110
}
111111

112112
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
113-
String dirName = pluginType + tableType.toLowerCase();
114-
String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());
115-
String jarPath = localSqlPluginPath + SP + dirName;
116-
String jarName = getCoreJarFileName(jarPath, prefix);
117-
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
113+
return buildFinalJarFilePath(pluginType, tableType, remoteSqlRootDir, localSqlPluginPath);
118114
}
119115

120116
public static URL getLocalJarFilePath(String pluginType, String tableType, String localSqlPluginPath) throws Exception {
117+
return buildFinalJarFilePath(pluginType, tableType, null, localSqlPluginPath);
118+
}
119+
120+
public static URL buildFinalJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
121121
String dirName = pluginType + tableType.toLowerCase();
122122
String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());
123123
String jarPath = localSqlPluginPath + SP + dirName;
124124
String jarName = getCoreJarFileName(jarPath, prefix);
125-
return new URL("file:" + jarPath + SP + jarName);
125+
String sqlRootDir = remoteSqlRootDir == null ? localSqlPluginPath : remoteSqlRootDir;
126+
return new URL("file:" + sqlRootDir + SP + dirName + SP + jarName);
126127
}
127128

128129
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
129-
String dirName = pluginType + sideOperator + tableType.toLowerCase();
130-
String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
131-
String jarPath = localSqlPluginPath + SP + dirName;
132-
String jarName = getCoreJarFileName(jarPath, prefix);
133-
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
130+
return buildFinalSideJarFilePath(pluginType, sideOperator, tableType, remoteSqlRootDir, localSqlPluginPath);
134131
}
135132

136133
public static URL getLocalSideJarFilePath(String pluginType, String sideOperator, String tableType, String localSqlPluginPath) throws Exception {
134+
return buildFinalSideJarFilePath(pluginType, sideOperator, tableType, null, localSqlPluginPath);
135+
}
136+
137+
public static URL buildFinalSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
137138
String dirName = pluginType + sideOperator + tableType.toLowerCase();
138139
String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
139140
String jarPath = localSqlPluginPath + SP + dirName;
140141
String jarName = getCoreJarFileName(jarPath, prefix);
141-
return new URL("file:" + jarPath + SP + jarName);
142+
String sqlRootDir = remoteSqlRootDir == null ? localSqlPluginPath : remoteSqlRootDir;
143+
return new URL("file:" + sqlRootDir + SP + dirName + SP + jarName);
142144
}
143145

144146
public static String upperCaseFirstChar(String str){

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.dtstack.flink.sql.launcher.perjob;
2020

21+
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2122
import com.dtstack.flink.sql.launcher.YarnConfLoader;
2223
import com.dtstack.flink.sql.option.Options;
2324
import org.apache.commons.lang3.StringUtils;
@@ -47,8 +48,6 @@
4748
*/
4849

4950
public class PerJobClusterClientBuilder {
50-
private static final String FLINK_PLUGIN_CLASSPATH_LOAD = "classpath";
51-
5251
private YarnClient yarnClient;
5352

5453
private YarnConfiguration yarnConf;
@@ -93,11 +92,14 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties co
9392
}
9493
// classpath , all node need contain plugin jar
9594
String pluginLoadMode = launcherOptions.getPluginLoadMode();
96-
if (StringUtils.equalsIgnoreCase(pluginLoadMode, FLINK_PLUGIN_CLASSPATH_LOAD)){
95+
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
9796
fillJobGraphClassPath(jobGraph);
98-
} else {
97+
} else if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.SHIPFILE.name())) {
9998
List<File> pluginPaths = getPluginPathToShipFiles(jobGraph);
10099
shipFiles.addAll(pluginPaths);
100+
} else {
101+
throw new IllegalArgumentException("Unsupported plugin loading mode " + pluginLoadMode
102+
+ " Currently only classpath and shipfile are supported.");
101103
}
102104

103105
clusterDescriptor.addShipFiles(shipFiles);

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.dtstack.flink.sql.util.PluginUtil;
2323
import org.apache.commons.io.Charsets;
2424
import org.apache.commons.lang3.StringUtils;
25-
import org.apache.flink.api.common.cache.DistributedCache;
2625
import org.apache.flink.client.deployment.ClusterSpecification;
2726
import org.apache.flink.client.program.ClusterClient;
2827
import org.apache.flink.core.fs.Path;
@@ -31,13 +30,9 @@
3130
import org.apache.hadoop.yarn.api.records.ApplicationId;
3231
import org.slf4j.Logger;
3332
import org.slf4j.LoggerFactory;
34-
35-
import java.net.MalformedURLException;
36-
import java.net.URL;
3733
import java.net.URLDecoder;
3834
import java.util.Arrays;
3935
import java.util.List;
40-
import java.util.Map;
4136
import java.util.Properties;
4237

4338
/**

0 commit comments

Comments
 (0)