2222
2323
2424import com .dtstack .flink .sql .classloader .ClassLoaderManager ;
25+ import com .dtstack .flink .sql .constrant .ConfigConstrant ;
2526import org .apache .commons .lang3 .StringUtils ;
2627import org .apache .flink .api .common .typeinfo .TypeInformation ;
2728import org .apache .flink .runtime .state .filesystem .FsStateBackend ;
3233import org .apache .flink .table .api .TableEnvironment ;
3334import org .apache .flink .table .api .java .BatchTableEnvironment ;
3435import org .apache .flink .table .api .java .StreamTableEnvironment ;
36+ import org .apache .flink .table .functions .AggregateFunction ;
3537import org .apache .flink .table .functions .ScalarFunction ;
3638import org .apache .flink .table .functions .TableFunction ;
3739import org .slf4j .Logger ;
@@ -148,17 +150,18 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
148150
149151
150152 /**
151- * FIXME 暂时不支持 UDF 实现类--有参构造方法
152- * TABLE|SCALA
153+ * TABLE|SCALA|AGGREGATE
153154 * 注册UDF到table env
154155 */
155- public static void registerUDF (String type , String classPath , String funcName , TableEnvironment tableEnv , List < URL > jarURList ){
156+ public static void registerUDF (String type , String classPath , String funcName , TableEnvironment tableEnv , ClassLoader classLoader ){
156157 if ("SCALA" .equalsIgnoreCase (type )){
157- registerScalaUDF (classPath , funcName , tableEnv , jarURList );
158+ registerScalaUDF (classPath , funcName , tableEnv , classLoader );
158159 }else if ("TABLE" .equalsIgnoreCase (type )){
159- registerTableUDF (classPath , funcName , tableEnv , jarURList );
160+ registerTableUDF (classPath , funcName , tableEnv , classLoader );
161+ }else if ("AGGREGATE" .equalsIgnoreCase (type )){
162+ registerAggregateUDF (classPath , funcName , tableEnv , classLoader );
160163 }else {
161- throw new RuntimeException ("not support of UDF which is not in (TABLE, SCALA)" );
164+ throw new RuntimeException ("not support of UDF which is not in (TABLE, SCALA, AGGREGATE )" );
162165 }
163166
164167 }
@@ -169,9 +172,10 @@ public static void registerUDF(String type, String classPath, String funcName, T
169172 * @param funcName
170173 * @param tableEnv
171174 */
172- public static void registerScalaUDF (String classPath , String funcName , TableEnvironment tableEnv , List < URL > jarURList ){
175+ public static void registerScalaUDF (String classPath , String funcName , TableEnvironment tableEnv , ClassLoader classLoader ){
173176 try {
174- ScalarFunction udfFunc = ClassLoaderManager .newInstance (jarURList , (cl ) -> cl .loadClass (classPath ).asSubclass (ScalarFunction .class ).newInstance ());
177+ ScalarFunction udfFunc = Class .forName (classPath , false , classLoader )
178+ .asSubclass (ScalarFunction .class ).newInstance ();
175179 tableEnv .registerFunction (funcName , udfFunc );
176180 logger .info ("register scala function:{} success." , funcName );
177181 }catch (Exception e ){
@@ -182,14 +186,15 @@ public static void registerScalaUDF(String classPath, String funcName, TableEnvi
182186
183187 /**
184188 * 注册自定义TABLEFFUNC方法到env上
185- * TODO 对User-Defined Aggregate Functions的支持
189+ *
186190 * @param classPath
187191 * @param funcName
188192 * @param tableEnv
189193 */
190- public static void registerTableUDF (String classPath , String funcName , TableEnvironment tableEnv , List < URL > jarURList ){
194+ public static void registerTableUDF (String classPath , String funcName , TableEnvironment tableEnv , ClassLoader classLoader ){
191195 try {
192- TableFunction udfFunc = ClassLoaderManager .newInstance (jarURList , (cl ) -> cl .loadClass (classPath ).asSubclass (TableFunction .class ).newInstance ());
196+ TableFunction udfFunc = Class .forName (classPath , false , classLoader )
197+ .asSubclass (TableFunction .class ).newInstance ();
193198 if (tableEnv instanceof StreamTableEnvironment ){
194199 ((StreamTableEnvironment )tableEnv ).registerFunction (funcName , udfFunc );
195200 }else if (tableEnv instanceof BatchTableEnvironment ){
@@ -205,6 +210,31 @@ public static void registerTableUDF(String classPath, String funcName, TableEnvi
205210 }
206211 }
207212
213+ /**
214+ * 注册自定义Aggregate FUNC方法到env上
215+ *
216+ * @param classPath
217+ * @param funcName
218+ * @param tableEnv
219+ */
220+ public static void registerAggregateUDF (String classPath , String funcName , TableEnvironment tableEnv , ClassLoader classLoader ) {
221+ try {
222+ AggregateFunction udfFunc = Class .forName (classPath , false , classLoader )
223+ .asSubclass (AggregateFunction .class ).newInstance ();
224+ if (tableEnv instanceof StreamTableEnvironment ) {
225+ ((StreamTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
226+ } else if (tableEnv instanceof BatchTableEnvironment ) {
227+ ((BatchTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
228+ } else {
229+ throw new RuntimeException ("no support tableEnvironment class for " + tableEnv .getClass ().getName ());
230+ }
231+
232+ logger .info ("register Aggregate function:{} success." , funcName );
233+ } catch (Exception e ) {
234+ logger .error ("" , e );
235+ throw new RuntimeException ("register Aggregate UDF exception:" , e );
236+ }
237+ }
208238
209239 /**
210240 *
0 commit comments