Skip to content

Commit 2969771

Browse files
committed
flinksql增加udaf函数功能
1 parent b683234 commit 2969771

File tree

1 file changed

+20
-5
lines changed

1 file changed

+20
-5
lines changed

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.util;
2222

@@ -33,6 +33,9 @@
3333
import org.apache.flink.table.api.java.StreamTableEnvironment;
3434
import org.apache.flink.table.functions.ScalarFunction;
3535
import org.apache.flink.table.functions.TableFunction;
36+
import org.apache.flink.table.functions.UserDefinedFunction;
37+
import org.apache.flink.table.functions.AggregateFunction;
38+
3639
import org.slf4j.Logger;
3740
import org.slf4j.LoggerFactory;
3841

@@ -192,13 +195,25 @@ public static void registerScalaUDF(String classPath, String funcName, TableEnvi
192195
public static void registerTableUDF(String classPath, String funcName, TableEnvironment tableEnv,
193196
ClassLoader classLoader){
194197
try {
195-
TableFunction udfFunc = Class.forName(classPath, false, classLoader)
196-
.asSubclass(TableFunction.class).newInstance();
198+
UserDefinedFunction udfFunc = Class.forName(classPath,false, classLoader)
199+
.asSubclass(UserDefinedFunction.class).newInstance();
197200

198201
if(tableEnv instanceof StreamTableEnvironment){
199-
((StreamTableEnvironment)tableEnv).registerFunction(funcName, udfFunc);
202+
if (udfFunc instanceof AggregateFunction){
203+
((StreamTableEnvironment) tableEnv).registerFunction(funcName, (AggregateFunction)udfFunc);
204+
}else if (udfFunc instanceof TableFunction) {
205+
((StreamTableEnvironment) tableEnv).registerFunction(funcName, (TableFunction)udfFunc);
206+
}else{
207+
throw new RuntimeException("no support UserDefinedFunction class for " + udfFunc.getClass().getName());
208+
}
200209
}else if(tableEnv instanceof BatchTableEnvironment){
201-
((BatchTableEnvironment)tableEnv).registerFunction(funcName, udfFunc);
210+
if (udfFunc instanceof AggregateFunction){
211+
((BatchTableEnvironment) tableEnv).registerFunction(funcName, (AggregateFunction)udfFunc);
212+
}else if (udfFunc instanceof TableFunction) {
213+
((BatchTableEnvironment) tableEnv).registerFunction(funcName, (TableFunction)udfFunc);
214+
}else{
215+
throw new RuntimeException("no support UserDefinedFunction class for " + udfFunc.getClass().getName());
216+
}
202217
}else{
203218
throw new RuntimeException("no support tableEnvironment class for " + tableEnv.getClass().getName());
204219
}

0 commit comments

Comments
 (0)