3333import org .apache .flink .table .api .java .StreamTableEnvironment ;
3434import org .apache .flink .table .functions .ScalarFunction ;
3535import org .apache .flink .table .functions .TableFunction ;
36- import org .apache .flink .table .functions .UserDefinedFunction ;
3736import org .apache .flink .table .functions .AggregateFunction ;
3837
3938import org .slf4j .Logger ;
@@ -148,10 +147,8 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
148147 }
149148
150149
151-
152150 /**
153- * FIXME 暂时不支持 UDF 实现类--有参构造方法
154- * TABLE|SCALA
151+ * TABLE|SCALA|AGGREGATE
155152 * 注册UDF到table env
156153 */
157154 public static void registerUDF (String type , String classPath , String funcName , TableEnvironment tableEnv ,
@@ -160,10 +157,11 @@ public static void registerUDF(String type, String classPath, String funcName, T
160157 registerScalaUDF (classPath , funcName , tableEnv , classLoader );
161158 }else if ("TABLE" .equalsIgnoreCase (type )){
162159 registerTableUDF (classPath , funcName , tableEnv , classLoader );
160+ }else if ("AGGREGATE" .equalsIgnoreCase (type )){
161+ registerAggregateUDF (classPath , funcName , tableEnv , classLoader );
163162 }else {
164- throw new RuntimeException ("not support of UDF which is not in (TABLE, SCALA)" );
163+ throw new RuntimeException ("not support of UDF which is not in (TABLE, SCALA, AGGREGATE )" );
165164 }
166-
167165 }
168166
169167 /**
@@ -187,34 +185,22 @@ public static void registerScalaUDF(String classPath, String funcName, TableEnvi
187185
188186 /**
189187 * 注册自定义TABLEFFUNC方法到env上
190- * TODO 对User-Defined Aggregate Functions的支持
188+ *
191189 * @param classPath
192190 * @param funcName
193191 * @param tableEnv
194192 */
195193 public static void registerTableUDF (String classPath , String funcName , TableEnvironment tableEnv ,
196194 ClassLoader classLoader ){
197195 try {
198- UserDefinedFunction udfFunc = Class .forName (classPath ,false , classLoader )
199- .asSubclass (UserDefinedFunction .class ).newInstance ();
200-
201- if (tableEnv instanceof StreamTableEnvironment ){
202- if (udfFunc instanceof AggregateFunction ){
203- ((StreamTableEnvironment ) tableEnv ).registerFunction (funcName , (AggregateFunction )udfFunc );
204- }else if (udfFunc instanceof TableFunction ) {
205- ((StreamTableEnvironment ) tableEnv ).registerFunction (funcName , (TableFunction )udfFunc );
206- }else {
207- throw new RuntimeException ("no support UserDefinedFunction class for " + udfFunc .getClass ().getName ());
208- }
209- }else if (tableEnv instanceof BatchTableEnvironment ){
210- if (udfFunc instanceof AggregateFunction ){
211- ((BatchTableEnvironment ) tableEnv ).registerFunction (funcName , (AggregateFunction )udfFunc );
212- }else if (udfFunc instanceof TableFunction ) {
213- ((BatchTableEnvironment ) tableEnv ).registerFunction (funcName , (TableFunction )udfFunc );
214- }else {
215- throw new RuntimeException ("no support UserDefinedFunction class for " + udfFunc .getClass ().getName ());
216- }
217- }else {
196+ TableFunction udfFunc = Class .forName (classPath , false , classLoader )
197+ .asSubclass (TableFunction .class ).newInstance ();
198+
199+ if (tableEnv instanceof StreamTableEnvironment ) {
200+ ((StreamTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
201+ } else if (tableEnv instanceof BatchTableEnvironment ) {
202+ ((BatchTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
203+ } else {
218204 throw new RuntimeException ("no support tableEnvironment class for " + tableEnv .getClass ().getName ());
219205 }
220206
@@ -225,6 +211,33 @@ public static void registerTableUDF(String classPath, String funcName, TableEnvi
225211 }
226212 }
227213
214+ /**
215+ * 注册自定义Aggregate FUNC方法到env上
216+ *
217+ * @param classPath
218+ * @param funcName
219+ * @param tableEnv
220+ */
221+ public static void registerAggregateUDF (String classPath , String funcName , TableEnvironment tableEnv ,
222+ ClassLoader classLoader ) {
223+ try {
224+ AggregateFunction udfFunc = Class .forName (classPath , false , classLoader )
225+ .asSubclass (AggregateFunction .class ).newInstance ();
226+
227+ if (tableEnv instanceof StreamTableEnvironment ) {
228+ ((StreamTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
229+ } else if (tableEnv instanceof BatchTableEnvironment ) {
230+ ((BatchTableEnvironment ) tableEnv ).registerFunction (funcName , udfFunc );
231+ } else {
232+ throw new RuntimeException ("no support tableEnvironment class for " + tableEnv .getClass ().getName ());
233+ }
234+
235+ logger .info ("register Aggregate function:{} success." , funcName );
236+ } catch (Exception e ) {
237+ logger .error ("" , e );
238+ throw new RuntimeException ("register Aggregate UDF exception:" , e );
239+ }
240+ }
228241
229242 /**
230243 *
0 commit comments