Skip to content

Commit 3ebdaea

Browse files
committed
Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.8.0_dev
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java
2 parents bd847ae + 9f42ef3 commit 3ebdaea

File tree

2 files changed

+33
-50
lines changed

2 files changed

+33
-50
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import java.lang.reflect.InvocationTargetException;
7777
import java.lang.reflect.Method;
7878
import java.net.URL;
79+
import java.net.URLClassLoader;
7980
import java.net.URLDecoder;
8081
import java.util.List;
8182
import java.util.Map;
@@ -213,11 +214,18 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
213214
}
214215

215216
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
216-
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
217+
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
217218
//register urf
219+
// udf和tableEnv须由同一个类加载器加载
220+
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
221+
URLClassLoader classLoader = null;
218222
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
219223
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
220-
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, jarURList);
224+
//classloader
225+
if (classLoader == null) {
226+
classLoader = FlinkUtil.loadExtraJar(jarURList, (URLClassLoader)levelClassLoader);
227+
}
228+
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
221229
}
222230
}
223231

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 23 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.util;
2222

2323

2424
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
25-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2625
import org.apache.commons.lang3.StringUtils;
2726
import org.apache.flink.api.common.typeinfo.TypeInformation;
2827
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -35,8 +34,6 @@
3534
import org.apache.flink.table.api.java.StreamTableEnvironment;
3635
import org.apache.flink.table.functions.ScalarFunction;
3736
import org.apache.flink.table.functions.TableFunction;
38-
import org.apache.flink.table.functions.AggregateFunction;
39-
4037
import org.slf4j.Logger;
4138
import org.slf4j.LoggerFactory;
4239

@@ -71,14 +68,12 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
7168
}
7269

7370
//设置了时间间隔才表明开启了checkpoint
74-
if(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY) == null && properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY) == null){
71+
if(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY) == null){
7572
return;
7673
}else{
77-
Long sql_interval = Long.valueOf(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_INTERVAL_KEY,"0"));
78-
Long flink_interval = Long.valueOf(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY, "0"));
79-
long checkpointInterval = Math.max(sql_interval, flink_interval);
74+
Long interval = Long.valueOf(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_INTERVAL_KEY));
8075
//start checkpoint every ${interval}
81-
env.enableCheckpointing(checkpointInterval);
76+
env.enableCheckpointing(interval);
8277
}
8378

8479
String checkMode = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_MODE_KEY);
@@ -106,14 +101,7 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
106101
env.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrCheckpoints);
107102
}
108103

109-
Boolean sqlCleanMode = MathUtil.getBoolean(properties.getProperty(ConfigConstrant.SQL_CHECKPOINT_CLEANUPMODE_KEY), false);
110-
Boolean flinkCleanMode = MathUtil.getBoolean(properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_CLEANUPMODE_KEY), false);
111-
112-
String cleanupModeStr = "false";
113-
if (sqlCleanMode || flinkCleanMode ){
114-
cleanupModeStr = "true";
115-
}
116-
104+
String cleanupModeStr = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_CLEANUPMODE_KEY);
117105
if ("true".equalsIgnoreCase(cleanupModeStr)){
118106
env.getCheckpointConfig().enableExternalizedCheckpoints(
119107
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
@@ -149,7 +137,6 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
149137
if(characteristicStr.equalsIgnoreCase(tmp.toString())){
150138
env.setStreamTimeCharacteristic(tmp);
151139
flag = true;
152-
break;
153140
}
154141
}
155142

@@ -159,20 +146,21 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
159146
}
160147

161148

149+
162150
/**
163-
* TABLE|SCALA|AGGREGATE
151+
* FIXME 暂时不支持 UDF 实现类--有参构造方法
152+
* TABLE|SCALA
164153
* 注册UDF到table env
165154
*/
166155
public static void registerUDF(String type, String classPath, String funcName, TableEnvironment tableEnv, List<URL> jarURList){
167156
if("SCALA".equalsIgnoreCase(type)){
168157
registerScalaUDF(classPath, funcName, tableEnv, jarURList);
169158
}else if("TABLE".equalsIgnoreCase(type)){
170159
registerTableUDF(classPath, funcName, tableEnv, jarURList);
171-
}else if("AGGREGATE".equalsIgnoreCase(type)){
172-
registerAggregateUDF(classPath, funcName, tableEnv, jarURList);
173160
}else{
174-
throw new RuntimeException("not support of UDF which is not in (TABLE, SCALA, AGGREGATE)");
161+
throw new RuntimeException("not support of UDF which is not in (TABLE, SCALA)");
175162
}
163+
176164
}
177165

178166
/**
@@ -194,7 +182,7 @@ public static void registerScalaUDF(String classPath, String funcName, TableEnvi
194182

195183
/**
196184
* 注册自定义TABLEFFUNC方法到env上
197-
*
185+
* TODO 对User-Defined Aggregate Functions的支持
198186
* @param classPath
199187
* @param funcName
200188
* @param tableEnv
@@ -217,31 +205,6 @@ public static void registerTableUDF(String classPath, String funcName, TableEnvi
217205
}
218206
}
219207

220-
/**
221-
* 注册自定义Aggregate FUNC方法到env上
222-
*
223-
* @param classPath
224-
* @param funcName
225-
* @param tableEnv
226-
*/
227-
public static void registerAggregateUDF(String classPath, String funcName, TableEnvironment tableEnv, List<URL> jarURList) {
228-
try {
229-
AggregateFunction udfFunc = ClassLoaderManager.newInstance(jarURList, (cl) -> cl.loadClass(classPath).asSubclass(AggregateFunction.class).newInstance());
230-
231-
if (tableEnv instanceof StreamTableEnvironment) {
232-
((StreamTableEnvironment) tableEnv).registerFunction(funcName, udfFunc);
233-
} else if (tableEnv instanceof BatchTableEnvironment) {
234-
((BatchTableEnvironment) tableEnv).registerFunction(funcName, udfFunc);
235-
} else {
236-
throw new RuntimeException("no support tableEnvironment class for " + tableEnv.getClass().getName());
237-
}
238-
239-
logger.info("register Aggregate function:{} success.", funcName);
240-
} catch (Exception e) {
241-
logger.error("", e);
242-
throw new RuntimeException("register Aggregate UDF exception:", e);
243-
}
244-
}
245208

246209
/**
247210
*
@@ -276,9 +239,21 @@ public static long getBufferTimeoutMillis(Properties properties){
276239
}
277240

278241
public static URLClassLoader loadExtraJar(List<URL> jarURLList, URLClassLoader classLoader) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
242+
243+
int size = 0;
244+
for(URL url : jarURLList){
245+
if(url.toString().endsWith(".jar")){
246+
size++;
247+
}
248+
}
249+
250+
URL[] urlArray = new URL[size];
251+
int i=0;
279252
for(URL url : jarURLList){
280253
if(url.toString().endsWith(".jar")){
254+
urlArray[i] = url;
281255
urlClassLoaderAddUrl(classLoader, url);
256+
i++;
282257
}
283258
}
284259

0 commit comments

Comments
 (0)