@@ -68,12 +68,14 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
6868 }
6969
7070 //设置了时间间隔才表明开启了checkpoint
71- if (properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_INTERVAL_KEY ) == null ){
71+ if (properties .getProperty (ConfigConstrant .SQL_CHECKPOINT_INTERVAL_KEY ) == null && properties . getProperty ( ConfigConstrant . FLINK_CHECKPOINT_INTERVAL_KEY ) == null ){
7272 return ;
7373 }else {
74- Long interval = Long .valueOf (properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_INTERVAL_KEY ));
74+ Long sql_interval = Long .valueOf (properties .getProperty (ConfigConstrant .SQL_CHECKPOINT_INTERVAL_KEY ,"0" ));
75+ Long flink_interval = Long .valueOf (properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_INTERVAL_KEY , "0" ));
76+ long checkpointInterval = Math .max (sql_interval , flink_interval );
7577 //start checkpoint every ${interval}
76- env .enableCheckpointing (interval );
78+ env .enableCheckpointing (checkpointInterval );
7779 }
7880
7981 String checkMode = properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_MODE_KEY );
@@ -101,7 +103,14 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
101103 env .getCheckpointConfig ().setMaxConcurrentCheckpoints (maxConcurrCheckpoints );
102104 }
103105
104- String cleanupModeStr = properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_CLEANUPMODE_KEY );
106+ Boolean sqlCleanMode = MathUtil .getBoolean (properties .getProperty (ConfigConstrant .SQL_CHECKPOINT_CLEANUPMODE_KEY ), false );
107+ Boolean flinkCleanMode = MathUtil .getBoolean (properties .getProperty (ConfigConstrant .FLINK_CHECKPOINT_CLEANUPMODE_KEY ), false );
108+
109+ String cleanupModeStr = "false" ;
110+ if (sqlCleanMode || flinkCleanMode ){
111+ cleanupModeStr = "true" ;
112+ }
113+
105114 if ("true" .equalsIgnoreCase (cleanupModeStr )){
106115 env .getCheckpointConfig ().enableExternalizedCheckpoints (
107116 CheckpointConfig .ExternalizedCheckpointCleanup .DELETE_ON_CANCELLATION );
0 commit comments