Skip to content

Commit 207b149

Browse files
committed
[FLINK-35292] Set dummy savepoint path during last-state upgrade
1 parent 15f648c commit 207b149

File tree

10 files changed

+214
-5
lines changed

10 files changed

+214
-5
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
3434
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
3535
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
36+
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
3637
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
3738
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
3839
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
@@ -64,6 +65,8 @@ public abstract class AbstractJobReconciler<
6465

6566
private static final Logger LOG = LoggerFactory.getLogger(AbstractJobReconciler.class);
6667

68+
public static final String LAST_STATE_DUMMY_SP_PATH = "KUBERNETES_OPERATOR_LAST_STATE";
69+
6770
public AbstractJobReconciler(
6871
EventRecorder eventRecorder,
6972
StatusRecorder<CR, STATUS> statusRecorder,
@@ -179,6 +182,12 @@ protected AvailableUpgradeMode getAvailableUpgradeMode(
179182
var flinkService = ctx.getFlinkService();
180183
if (ReconciliationUtils.isJobInTerminalState(status)
181184
&& !flinkService.isHaMetadataAvailable(ctx.getObserveConfig())) {
185+
186+
if (!SnapshotUtils.lastSavepointKnown(status)) {
187+
throw new RecoveryFailureException(
188+
"Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. Manual restore required.",
189+
"UpgradeFailed");
190+
}
182191
LOG.info(
183192
"Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
184193
return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
@@ -265,7 +274,7 @@ protected void restoreJob(
265274
throws Exception {
266275
Optional<String> savepointOpt = Optional.empty();
267276

268-
if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
277+
if (spec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT) {
269278
savepointOpt =
270279
Optional.ofNullable(
271280
ctx.getResource()

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
3030
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
3131
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
32+
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
33+
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
3234
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
3335
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
3436
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
@@ -159,8 +161,18 @@ public void deploy(
159161
relatedResource.getStatus().getClusterInfo());
160162

161163
if (savepoint.isPresent()) {
164+
// Savepoint deployment
162165
deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
166+
} else if (requireHaMetadata && flinkService.atLeastOneCheckpoint(deployConfig)) {
167+
// Last state deployment, explicitly set a dummy savepoint path to avoid accidental
168+
// incorrect state restore in case the HA metadata is deleted by the user
169+
deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, LAST_STATE_DUMMY_SP_PATH);
170+
status.getJobStatus()
171+
.getSavepointInfo()
172+
.setLastSavepoint(
173+
Savepoint.of(LAST_STATE_DUMMY_SP_PATH, SnapshotTriggerType.UNKNOWN));
163174
} else {
175+
// Stateless deployment, remove any user configured savepoint path
164176
deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
165177
}
166178

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,15 @@ public boolean isHaMetadataAvailable(Configuration conf) {
229229
return false;
230230
}
231231

232+
@Override
233+
public boolean atLeastOneCheckpoint(Configuration conf) {
234+
if (FlinkUtils.isKubernetesHAActivated(conf)) {
235+
return FlinkUtils.isKubernetesHaMetadataAvailableWithCheckpoint(conf, kubernetesClient);
236+
} else {
237+
return isHaMetadataAvailable(conf);
238+
}
239+
}
240+
232241
@Override
233242
public JobID submitJobToSessionCluster(
234243
ObjectMeta meta,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ void submitApplicationCluster(JobSpec jobSpec, Configuration conf, boolean requi
5757

5858
boolean isHaMetadataAvailable(Configuration conf);
5959

60+
boolean atLeastOneCheckpoint(Configuration conf);
61+
6062
void submitSessionCluster(Configuration conf) throws Exception;
6163

6264
JobID submitJobToSessionCluster(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.LinkedHashMap;
5959
import java.util.Map;
6060
import java.util.Optional;
61+
import java.util.function.Predicate;
6162

6263
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
6364

@@ -287,6 +288,20 @@ public static boolean isZookeeperHaMetadataAvailable(Configuration conf) {
287288

288289
public static boolean isKubernetesHaMetadataAvailable(
289290
Configuration conf, KubernetesClient kubernetesClient) {
291+
return isKubernetesHaMetadataAvailable(
292+
conf, kubernetesClient, FlinkUtils::isValidHaConfigMap);
293+
}
294+
295+
public static boolean isKubernetesHaMetadataAvailableWithCheckpoint(
296+
Configuration conf, KubernetesClient kubernetesClient) {
297+
return isKubernetesHaMetadataAvailable(
298+
conf, kubernetesClient, cm -> isValidHaConfigMap(cm) && checkpointExists(cm));
299+
}
300+
301+
private static boolean isKubernetesHaMetadataAvailable(
302+
Configuration conf,
303+
KubernetesClient kubernetesClient,
304+
Predicate<ConfigMap> cmPredicate) {
290305

291306
String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
292307
String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
@@ -303,7 +318,7 @@ public static boolean isKubernetesHaMetadataAvailable(
303318
.list()
304319
.getItems();
305320

306-
return configMaps.stream().anyMatch(FlinkUtils::isValidHaConfigMap);
321+
return configMaps.stream().anyMatch(cmPredicate);
307322
}
308323

309324
private static boolean isValidHaConfigMap(ConfigMap cm) {
@@ -319,6 +334,13 @@ private static boolean isValidHaConfigMap(ConfigMap cm) {
319334
return name.endsWith("-jobmanager-leader");
320335
}
321336

337+
private static boolean checkpointExists(ConfigMap cm) {
338+
var data = cm.getData();
339+
return data != null
340+
&& data.keySet().stream()
341+
.anyMatch(s -> s.startsWith(Constants.CHECKPOINT_ID_KEY_PREFIX));
342+
}
343+
322344
private static boolean isJobGraphKey(Map.Entry<String, String> entry) {
323345
return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX);
324346
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import org.apache.flink.configuration.ConfigurationUtils;
2323
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
2424
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
25+
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
2526
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
2627
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
2728
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
2829
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
2930
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
3031
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
32+
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
3133
import org.apache.flink.kubernetes.operator.service.FlinkService;
3234

3335
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -402,4 +404,23 @@ public static void resetSnapshotTriggers(
402404
}
403405
}
404406
}
407+
408+
/**
409+
* Check if the last snapshot information is known. True if the snapshot location is known
410+
* explicitly (not implicitly through a last-state upgrade) or if the savepoint is known to be
411+
* empty.
412+
*
413+
* @param status Flink resource status
414+
* @return True if last savepoint is known
415+
*/
416+
public static boolean lastSavepointKnown(CommonStatus<?> status) {
417+
var lastSavepoint = status.getJobStatus().getSavepointInfo().getLastSavepoint();
418+
419+
if (lastSavepoint == null) {
420+
return true;
421+
}
422+
String location = lastSavepoint.getLocation();
423+
424+
return !location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
425+
}
405426
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public class TestingFlinkService extends AbstractFlinkService {
125125
@Setter private boolean isFlinkJobTerminatedWithoutCancellation = false;
126126
@Setter private boolean isPortReady = true;
127127
@Setter private boolean haDataAvailable = true;
128+
@Setter private boolean checkpointAvailable = true;
128129
@Setter private boolean jobManagerReady = true;
129130
@Setter private boolean deployFailure = false;
130131
@Setter private Runnable sessionJobSubmittedCallback;
@@ -236,6 +237,11 @@ protected void validateHaMetadataExists(Configuration conf) {
236237
}
237238
}
238239

240+
@Override
241+
public boolean atLeastOneCheckpoint(Configuration conf) {
242+
return isHaMetadataAvailable(conf) && checkpointAvailable;
243+
}
244+
239245
@Override
240246
public boolean isHaMetadataAvailable(Configuration conf) {
241247
return HighAvailabilityMode.isHighAvailabilityModeActivated(conf) && haDataAvailable;

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -635,8 +635,15 @@ public void triggerRestart() throws Exception {
635635
private void verifyAndSetRunningJobsToStatus(
636636
FlinkDeployment deployment,
637637
List<Tuple3<String, JobStatusMessage, Configuration>> runningJobs) {
638+
verifyAndSetRunningJobsToStatus(deployment, runningJobs, null);
639+
}
640+
641+
private void verifyAndSetRunningJobsToStatus(
642+
FlinkDeployment deployment,
643+
List<Tuple3<String, JobStatusMessage, Configuration>> runningJobs,
644+
String savepoint) {
638645
assertEquals(1, runningJobs.size());
639-
assertNull(runningJobs.get(0).f0);
646+
assertEquals(savepoint, runningJobs.get(0).f0);
640647
deployment
641648
.getStatus()
642649
.setJobStatus(
@@ -1124,14 +1131,26 @@ public void testReconcileIfUpgradeModeNotAvailable() throws Exception {
11241131
deployment.getSpec().getRestartNonce(), lastReconciledSpec.getRestartNonce());
11251132

11261133
// Set to running to let savepoint upgrade proceed
1127-
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
1134+
verifyAndSetRunningJobsToStatus(
1135+
deployment,
1136+
flinkService.listJobs(),
1137+
ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH);
11281138

11291139
reconciler.reconcile(deployment, context);
11301140
// Make sure upgrade is properly triggered now
11311141
lastReconciledSpec =
11321142
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
11331143
assertEquals(deployment.getSpec().getRestartNonce(), lastReconciledSpec.getRestartNonce());
11341144
assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
1145+
assertEquals(UpgradeMode.SAVEPOINT, lastReconciledSpec.getJob().getUpgradeMode());
1146+
assertEquals(
1147+
"savepoint_0",
1148+
deployment
1149+
.getStatus()
1150+
.getJobStatus()
1151+
.getSavepointInfo()
1152+
.getLastSavepoint()
1153+
.getLocation());
11351154
}
11361155

11371156
@Test

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.junit.jupiter.params.ParameterizedTest;
5353
import org.junit.jupiter.params.provider.Arguments;
5454
import org.junit.jupiter.params.provider.MethodSource;
55+
import org.junit.jupiter.params.provider.ValueSource;
5556

5657
import java.time.Duration;
5758
import java.util.ArrayList;
@@ -568,6 +569,48 @@ public void testLastStateOnDeletedDeployment() throws Exception {
568569
assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
569570
}
570571

572+
@ParameterizedTest
573+
@ValueSource(booleans = {true, false})
574+
public void testLastStateDummySpPath(boolean checkpointAvailable) throws Exception {
575+
// Bootstrap running deployment
576+
var deployment = TestUtils.buildApplicationCluster();
577+
deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
578+
579+
reconciler.reconcile(deployment, context);
580+
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
581+
582+
flinkService.setHaDataAvailable(true);
583+
flinkService.setCheckpointAvailable(checkpointAvailable);
584+
585+
// Submit upgrade
586+
deployment.getSpec().setRestartNonce(123L);
587+
reconciler.reconcile(deployment, context);
588+
reconciler.reconcile(deployment, context);
589+
590+
var lastReconciledSpec =
591+
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
592+
593+
// Make sure we correctly record upgrade mode to last state
594+
assertEquals(UpgradeMode.LAST_STATE, lastReconciledSpec.getJob().getUpgradeMode());
595+
596+
if (checkpointAvailable) {
597+
assertEquals(
598+
ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
599+
deployment
600+
.getStatus()
601+
.getJobStatus()
602+
.getSavepointInfo()
603+
.getLastSavepoint()
604+
.getLocation());
605+
assertEquals(
606+
ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
607+
flinkService.listJobs().get(0).f0);
608+
} else {
609+
assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
610+
assertNull(flinkService.listJobs().get(0).f0);
611+
}
612+
}
613+
571614
@Test
572615
public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
573616
final String expectedSavepointPath = "savepoint_0";
@@ -682,7 +725,16 @@ public void testUpgradeModeChangedToLastStateShouldNotTriggerSavepointWhileHAEna
682725
.getImage());
683726
// Upgrade mode changes from stateless to last-state while HA enabled previously should not
684727
// trigger a savepoint
685-
assertNull(flinkService.listJobs().get(0).f0);
728+
assertEquals(
729+
ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH,
730+
deployment
731+
.getStatus()
732+
.getJobStatus()
733+
.getSavepointInfo()
734+
.getLastSavepoint()
735+
.getLocation());
736+
assertEquals(
737+
ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH, flinkService.listJobs().get(0).f0);
686738
}
687739

688740
public static FlinkDeployment buildApplicationCluster(

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,63 @@ public void kubernetesHaMetaDataCheckTest() {
201201
kubernetesClient));
202202
}
203203

204+
@Test
205+
public void kubernetesHaMetaDataCheckpointCheckTest() {
206+
var cr = TestUtils.buildApplicationCluster();
207+
var confManager = new FlinkConfigManager(new Configuration());
208+
assertFalse(
209+
FlinkUtils.isKubernetesHaMetadataAvailableWithCheckpoint(
210+
confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
211+
kubernetesClient));
212+
213+
var withCheckpoint = Map.of("checkpointID-2", "p");
214+
var withoutCheckpoint = Map.of("counter", "2");
215+
216+
// Wrong CM name
217+
createHAConfigMapWithData(
218+
cr.getMetadata().getName() + "-wrong-name",
219+
cr.getMetadata().getNamespace(),
220+
cr.getMetadata().getName(),
221+
withCheckpoint);
222+
assertFalse(
223+
FlinkUtils.isKubernetesHaMetadataAvailableWithCheckpoint(
224+
confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
225+
kubernetesClient));
226+
227+
// Missing data
228+
createHAConfigMapWithData(
229+
cr.getMetadata().getName() + "-000000000000-config-map",
230+
cr.getMetadata().getNamespace(),
231+
cr.getMetadata().getName(),
232+
null);
233+
assertFalse(
234+
FlinkUtils.isKubernetesHaMetadataAvailableWithCheckpoint(
235+
confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
236+
kubernetesClient));
237+
238+
// CM data without CP
239+
createHAConfigMapWithData(
240+
cr.getMetadata().getName() + "-000000000000-config-map",
241+
cr.getMetadata().getNamespace(),
242+
cr.getMetadata().getName(),
243+
withoutCheckpoint);
244+
assertFalse(
245+
FlinkUtils.isKubernetesHaMetadataAvailableWithCheckpoint(
246+
confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
247+
kubernetesClient));
248+
249+
// CM data with CP
250+
createHAConfigMapWithData(
251+
cr.getMetadata().getName() + "-000000000000-config-map",
252+
cr.getMetadata().getNamespace(),
253+
cr.getMetadata().getName(),
254+
withCheckpoint);
255+
assertTrue(
256+
FlinkUtils.isKubernetesHaMetadataAvailableWithCheckpoint(
257+
confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
258+
kubernetesClient));
259+
}
260+
204261
@Test
205262
public void testJmNeverStartedDetection() {
206263
var jmDeployment = new Deployment();

0 commit comments

Comments
 (0)