diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java index 1026962b7c..257f355804 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java @@ -53,7 +53,6 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.streaming.api.graph.StreamGraph; import java.io.File; @@ -62,6 +61,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import cn.hutool.core.lang.Assert; @@ -170,16 +170,25 @@ private GatewayResult submitNormalWithGateway(JobStatement jobStatement) { private Pipeline getPipeline(JobStatement jobStatement) { Pipeline pipeline = getJarStreamGraph(jobStatement.getStatement(), jobManager.getDinkyClassLoader()); if (pipeline instanceof StreamGraph) { - if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath()) - || (Asserts.isNotNull(jobManager.getConfig().getConfigJson()) - && Asserts.isNotNullString(jobManager - .getConfig() - .getConfigJson() - .get(SavepointConfigOptions.SAVEPOINT_PATH)))) { - ((StreamGraph) pipeline) - .setSavepointRestoreSettings(SavepointRestoreSettings.forPath( - jobManager.getConfig().getSavePointPath(), - configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE))); + String savePointPath = jobManager.getConfig().getSavePointPath(); + Map configJson = jobManager.getConfig().getConfigJson(); + if (Asserts.isNotNullString(savePointPath) + || (Asserts.isNotNull(configJson) + && Asserts.isNotNullString(configJson.get(SavepointConfigOptions.SAVEPOINT_PATH.key())))) { + String effectivePath = Asserts.isNotNullString(savePointPath) + ? savePointPath + : configJson.get(SavepointConfigOptions.SAVEPOINT_PATH.key()); + JarSubmitParam submitParam = JarSubmitParam.getInfo(jobStatement.getStatement()); + boolean allowNonRestored = submitParam.getAllowNonRestoredState(); + log.info( + "Setting savepoint restore settings, path: {}, allowNonRestoredState: {}", + effectivePath, + allowNonRestored); + // 设置到 getRootConfiguration(),executeAsync 会以它为基础创建拷贝 + Configuration rootConfiguration = + executor.getCustomTableEnvironment().getRootConfiguration(); + rootConfiguration.set(SavepointConfigOptions.SAVEPOINT_PATH, effectivePath); + rootConfiguration.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, allowNonRestored); } } return pipeline;