Skip to content

Commit dcce6c8

Browse files
committed
Using the initial savepoint path, on first deployments only, if specified
1 parent f9245aa commit dcce6c8

File tree

2 files changed

+33
-8
lines changed

2 files changed

+33
-8
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,9 +513,23 @@ private void deploy(
513513
"spec",
514514
FlinkBlueGreenDeploymentSpec.class);
515515

516-
if (lastCheckpoint != null) {
516+
// The B/G initialSavepointPath is only used in first time deployments
517+
if (isFirstDeployment) {
518+
String initialSavepointPath =
519+
adjustedSpec.getTemplate().getSpec().getJob().getInitialSavepointPath();
520+
if (initialSavepointPath != null && !initialSavepointPath.isEmpty()) {
521+
LOG.info("Using initialSavepointPath: " + initialSavepointPath);
522+
adjustedSpec
523+
.getTemplate()
524+
.getSpec()
525+
.getJob()
526+
.setInitialSavepointPath(initialSavepointPath);
527+
} else {
528+
LOG.info("Clean start up, no checkpoint/savepoint");
529+
}
530+
} else if (lastCheckpoint != null) {
517531
String location = lastCheckpoint.getLocation().replace("file:", "");
518-
LOG.info("Using checkpoint: " + location);
532+
LOG.info("Using B/G checkpoint: " + location);
519533
adjustedSpec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
520534
}
521535

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import static org.junit.jupiter.api.Assertions.assertEquals;
6666
import static org.junit.jupiter.api.Assertions.assertFalse;
6767
import static org.junit.jupiter.api.Assertions.assertNotNull;
68-
import static org.junit.jupiter.api.Assertions.assertNull;
6968
import static org.junit.jupiter.api.Assertions.assertTrue;
7069

7170
/** {@link FlinkBlueGreenDeploymentController} tests. */
@@ -80,6 +79,8 @@ public class FlinkBlueGreenDeploymentControllerTest {
8079
private static final String CUSTOM_CONFIG_FIELD = "custom-configuration-field";
8180
private static final int DEFAULT_DELETION_DELAY_VALUE = 500;
8281
private static final int ALT_DELETION_DELAY_VALUE = 1000;
82+
private static final String TEST_CHECKPOINT_PATH = "/tmp/checkpoints";
83+
private static final String TEST_INITIAL_SAVEPOINT_PATH = "/tmp/savepoints";
8384
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
8485
private TestingFlinkService flinkService;
8586
private Context<FlinkBlueGreenDeployment> context;
@@ -336,6 +337,9 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
336337
if (execAssertions) {
337338
assertEquals(1, flinkDeployments.size());
338339
verifyOwnerReferences(rs.deployment, deploymentA);
340+
assertEquals(
341+
TEST_INITIAL_SAVEPOINT_PATH,
342+
deploymentA.getSpec().getJob().getInitialSavepointPath());
339343
}
340344

341345
simulateSubmitAndSuccessfulJobStart(deploymentA);
@@ -420,7 +424,7 @@ private void assertDeploymentDeleted(
420424
// A reconciliation before the deletion delay has expired should result in no-op
421425
var rs2 = reconcile(rs.deployment);
422426
var remainingDeletionDelay = rs2.updateControl.getScheduleDelay().get();
423-
assertTrue(remainingDeletionDelay < expectedDeletionDelay);
427+
assertTrue(remainingDeletionDelay <= expectedDeletionDelay);
424428
assertTrue(rs2.updateControl.isNoUpdate());
425429

426430
Thread.sleep(remainingDeletionDelay);
@@ -443,8 +447,12 @@ private void testTransitionToGreen(
443447
assertTrue(rs.updateControl.isPatchStatus());
444448
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
445449
assertEquals(2, flinkDeployments.size());
446-
assertNull(flinkDeployments.get(0).getSpec().getJob().getInitialSavepointPath());
447-
assertNotNull(flinkDeployments.get(1).getSpec().getJob().getInitialSavepointPath());
450+
assertEquals(
451+
TEST_INITIAL_SAVEPOINT_PATH,
452+
flinkDeployments.get(0).getSpec().getJob().getInitialSavepointPath());
453+
assertEquals(
454+
TEST_CHECKPOINT_PATH,
455+
flinkDeployments.get(1).getSpec().getJob().getInitialSavepointPath());
448456

449457
assertEquals(
450458
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN,
@@ -527,7 +535,7 @@ private void simulateSubmitAndSuccessfulJobStart(FlinkDeployment deployment) thr
527535
// TODO: is this correct? Doing this to give the TestingFlinkService awareness of the job
528536
JobSpec jobSpec = deployment.getSpec().getJob();
529537
Configuration conf = new Configuration();
530-
conf.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint");
538+
conf.set(SavepointConfigOptions.SAVEPOINT_PATH, TEST_CHECKPOINT_PATH);
531539
flinkService.submitApplicationCluster(jobSpec, conf, false);
532540
var jobId = flinkService.listJobs().get(0).f1.getJobId().toString();
533541
deployment.getStatus().getJobStatus().setJobId(jobId);
@@ -591,6 +599,7 @@ private static FlinkBlueGreenDeployment buildSessionCluster(
591599
.parallelism(1)
592600
.upgradeMode(UpgradeMode.STATELESS)
593601
.state(JobState.RUNNING)
602+
.initialSavepointPath(TEST_INITIAL_SAVEPOINT_PATH)
594603
.build());
595604

596605
deployment.setSpec(bgDeploymentSpec);
@@ -627,6 +636,8 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers
627636
.spec(flinkDeploymentSpec)
628637
.build();
629638

630-
return new FlinkBlueGreenDeploymentSpec(flinkDeploymentTemplateSpec);
639+
FlinkBlueGreenDeploymentSpec flinkBlueGreenDeploymentSpec =
640+
new FlinkBlueGreenDeploymentSpec(flinkDeploymentTemplateSpec);
641+
return flinkBlueGreenDeploymentSpec;
631642
}
632643
}

0 commit comments

Comments
 (0)