Skip to content

Commit df44eb2

Browse files
committed
fill up remote class path
1 parent 2c784e6 commit df44eb2

File tree

3 files changed

+11
-15
lines changed

3 files changed

+11
-15
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,18 +288,18 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
288288
Table regTable = tableEnv.fromDataStream(adaptStream, fields);
289289
tableEnv.registerTable(tableInfo.getName(), regTable);
290290
registerTableCache.put(tableInfo.getName(), regTable);
291-
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath));
291+
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
292292
} else if (tableInfo instanceof TargetTableInfo) {
293293

294294
TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
295295
TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
296296
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
297-
classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
297+
classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
298298
} else if(tableInfo instanceof SideTableInfo){
299299

300300
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
301301
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
302-
classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
302+
classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
303303
}else {
304304
throw new RuntimeException("not support table type:" + tableInfo.getType());
305305
}

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,19 @@ public static Properties stringToProperties(String str) throws IOException{
107107
return properties;
108108
}
109109

110-
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir) throws Exception {
110+
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
111111
String dirName = pluginType + tableType.toLowerCase();
112112
String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());
113-
String jarPath = remoteSqlRootDir + SP + dirName;
114-
String jarName = getCoreJarFileName(jarPath, prefix, false);
113+
String jarPath = localSqlPluginPath + SP + dirName;
114+
String jarName = getCoreJarFileName(jarPath, prefix);
115115
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
116116
}
117117

118-
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir) throws Exception {
118+
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
119119
String dirName = pluginType + sideOperator + tableType.toLowerCase();
120120
String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
121-
String jarPath = remoteSqlRootDir + SP + dirName;
122-
String jarName = getCoreJarFileName(jarPath, prefix, false);
121+
String jarPath = localSqlPluginPath + SP + dirName;
122+
String jarName = getCoreJarFileName(jarPath, prefix);
123123
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
124124
}
125125

@@ -144,11 +144,7 @@ public static void addPluginJar(String pluginDir, DtClassLoader classLoader) thr
144144
}
145145
}
146146

147-
public static String getCoreJarFileName (String path, String prefix, boolean existCheck) throws Exception {
148-
if (!existCheck) {
149-
return prefix.toLowerCase() + ".jar";
150-
}
151-
147+
public static String getCoreJarFileName (String path, String prefix) throws Exception {
152148
String coreJarFileName = null;
153149
File pluginDir = new File(path);
154150
if (pluginDir.exists() && pluginDir.isDirectory()){

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class LauncherMain {
5959

6060

6161
private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception {
62-
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR, true);
62+
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR);
6363
String corePath = localSqlRootJar + SP + jarPath;
6464
return corePath;
6565
}

0 commit comments

Comments
 (0)