1616 * limitations under the License.
1717 */
1818
19-
19+
2020
2121package com .dtstack .flink .sql .util ;
2222
3333import org .apache .flink .table .api .TableEnvironment ;
3434import org .apache .flink .table .api .java .BatchTableEnvironment ;
3535import org .apache .flink .table .api .java .StreamTableEnvironment ;
36- import org .apache .flink .table .functions .AggregateFunction ;
3736import org .apache .flink .table .functions .ScalarFunction ;
3837import org .apache .flink .table .functions .TableFunction ;
38+ import org .apache .flink .table .functions .AggregateFunction ;
39+
3940import org .slf4j .Logger ;
4041import org .slf4j .LoggerFactory ;
4142
@@ -70,12 +71,14 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
7071 }
7172
7273 //设置了时间间隔才表明开启了checkpoint
73- if (properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_INTERVAL_KEY ) == null ){
74+ if (properties .getProperty (ConfigConstrant .SQL_CHECKPOINT_INTERVAL_KEY ) == null && properties . getProperty ( ConfigConstrant . FLINK_CHECKPOINT_INTERVAL_KEY ) == null ){
7475 return ;
7576 }else {
76- Long interval = Long .valueOf (properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_INTERVAL_KEY ));
77+ Long sql_interval = Long .valueOf (properties .getProperty (ConfigConstrant .SQL_CHECKPOINT_INTERVAL_KEY ,"0" ));
78+ Long flink_interval = Long .valueOf (properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_INTERVAL_KEY , "0" ));
79+ long checkpointInterval = Math .max (sql_interval , flink_interval );
7780 //start checkpoint every ${interval}
78- env .enableCheckpointing (interval );
81+ env .enableCheckpointing (checkpointInterval );
7982 }
8083
8184 String checkMode = properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_MODE_KEY );
@@ -103,7 +106,14 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
103106 env .getCheckpointConfig ().setMaxConcurrentCheckpoints (maxConcurrCheckpoints );
104107 }
105108
106- String cleanupModeStr = properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_CLEANUPMODE_KEY );
109+ Boolean sqlCleanMode = MathUtil .getBoolean (properties .getProperty (ConfigConstrant .SQL_CHECKPOINT_CLEANUPMODE_KEY ), false );
110+ Boolean flinkCleanMode = MathUtil .getBoolean (properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_CLEANUPMODE_KEY ), false );
111+
112+ String cleanupModeStr = "false" ;
113+ if (sqlCleanMode || flinkCleanMode ){
114+ cleanupModeStr = "true" ;
115+ }
116+
107117 if ("true" .equalsIgnoreCase (cleanupModeStr )){
108118 env .getCheckpointConfig ().enableExternalizedCheckpoints (
109119 CheckpointConfig .ExternalizedCheckpointCleanup .DELETE_ON_CANCELLATION );
@@ -139,6 +149,7 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
139149 if (characteristicStr .equalsIgnoreCase (tmp .toString ())){
140150 env .setStreamTimeCharacteristic (tmp );
141151 flag = true ;
152+ break ;
142153 }
143154 }
144155
@@ -148,7 +159,6 @@ public static void setStreamTimeCharacteristic(StreamExecutionEnvironment env, P
148159 }
149160
150161
151-
152162 /**
153163 * TABLE|SCALA|AGGREGATE
154164 * 注册UDF到table env
@@ -269,21 +279,9 @@ public static long getBufferTimeoutMillis(Properties properties){
269279 }
270280
271281 public static URLClassLoader loadExtraJar (List <URL > jarURLList , URLClassLoader classLoader ) throws NoSuchMethodException , IllegalAccessException , InvocationTargetException {
272-
273- int size = 0 ;
274- for (URL url : jarURLList ){
275- if (url .toString ().endsWith (".jar" )){
276- size ++;
277- }
278- }
279-
280- URL [] urlArray = new URL [size ];
281- int i =0 ;
282282 for (URL url : jarURLList ){
283283 if (url .toString ().endsWith (".jar" )){
284- urlArray [i ] = url ;
285284 urlClassLoaderAddUrl (classLoader , url );
286- i ++;
287285 }
288286 }
289287
@@ -311,4 +309,4 @@ public static TypeInformation[] transformTypes(Class[] fieldTypes){
311309 return types ;
312310 }
313311
314- }
312+ }
0 commit comments