|
53 | 53 | import org.apache.flink.core.execution.JobClient; |
54 | 54 | import org.apache.flink.runtime.jobgraph.JobGraph; |
55 | 55 | import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; |
56 | | -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; |
57 | 56 | import org.apache.flink.streaming.api.graph.StreamGraph; |
58 | 57 |
|
59 | 58 | import java.io.File; |
|
62 | 61 | import java.util.ArrayList; |
63 | 62 | import java.util.Collections; |
64 | 63 | import java.util.List; |
| 64 | +import java.util.Map; |
65 | 65 | import java.util.Set; |
66 | 66 |
|
67 | 67 | import cn.hutool.core.lang.Assert; |
@@ -170,16 +170,25 @@ private GatewayResult submitNormalWithGateway(JobStatement jobStatement) { |
170 | 170 | private Pipeline getPipeline(JobStatement jobStatement) { |
171 | 171 | Pipeline pipeline = getJarStreamGraph(jobStatement.getStatement(), jobManager.getDinkyClassLoader()); |
172 | 172 | if (pipeline instanceof StreamGraph) { |
173 | | - if (Asserts.isNotNullString(jobManager.getConfig().getSavePointPath()) |
174 | | - || (Asserts.isNotNull(jobManager.getConfig().getConfigJson()) |
175 | | - && Asserts.isNotNullString(jobManager |
176 | | - .getConfig() |
177 | | - .getConfigJson() |
178 | | - .get(SavepointConfigOptions.SAVEPOINT_PATH)))) { |
179 | | - ((StreamGraph) pipeline) |
180 | | - .setSavepointRestoreSettings(SavepointRestoreSettings.forPath( |
181 | | - jobManager.getConfig().getSavePointPath(), |
182 | | - configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE))); |
| 173 | + String savePointPath = jobManager.getConfig().getSavePointPath(); |
| 174 | + Map<String, String> configJson = jobManager.getConfig().getConfigJson(); |
| 175 | + if (Asserts.isNotNullString(savePointPath) |
| 176 | + || (Asserts.isNotNull(configJson) |
| 177 | + && Asserts.isNotNullString(configJson.get(SavepointConfigOptions.SAVEPOINT_PATH.key())))) { |
| 178 | + String effectivePath = Asserts.isNotNullString(savePointPath) |
| 179 | + ? savePointPath |
| 180 | + : configJson.get(SavepointConfigOptions.SAVEPOINT_PATH.key()); |
| 181 | + JarSubmitParam submitParam = JarSubmitParam.getInfo(jobStatement.getStatement()); |
| 182 | + boolean allowNonRestored = Boolean.TRUE.equals(submitParam.getAllowNonRestoredState()); |
| 183 | + log.info( |
| 184 | + "Setting savepoint restore settings, path: {}, allowNonRestoredState: {}", |
| 185 | + effectivePath, |
| 186 | + allowNonRestored); |
| 187 | + // 设置到 getRootConfiguration(),executeAsync 会以它为基础创建拷贝 |
| 188 | + Configuration rootConfiguration = |
| 189 | + executor.getCustomTableEnvironment().getRootConfiguration(); |
| 190 | + rootConfiguration.set(SavepointConfigOptions.SAVEPOINT_PATH, effectivePath); |
| 191 | + rootConfiguration.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, allowNonRestored); |
183 | 192 | } |
184 | 193 | } |
185 | 194 | return pipeline; |
|
0 commit comments