Skip to content

Commit d03b816

Browse files
mateczaganygyfora
authored andcommitted
[FLINK-36037][snapshot] Add backwards compatibility for job upgrades
1 parent d10918c commit d03b816

File tree

4 files changed

+117
-10
lines changed

4 files changed

+117
-10
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,16 @@ protected void restoreJob(
293293
FlinkStateSnapshotUtils
294294
.getValidatedFlinkStateSnapshotPath(
295295
ctx.getKubernetesClient(), ref));
296+
if (savepointOpt.isEmpty()) {
297+
savepointOpt =
298+
Optional.ofNullable(
299+
ctx.getResource()
300+
.getStatus()
301+
.getJobStatus()
302+
.getSavepointInfo()
303+
.getLastSavepoint())
304+
.flatMap(s -> Optional.ofNullable(s.getLocation()));
305+
}
296306
}
297307

298308
deploy(ctx, spec, deployConfig, savepointOpt, requireHaMetadata);
@@ -445,13 +455,17 @@ protected void resubmitJob(FlinkResourceContext<CR> ctx, boolean requireHaMetada
445455
throws Exception {
446456
LOG.info("Resubmitting Flink job...");
447457
SPEC specToRecover = ReconciliationUtils.getDeployedSpec(ctx.getResource());
448-
var lastSavepoint =
449-
Optional.ofNullable(
450-
ctx.getResource().getStatus().getJobStatus().getUpgradeSnapshotReference());
458+
459+
var upgradeSnapshotRef =
460+
ctx.getResource().getStatus().getJobStatus().getUpgradeSnapshotReference();
461+
var savepointLegacy =
462+
ctx.getResource().getStatus().getJobStatus().getSavepointInfo().getLastSavepoint();
463+
var lastSavepointKnown = upgradeSnapshotRef != null || savepointLegacy != null;
464+
451465
if (requireHaMetadata) {
452466
specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
453467
} else if (ctx.getResource().getSpec().getJob().getUpgradeMode() != UpgradeMode.STATELESS
454-
&& lastSavepoint.isPresent()) {
468+
&& lastSavepointKnown) {
455469
specToRecover.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
456470
}
457471
restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -370,15 +370,23 @@ public static void resetSnapshotTriggers(
370370
public static boolean lastSavepointKnown(CommonStatus<?> status) {
371371
var lastSavepoint = status.getJobStatus().getUpgradeSnapshotReference();
372372

373-
if (lastSavepoint == null) {
374-
return true;
373+
if (lastSavepoint != null) {
374+
if (StringUtils.isNotBlank(lastSavepoint.getName())) {
375+
return true;
376+
}
377+
378+
var location = lastSavepoint.getPath();
379+
return location != null
380+
&& !location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
375381
}
376382

377-
if (StringUtils.isNotBlank(lastSavepoint.getName())) {
383+
// Check legacy savepoint field too
384+
var lastSavepointLegacy = status.getJobStatus().getSavepointInfo().getLastSavepoint();
385+
if (lastSavepointLegacy == null) {
378386
return true;
379387
}
380-
381-
var location = lastSavepoint.getPath();
382-
return location != null && !location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
388+
return !lastSavepointLegacy
389+
.getLocation()
390+
.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
383391
}
384392
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
3838
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
3939
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
40+
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
4041
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
4142
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
4243
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -283,6 +284,56 @@ private void testUpgradeToLastState(
283284
assertEquals("finished_sp", runningJobs.get(0).f0);
284285
}
285286

287+
@ParameterizedTest
288+
@ValueSource(booleans = {true, false})
289+
public void testUpgradeUsesLatestSnapshot(boolean useLegacyFields) throws Exception {
290+
var savepointPath = "finished_sp";
291+
var deployment = buildApplicationCluster(FlinkVersion.v1_19, UpgradeMode.SAVEPOINT);
292+
293+
reconciler.reconcile(deployment, context);
294+
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
295+
deployment.getSpec().setRestartNonce(100L);
296+
flinkService.clear();
297+
298+
if (useLegacyFields) {
299+
deployment
300+
.getStatus()
301+
.getJobStatus()
302+
.getSavepointInfo()
303+
.updateLastSavepoint(
304+
new Savepoint(
305+
0L,
306+
savepointPath,
307+
SnapshotTriggerType.UPGRADE,
308+
SavepointFormatType.CANONICAL,
309+
0L));
310+
} else {
311+
deployment
312+
.getStatus()
313+
.getJobStatus()
314+
.setUpgradeSnapshotReference(
315+
FlinkStateSnapshotReference.fromPath(savepointPath));
316+
deployment
317+
.getStatus()
318+
.getJobStatus()
319+
.getSavepointInfo()
320+
.updateLastSavepoint(
321+
new Savepoint(
322+
0L,
323+
"wrong_sp",
324+
SnapshotTriggerType.UPGRADE,
325+
SavepointFormatType.CANONICAL,
326+
0L));
327+
}
328+
329+
deployment.getStatus().getJobStatus().setState("FINISHED");
330+
reconciler.reconcile(deployment, context);
331+
reconciler.reconcile(deployment, context);
332+
333+
assertEquals(1, flinkService.getRunningCount());
334+
assertEquals(savepointPath, flinkService.listJobs().get(0).f0);
335+
}
336+
286337
private FlinkDeployment cloneDeploymentWithUpgradeMode(
287338
FlinkDeployment deployment, UpgradeMode upgradeMode) {
288339
FlinkDeployment result = ReconciliationUtils.clone(deployment);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@
2222
import org.apache.flink.configuration.Configuration;
2323
import org.apache.flink.kubernetes.operator.TestUtils;
2424
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
25+
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
2526
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
27+
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
2628
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
29+
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
2730
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
2831
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
2932
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
33+
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
3034

3135
import org.apache.logging.log4j.core.util.CronExpression;
3236
import org.junit.jupiter.api.Test;
@@ -277,6 +281,36 @@ public void shouldTriggerAutomaticSnapshot_ValidCronExpression() {
277281
assertTrue(shouldTrigger);
278282
}
279283

284+
@Test
285+
public void testLastSavepointKnown() {
286+
var status = new FlinkDeploymentStatus();
287+
288+
assertTrue(SnapshotUtils.lastSavepointKnown(status));
289+
290+
var sp = new Savepoint();
291+
sp.setLocation("sp1");
292+
status.getJobStatus().getSavepointInfo().setLastSavepoint(sp);
293+
assertTrue(SnapshotUtils.lastSavepointKnown(status));
294+
295+
sp.setLocation(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH);
296+
assertFalse(SnapshotUtils.lastSavepointKnown(status));
297+
298+
status.getJobStatus()
299+
.setUpgradeSnapshotReference(FlinkStateSnapshotReference.fromPath("sp1"));
300+
assertTrue(SnapshotUtils.lastSavepointKnown(status));
301+
302+
status.getJobStatus()
303+
.setUpgradeSnapshotReference(
304+
FlinkStateSnapshotReference.fromPath(
305+
AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH));
306+
assertFalse(SnapshotUtils.lastSavepointKnown(status));
307+
308+
status.getJobStatus()
309+
.setUpgradeSnapshotReference(
310+
new FlinkStateSnapshotReference("namespace", "name", null));
311+
assertTrue(SnapshotUtils.lastSavepointKnown(status));
312+
}
313+
280314
private static void resetTrigger(FlinkDeployment deployment, SnapshotType snapshotType) {
281315
switch (snapshotType) {
282316
case SAVEPOINT:

0 commit comments

Comments
 (0)