1616 * limitations under the License.
1717 */
1818
19-
19+
2020
2121package com .dtstack .flink .sql .util ;
2222
3434import org .apache .flink .table .api .java .StreamTableEnvironment ;
3535import org .apache .flink .table .functions .ScalarFunction ;
3636import org .apache .flink .table .functions .TableFunction ;
37+ import org .apache .flink .table .functions .AggregateFunction ;
38+
3739import org .slf4j .Logger ;
3840import org .slf4j .LoggerFactory ;
3941
@@ -156,10 +158,8 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
156158 }
157159
158160
159-
160161 /**
161- * FIXME 暂时不支持 UDF 实现类--有参构造方法
162- * TABLE|SCALA
162+ * TABLE|SCALA|AGGREGATE
163163 * 注册UDF到table env
164164 */
165165 public static void registerUDF (String type , String classPath , String funcName , TableEnvironment tableEnv ,
@@ -168,10 +168,11 @@ public static void registerUDF(String type, String classPath, String funcName, T
168168 registerScalaUDF (classPath , funcName , tableEnv , classLoader );
169169 }else if ("TABLE" .equalsIgnoreCase (type )){
170170 registerTableUDF (classPath , funcName , tableEnv , classLoader );
171+ }else if ("AGGREGATE" .equalsIgnoreCase (type )){
172+ registerAggregateUDF (classPath , funcName , tableEnv , classLoader );
171173 }else {
172- throw new RuntimeException ("not support of UDF which is not in (TABLE, SCALA)" );
174+ throw new RuntimeException ("not support of UDF which is not in (TABLE, SCALA, AGGREGATE )" );
173175 }
174-
175176 }
176177
177178 /**
@@ -195,7 +196,7 @@ public static void registerScalaUDF(String classPath, String funcName, TableEnvi
195196
196197 /**
197198 * 注册自定义TABLEFFUNC方法到env上
198- * TODO 对User-Defined Aggregate Functions的支持
199+ *
199200 * @param classPath
200201 * @param funcName
201202 * @param tableEnv
@@ -206,11 +207,11 @@ public static void registerTableUDF(String classPath, String funcName, TableEnvi
206207 TableFunction udfFunc = Class .forName (classPath , false , classLoader )
207208 .asSubclass (TableFunction .class ).newInstance ();
208209
209- if (tableEnv instanceof StreamTableEnvironment ){
210- ((StreamTableEnvironment )tableEnv ).registerFunction (funcName , udfFunc );
211- }else if (tableEnv instanceof BatchTableEnvironment ){
212- ((BatchTableEnvironment )tableEnv ).registerFunction (funcName , udfFunc );
213- }else {
210+ if (tableEnv instanceof StreamTableEnvironment ) {
211+ ((StreamTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
212+ } else if (tableEnv instanceof BatchTableEnvironment ) {
213+ ((BatchTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
214+ } else {
214215 throw new RuntimeException ("no support tableEnvironment class for " + tableEnv .getClass ().getName ());
215216 }
216217
@@ -221,6 +222,33 @@ public static void registerTableUDF(String classPath, String funcName, TableEnvi
221222 }
222223 }
223224
225+ /**
226+ * 注册自定义Aggregate FUNC方法到env上
227+ *
228+ * @param classPath
229+ * @param funcName
230+ * @param tableEnv
231+ */
232+ public static void registerAggregateUDF (String classPath , String funcName , TableEnvironment tableEnv ,
233+ ClassLoader classLoader ) {
234+ try {
235+ AggregateFunction udfFunc = Class .forName (classPath , false , classLoader )
236+ .asSubclass (AggregateFunction .class ).newInstance ();
237+
238+ if (tableEnv instanceof StreamTableEnvironment ) {
239+ ((StreamTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
240+ } else if (tableEnv instanceof BatchTableEnvironment ) {
241+ ((BatchTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
242+ } else {
243+ throw new RuntimeException ("no support tableEnvironment class for " + tableEnv .getClass ().getName ());
244+ }
245+
246+ logger .info ("register Aggregate function:{} success." , funcName );
247+ } catch (Exception e ) {
248+ logger .error ("" , e );
249+ throw new RuntimeException ("register Aggregate UDF exception:" , e );
250+ }
251+ }
224252
225253 /**
226254 *
0 commit comments