Skip to content

Commit 5296d63

Browse files
authored
[FLINK-35857] Fix redeploy failed deployment without latest checkpoint
1 parent 207b149 commit 5296d63

File tree

3 files changed

+102
-5
lines changed

3 files changed

+102
-5
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,18 @@ protected void resubmitJob(FlinkResourceContext<CR> ctx, boolean requireHaMetada
322322
throws Exception {
323323
LOG.info("Resubmitting Flink job...");
324324
SPEC specToRecover = ReconciliationUtils.getDeployedSpec(ctx.getResource());
325+
Optional<Savepoint> lastSavepoint =
326+
Optional.ofNullable(
327+
ctx.getResource()
328+
.getStatus()
329+
.getJobStatus()
330+
.getSavepointInfo()
331+
.getLastSavepoint());
325332
if (requireHaMetadata) {
326333
specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
334+
} else if (ctx.getResource().getSpec().getJob().getUpgradeMode() != UpgradeMode.STATELESS
335+
&& lastSavepoint.isPresent()) {
336+
specToRecover.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
327337
}
328338
restoreJob(ctx, specToRecover, ctx.getObserveConfig(), requireHaMetadata);
329339
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,16 @@ public CheckpointFetchResult fetchCheckpointInfo(
348348

349349
if (checkpointTriggers.containsKey(triggerId)) {
350350
if (checkpointTriggers.get(triggerId)) {
351+
// Mark completed checkpoint
352+
checkpointInfo =
353+
Tuple2.of(
354+
Optional.of(
355+
new CheckpointHistoryWrapper.CompletedCheckpointInfo(
356+
checkpointCounter,
357+
"ck_" + checkpointCounter,
358+
System.currentTimeMillis())),
359+
Optional.empty());
360+
351361
checkpointCounter++;
352362
return CheckpointFetchResult.completed();
353363
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,46 +18,59 @@
1818
package org.apache.flink.kubernetes.operator.controller;
1919

2020
import org.apache.flink.configuration.Configuration;
21+
import org.apache.flink.kubernetes.operator.OperatorTestBase;
2122
import org.apache.flink.kubernetes.operator.TestUtils;
22-
import org.apache.flink.kubernetes.operator.TestingFlinkService;
2323
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2424
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
2525
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
26+
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
27+
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
2628
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
29+
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
2730
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
31+
import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
32+
import org.apache.flink.kubernetes.operator.utils.SnapshotStatus;
33+
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
34+
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
2835

2936
import io.fabric8.kubernetes.client.KubernetesClient;
3037
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
3138
import io.javaoperatorsdk.operator.api.reconciler.Context;
39+
import lombok.Getter;
3240
import org.junit.jupiter.api.BeforeEach;
3341
import org.junit.jupiter.params.ParameterizedTest;
42+
import org.junit.jupiter.params.provider.EnumSource;
3443
import org.junit.jupiter.params.provider.MethodSource;
3544

3645
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
46+
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
3747
import static org.junit.jupiter.api.Assertions.assertEquals;
48+
import static org.junit.jupiter.api.Assertions.assertFalse;
49+
import static org.junit.jupiter.api.Assertions.assertNotNull;
50+
import static org.junit.jupiter.api.Assertions.assertNull;
3851

3952
/**
4053
* Unhealthy deployment restart tests.
4154
*/
4255
@EnableKubernetesMockClient(crud = true)
43-
public class FailedDeploymentRestartTest {
56+
public class FailedDeploymentRestartTest extends OperatorTestBase {
4457
private FlinkConfigManager configManager;
4558

46-
private TestingFlinkService flinkService;
4759
private Context<FlinkDeployment> context;
4860
private TestingFlinkDeploymentController testController;
61+
private SnapshotObserver<FlinkDeployment, FlinkDeploymentStatus> observer;
4962

50-
private KubernetesClient kubernetesClient;
63+
@Getter private KubernetesClient kubernetesClient;
5164

5265
@BeforeEach
5366
public void setup() {
5467
var configuration = new Configuration();
5568
configuration.set(OPERATOR_JOB_RESTART_FAILED, true);
5669
configManager = new FlinkConfigManager(configuration);
57-
flinkService = new TestingFlinkService(kubernetesClient);
5870
context = flinkService.getContext();
5971
testController = new TestingFlinkDeploymentController(configManager, flinkService);
6072
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
73+
observer = new SnapshotObserver<>(eventRecorder);
6174
}
6275

6376
@ParameterizedTest
@@ -98,4 +111,68 @@ public void verifyFailedApplicationRecovery(FlinkVersion flinkVersion, UpgradeMo
98111
appCluster.getSpec(),
99112
appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
100113
}
114+
115+
@ParameterizedTest
116+
@EnumSource(UpgradeMode.class)
117+
public void verifyFailedApplicationRecoveryWithCheckpoint(UpgradeMode upgradeMode)
118+
throws Exception {
119+
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
120+
appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
121+
122+
// Start a healthy deployment
123+
testController.reconcile(appCluster, context);
124+
testController.reconcile(appCluster, context);
125+
testController.reconcile(appCluster, context);
126+
127+
// Mark job_id
128+
String jobId = appCluster.getStatus().getJobStatus().getJobId();
129+
assertNotNull(jobId);
130+
assertEquals(
131+
JobManagerDeploymentStatus.READY,
132+
appCluster.getStatus().getJobManagerDeploymentStatus());
133+
assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
134+
assertNull(flinkService.getSubmittedConf().get(SavepointConfigOptions.SAVEPOINT_PATH));
135+
136+
// trigger checkpoint
137+
CheckpointInfo checkpointInfo = appCluster.getStatus().getJobStatus().getCheckpointInfo();
138+
flinkService.triggerCheckpoint(
139+
null,
140+
SnapshotTriggerType.PERIODIC,
141+
checkpointInfo,
142+
configManager.getObserveConfig(appCluster));
143+
144+
// Pending
145+
observer.observeCheckpointStatus(getResourceContext(appCluster));
146+
// Completed
147+
observer.observeCheckpointStatus(getResourceContext(appCluster));
148+
assertFalse(SnapshotUtils.checkpointInProgress(appCluster.getStatus().getJobStatus()));
149+
assertEquals(
150+
SnapshotUtils.getLastSnapshotStatus(appCluster, CHECKPOINT),
151+
SnapshotStatus.SUCCEEDED);
152+
153+
// Make deployment unhealthy
154+
flinkService.markApplicationJobFailedWithError(
155+
flinkService.listJobs().get(0).f1.getJobId(), "Failed job");
156+
testController.reconcile(appCluster, context);
157+
assertEquals(
158+
JobManagerDeploymentStatus.DEPLOYING,
159+
appCluster.getStatus().getJobManagerDeploymentStatus());
160+
161+
// After restart the deployment is healthy again
162+
testController.reconcile(appCluster, context);
163+
testController.reconcile(appCluster, context);
164+
assertEquals(
165+
JobManagerDeploymentStatus.READY,
166+
appCluster.getStatus().getJobManagerDeploymentStatus());
167+
assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
168+
169+
// check savepoint_path
170+
if (upgradeMode != UpgradeMode.STATELESS) {
171+
assertEquals(
172+
flinkService.getSubmittedConf().get(SavepointConfigOptions.SAVEPOINT_PATH),
173+
"ck_0");
174+
} else {
175+
assertNull(flinkService.getSubmittedConf().get(SavepointConfigOptions.SAVEPOINT_PATH));
176+
}
177+
}
101178
}

0 commit comments

Comments
 (0)