Skip to content

Commit 4fbc971

Browse files
author
yanxi0227
committed
save checkpoint when sql.checkpoint.cleanup.mode is null
1 parent 7dae889 commit 4fbc971

File tree

1 file changed

+8
-10
lines changed

1 file changed

+8
-10
lines changed

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,16 +101,14 @@ public static void openCheckpoint(StreamExecutionEnvironment env, Properties pro
101101
}
102102

103103
String cleanupModeStr = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_CLEANUPMODE_KEY);
104-
if(cleanupModeStr != null){//设置在cancel job情况下checkpoint是否被保存
105-
if("true".equalsIgnoreCase(cleanupModeStr)){
106-
env.getCheckpointConfig().enableExternalizedCheckpoints(
107-
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
108-
}else if("false".equalsIgnoreCase(cleanupModeStr)){
109-
env.getCheckpointConfig().enableExternalizedCheckpoints(
110-
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
111-
}else{
112-
throw new RuntimeException("not support value of cleanup mode :" + cleanupModeStr);
113-
}
104+
if ("true".equalsIgnoreCase(cleanupModeStr)){
105+
env.getCheckpointConfig().enableExternalizedCheckpoints(
106+
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
107+
} else if("false".equalsIgnoreCase(cleanupModeStr) || cleanupModeStr == null){
108+
env.getCheckpointConfig().enableExternalizedCheckpoints(
109+
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
110+
} else{
111+
throw new RuntimeException("not support value of cleanup mode :" + cleanupModeStr);
114112
}
115113

116114
String backendPath = properties.getProperty(ConfigConstrant.FLINK_CHECKPOINT_DATAURI_KEY);

0 commit comments

Comments
 (0)