From 58374b23cfeac09937018a3337e89e3fe68d2812 Mon Sep 17 00:00:00 2001 From: zhuxt2015 <594754793@qq.com> Date: Fri, 20 Mar 2026 11:09:05 +0800 Subject: [PATCH 1/2] =?UTF-8?q?[Fix]=20=E4=BF=AE=E5=A4=8DJAR=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E4=BB=8ESavepoint=E6=81=A2=E5=A4=8D=E6=97=B6allowNonR?= =?UTF-8?q?estoredState=E6=9C=AA=E7=94=9F=E6=95=88=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/dinky/job/runner/JobJarRunner.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java index 1026962b7c..a5b6428999 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java @@ -53,7 +53,6 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.streaming.api.graph.StreamGraph; import java.io.File; @@ -62,6 +61,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import cn.hutool.core.lang.Assert; @@ -170,16 +170,25 @@ private GatewayResult submitNormalWithGateway(JobStatement jobStatement) { private Pipeline getPipeline(JobStatement jobStatement) { Pipeline pipeline = getJarStreamGraph(jobStatement.getStatement(), jobManager.getDinkyClassLoader()); if (pipeline instanceof StreamGraph) { - if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath()) - || (Asserts.isNotNull(jobManager.getConfig().getConfigJson()) - && Asserts.isNotNullString(jobManager - .getConfig() - .getConfigJson() - .get(SavepointConfigOptions.SAVEPOINT_PATH)))) { - ((StreamGraph) pipeline) - .setSavepointRestoreSettings(SavepointRestoreSettings.forPath( - jobManager.getConfig().getSavePointPath(), - configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE))); + String savePointPath = jobManager.getConfig().getSavePointPath(); + Map configJson = jobManager.getConfig().getConfigJson(); + if (Asserts.isNotNullString(savePointPath) + || (Asserts.isNotNull(configJson) + && Asserts.isNotNullString(configJson.get(SavepointConfigOptions.SAVEPOINT_PATH.key())))) { + String effectivePath = Asserts.isNotNullString(savePointPath) + ? savePointPath + : configJson.get(SavepointConfigOptions.SAVEPOINT_PATH.key()); + JarSubmitParam submitParam = JarSubmitParam.getInfo(jobStatement.getStatement()); + boolean allowNonRestored = Boolean.TRUE.equals(submitParam.getAllowNonRestoredState()); + log.info( + "Setting savepoint restore settings, path: {}, allowNonRestoredState: {}", + effectivePath, + allowNonRestored); + // 设置到 getRootConfiguration(),executeAsync 会以它为基础创建拷贝 + Configuration rootConfiguration = + executor.getCustomTableEnvironment().getRootConfiguration(); + rootConfiguration.set(SavepointConfigOptions.SAVEPOINT_PATH, effectivePath); + rootConfiguration.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, allowNonRestored); } } return pipeline; From e82055d1e8a21c29cbc39634ec04ebcdfd9c836d Mon Sep 17 00:00:00 2001 From: zhuxt2015 <594754793@qq.com> Date: Fri, 20 Mar 2026 11:17:52 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java index a5b6428999..257f355804 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java @@ -179,7 +179,7 @@ private Pipeline getPipeline(JobStatement jobStatement) { ? savePointPath : configJson.get(SavepointConfigOptions.SAVEPOINT_PATH.key()); JarSubmitParam submitParam = JarSubmitParam.getInfo(jobStatement.getStatement()); - boolean allowNonRestored = Boolean.TRUE.equals(submitParam.getAllowNonRestoredState()); + boolean allowNonRestored = submitParam.getAllowNonRestoredState(); log.info( "Setting savepoint restore settings, path: {}, allowNonRestoredState: {}", effectivePath,