Skip to content

Commit 9f42ef3

Browse files
committed
Merge branch 'v1.5.0_dev_udfclassloader' into 'v1.5.0_dev'
udf类加载器问题 See merge request !96
2 parents d60ac39 + 561474b commit 9f42ef3

File tree

2 files changed

+19
-9
lines changed

2 files changed

+19
-9
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import java.lang.reflect.InvocationTargetException;
7878
import java.lang.reflect.Method;
7979
import java.net.URL;
80+
import java.net.URLClassLoader;
8081
import java.net.URLDecoder;
8182
import java.util.List;
8283
import java.util.Map;
@@ -237,11 +238,18 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
237238
}
238239

239240
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
240-
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
241+
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
241242
//register urf
243+
// udf和tableEnv须由同一个类加载器加载
244+
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
245+
URLClassLoader classLoader = null;
242246
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
243247
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
244-
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, jarURList);
248+
//classloader
249+
if (classLoader == null) {
250+
classLoader = FlinkUtil.loadExtraJar(jarURList, (URLClassLoader)levelClassLoader);
251+
}
252+
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
245253
}
246254
}
247255

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,11 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
152152
* TABLE|SCALA
153153
* 注册UDF到table env
154154
*/
155-
public static void registerUDF(String type, String classPath, String funcName, TableEnvironment tableEnv, List<URL> jarURList){
155+
public static void registerUDF(String type, String classPath, String funcName, TableEnvironment tableEnv, ClassLoader classLoader){
156156
if("SCALA".equalsIgnoreCase(type)){
157-
registerScalaUDF(classPath, funcName, tableEnv, jarURList);
157+
registerScalaUDF(classPath, funcName, tableEnv, classLoader);
158158
}else if("TABLE".equalsIgnoreCase(type)){
159-
registerTableUDF(classPath, funcName, tableEnv, jarURList);
159+
registerTableUDF(classPath, funcName, tableEnv, classLoader);
160160
}else{
161161
throw new RuntimeException("not support of UDF which is not in (TABLE, SCALA)");
162162
}
@@ -169,9 +169,10 @@ public static void registerUDF(String type, String classPath, String funcName, T
169169
* @param funcName
170170
* @param tableEnv
171171
*/
172-
public static void registerScalaUDF(String classPath, String funcName, TableEnvironment tableEnv, List<URL> jarURList){
172+
public static void registerScalaUDF(String classPath, String funcName, TableEnvironment tableEnv, ClassLoader classLoader){
173173
try{
174-
ScalarFunction udfFunc = ClassLoaderManager.newInstance(jarURList, (cl) -> cl.loadClass(classPath).asSubclass(ScalarFunction.class).newInstance());
174+
ScalarFunction udfFunc = Class.forName(classPath, false, classLoader)
175+
.asSubclass(ScalarFunction.class).newInstance();
175176
tableEnv.registerFunction(funcName, udfFunc);
176177
logger.info("register scala function:{} success.", funcName);
177178
}catch (Exception e){
@@ -187,9 +188,10 @@ public static void registerScalaUDF(String classPath, String funcName, TableEnvi
187188
* @param funcName
188189
* @param tableEnv
189190
*/
190-
public static void registerTableUDF(String classPath, String funcName, TableEnvironment tableEnv, List<URL> jarURList){
191+
public static void registerTableUDF(String classPath, String funcName, TableEnvironment tableEnv, ClassLoader classLoader){
191192
try {
192-
TableFunction udfFunc = ClassLoaderManager.newInstance(jarURList, (cl) -> cl.loadClass(classPath).asSubclass(TableFunction.class).newInstance());
193+
ScalarFunction udfFunc = Class.forName(classPath, false, classLoader)
194+
.asSubclass(ScalarFunction.class).newInstance();
193195
if(tableEnv instanceof StreamTableEnvironment){
194196
((StreamTableEnvironment)tableEnv).registerFunction(funcName, udfFunc);
195197
}else if(tableEnv instanceof BatchTableEnvironment){

0 commit comments

Comments
 (0)