Skip to content

Commit a293a38

Browse files
committed
Add extra job upgrade tests
1 parent c228b8c commit a293a38

File tree

1 file changed

+128
-4
lines changed

1 file changed

+128
-4
lines changed

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,16 @@ public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toM
396396
}
397397
}
398398

399+
private static Stream<Arguments> testInitialJmDeployCannotStartParams() {
400+
return Stream.of(
401+
Arguments.of(UpgradeMode.LAST_STATE, true),
402+
Arguments.of(UpgradeMode.LAST_STATE, false),
403+
Arguments.of(UpgradeMode.SAVEPOINT, true),
404+
Arguments.of(UpgradeMode.SAVEPOINT, false),
405+
Arguments.of(UpgradeMode.STATELESS, true),
406+
Arguments.of(UpgradeMode.STATELESS, false));
407+
}
408+
399409
@ParameterizedTest
400410
@MethodSource("testInitialJmDeployCannotStartParams")
401411
public void testInitialJmDeployCannotStartLegacy(UpgradeMode upgradeMode, boolean initSavepoint)
@@ -573,14 +583,128 @@ public void testLastStateMaxCheckpointAge(boolean cancellable) throws Exception
573583
jobReconciler.getJobUpgrade(ctx, deployConf));
574584
}
575585

576-
private static Stream<Arguments> testInitialJmDeployCannotStartParams() {
586+
private static Stream<Arguments> testVersionUpgradeTestParams() {
587+
return Stream.of(
588+
Arguments.of(UpgradeMode.LAST_STATE, true, true),
589+
Arguments.of(UpgradeMode.LAST_STATE, true, false),
590+
Arguments.of(UpgradeMode.LAST_STATE, false, true),
591+
Arguments.of(UpgradeMode.LAST_STATE, false, false),
592+
Arguments.of(UpgradeMode.SAVEPOINT, true, true),
593+
Arguments.of(UpgradeMode.SAVEPOINT, true, false));
594+
}
595+
596+
@ParameterizedTest
597+
@MethodSource("testVersionUpgradeTestParams")
598+
public void testFlinkVersionSwitching(
599+
UpgradeMode upgradeMode, boolean savepointsEnabled, boolean allowFallback)
600+
throws Exception {
601+
var jobReconciler = (ApplicationReconciler) this.reconciler.getReconciler();
602+
var deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_18);
603+
if (!savepointsEnabled) {
604+
deployment
605+
.getSpec()
606+
.getFlinkConfiguration()
607+
.remove(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
608+
}
609+
deployment
610+
.getSpec()
611+
.getFlinkConfiguration()
612+
.put(
613+
KubernetesOperatorConfigOptions
614+
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
615+
.key(),
616+
Boolean.toString(allowFallback));
617+
deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
618+
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
619+
deployment.getSpec().setFlinkVersion(FlinkVersion.v1_19);
620+
621+
// Set job status to running
622+
var jobStatus = deployment.getStatus().getJobStatus();
623+
long now = System.currentTimeMillis();
624+
625+
jobStatus.setStartTime(Long.toString(now));
626+
jobStatus.setJobId(new JobID().toString());
627+
628+
// Running state, savepoint if possible
629+
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
630+
var ctx = getResourceContext(deployment);
631+
var deployConf = ctx.getDeployConfig(deployment.getSpec());
632+
633+
assertEquals(
634+
savepointsEnabled
635+
? AbstractJobReconciler.JobUpgrade.savepoint(false)
636+
: AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
637+
jobReconciler.getJobUpgrade(ctx, deployConf));
638+
639+
// Not running (but cancellable)
640+
jobStatus.setState(org.apache.flink.api.common.JobStatus.RESTARTING.name());
641+
assertEquals(
642+
AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
643+
jobReconciler.getJobUpgrade(ctx, deployConf));
644+
645+
// Unknown / reconciling
646+
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
647+
assertEquals(
648+
AbstractJobReconciler.JobUpgrade.pendingUpgrade(),
649+
jobReconciler.getJobUpgrade(ctx, deployConf));
650+
}
651+
652+
private static Stream<Arguments> testLastStateCancelParams() {
577653
return Stream.of(
578654
Arguments.of(UpgradeMode.LAST_STATE, true),
579655
Arguments.of(UpgradeMode.LAST_STATE, false),
580656
Arguments.of(UpgradeMode.SAVEPOINT, true),
581-
Arguments.of(UpgradeMode.SAVEPOINT, false),
582-
Arguments.of(UpgradeMode.STATELESS, true),
583-
Arguments.of(UpgradeMode.STATELESS, false));
657+
Arguments.of(UpgradeMode.SAVEPOINT, false));
658+
}
659+
660+
@ParameterizedTest
661+
@MethodSource("testLastStateCancelParams")
662+
public void testLastStateNoHaMeta(UpgradeMode upgradeMode, boolean allowFallback)
663+
throws Exception {
664+
var jobReconciler = (ApplicationReconciler) this.reconciler.getReconciler();
665+
var deployment = TestUtils.buildApplicationCluster();
666+
deployment
667+
.getSpec()
668+
.getFlinkConfiguration()
669+
.put(
670+
KubernetesOperatorConfigOptions
671+
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
672+
.key(),
673+
Boolean.toString(allowFallback));
674+
deployment.getSpec().getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
675+
deployment
676+
.getSpec()
677+
.getFlinkConfiguration()
678+
.put(
679+
KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB
680+
.key(),
681+
Boolean.toString(false));
682+
deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
683+
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
684+
685+
// Set job status to running
686+
var jobStatus = deployment.getStatus().getJobStatus();
687+
long now = System.currentTimeMillis();
688+
689+
jobStatus.setStartTime(Long.toString(now));
690+
jobStatus.setJobId(new JobID().toString());
691+
692+
// Running state, savepoint if possible
693+
jobStatus.setState(org.apache.flink.api.common.JobStatus.FAILING.name());
694+
var ctx = getResourceContext(deployment);
695+
var deployConf = ctx.getDeployConfig(deployment.getSpec());
696+
697+
if (upgradeMode == UpgradeMode.LAST_STATE) {
698+
assertEquals(
699+
AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
700+
jobReconciler.getJobUpgrade(ctx, deployConf));
701+
} else {
702+
assertEquals(
703+
allowFallback
704+
? AbstractJobReconciler.JobUpgrade.lastStateUsingCancel()
705+
: AbstractJobReconciler.JobUpgrade.pendingUpgrade(),
706+
jobReconciler.getJobUpgrade(ctx, deployConf));
707+
}
584708
}
585709

586710
private static Stream<Arguments> testUpgradeJmDeployCannotStartParams() {

0 commit comments

Comments
 (0)