diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java index 71ec3dee5e..023396b52a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java @@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.api.status.CommonStatus; import org.apache.flink.kubernetes.operator.api.status.JobStatus; import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; +import org.apache.flink.kubernetes.operator.api.status.Savepoint; import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType; import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType; import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; @@ -398,21 +399,35 @@ protected void restoreJob( * * @param ctx context * @param savepointLocation location of savepoint taken + * @param cancelTs Timestamp when upgrade/cancel was triggered */ - protected void setUpgradeSavepointPath(FlinkResourceContext ctx, String savepointLocation) { + protected void setUpgradeSavepointPath( + FlinkResourceContext ctx, String savepointLocation, Instant cancelTs) { var conf = ctx.getObserveConfig(); var savepointFormatType = - ctx.getObserveConfig() - .get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE); + SavepointFormatType.valueOf( + conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE) + .name()); FlinkStateSnapshotUtils.createUpgradeSnapshotResource( conf, ctx.getOperatorConfig(), ctx.getKubernetesClient(), ctx.getResource(), - SavepointFormatType.valueOf(savepointFormatType.name()), + savepointFormatType, savepointLocation); - ctx.getResource().getStatus().getJobStatus().setUpgradeSavepointPath(savepointLocation); + var jobStatus = ctx.getResource().getStatus().getJobStatus(); + jobStatus.setUpgradeSavepointPath(savepointLocation); + + // Register created savepoint in the now deprecated savepoint info and history + var savepoint = + new Savepoint( + cancelTs.toEpochMilli(), + savepointLocation, + SnapshotTriggerType.UPGRADE, + savepointFormatType, + null); + jobStatus.getSavepointInfo().updateLastSavepoint(savepoint); } /** diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 71cf417958..b5486e0ec7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -223,10 +223,12 @@ private void setJobIdIfNecessary( @Override protected boolean cancelJob(FlinkResourceContext ctx, SuspendMode suspendMode) throws Exception { + var cancelTs = Instant.now(); var result = ctx.getFlinkService() .cancelJob(ctx.getResource(), suspendMode, ctx.getObserveConfig()); - result.getSavepointPath().ifPresent(location -> setUpgradeSavepointPath(ctx, location)); + result.getSavepointPath() + .ifPresent(location -> setUpgradeSavepointPath(ctx, location, cancelTs)); return result.isPending(); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java index f1fa88859a..4932940a4f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java @@ -41,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.Optional; /** The reconciler for the {@link FlinkSessionJob}. */ @@ -100,10 +101,12 @@ public void deploy( @Override protected boolean cancelJob(FlinkResourceContext ctx, SuspendMode suspendMode) throws Exception { + var cancelTs = Instant.now(); var result = ctx.getFlinkService() .cancelSessionJob(ctx.getResource(), suspendMode, ctx.getObserveConfig()); - result.getSavepointPath().ifPresent(location -> setUpgradeSavepointPath(ctx, location)); + result.getSavepointPath() + .ifPresent(location -> setUpgradeSavepointPath(ctx, location, cancelTs)); return result.isPending(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index 47ce5e4eb2..19d5446113 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -98,6 +98,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -303,6 +304,13 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception { assertEquals(0, flinkService.getRunningCount()); + var spInfo = statefulUpgrade.getStatus().getJobStatus().getSavepointInfo(); + assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation()); + assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType()); + assertEquals( + spInfo.getLastSavepoint(), + new LinkedList<>(spInfo.getSavepointHistory()).getLast()); + reconciler.reconcile(statefulUpgrade, context); runningJobs = flinkService.listJobs();