@@ -435,6 +435,16 @@ public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toM
435435 }
436436 }
437437
438+ private static Stream <Arguments > testInitialJmDeployCannotStartParams () {
439+ return Stream .of (
440+ Arguments .of (UpgradeMode .LAST_STATE , true ),
441+ Arguments .of (UpgradeMode .LAST_STATE , false ),
442+ Arguments .of (UpgradeMode .SAVEPOINT , true ),
443+ Arguments .of (UpgradeMode .SAVEPOINT , false ),
444+ Arguments .of (UpgradeMode .STATELESS , true ),
445+ Arguments .of (UpgradeMode .STATELESS , false ));
446+ }
447+
438448 @ ParameterizedTest
439449 @ MethodSource ("testInitialJmDeployCannotStartParams" )
440450 public void testInitialJmDeployCannotStartLegacy (UpgradeMode upgradeMode , boolean initSavepoint )
@@ -613,14 +623,128 @@ public void testLastStateMaxCheckpointAge(boolean cancellable) throws Exception
613623 jobReconciler .getJobUpgrade (ctx , deployConf ));
614624 }
615625
616- private static Stream <Arguments > testInitialJmDeployCannotStartParams () {
626+ private static Stream <Arguments > testVersionUpgradeTestParams () {
627+ return Stream .of (
628+ Arguments .of (UpgradeMode .LAST_STATE , true , true ),
629+ Arguments .of (UpgradeMode .LAST_STATE , true , false ),
630+ Arguments .of (UpgradeMode .LAST_STATE , false , true ),
631+ Arguments .of (UpgradeMode .LAST_STATE , false , false ),
632+ Arguments .of (UpgradeMode .SAVEPOINT , true , true ),
633+ Arguments .of (UpgradeMode .SAVEPOINT , true , false ));
634+ }
635+
636+ @ ParameterizedTest
637+ @ MethodSource ("testVersionUpgradeTestParams" )
638+ public void testFlinkVersionSwitching (
639+ UpgradeMode upgradeMode , boolean savepointsEnabled , boolean allowFallback )
640+ throws Exception {
641+ var jobReconciler = (ApplicationReconciler ) this .reconciler .getReconciler ();
642+ var deployment = TestUtils .buildApplicationCluster (FlinkVersion .v1_18 );
643+ if (!savepointsEnabled ) {
644+ deployment
645+ .getSpec ()
646+ .getFlinkConfiguration ()
647+ .remove (CheckpointingOptions .SAVEPOINT_DIRECTORY .key ());
648+ }
649+ deployment
650+ .getSpec ()
651+ .getFlinkConfiguration ()
652+ .put (
653+ KubernetesOperatorConfigOptions
654+ .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
655+ .key (),
656+ Boolean .toString (allowFallback ));
657+ deployment .getSpec ().getJob ().setUpgradeMode (upgradeMode );
658+ ReconciliationUtils .updateStatusForDeployedSpec (deployment , new Configuration ());
659+ deployment .getSpec ().setFlinkVersion (FlinkVersion .v1_19 );
660+
661+ // Set job status to running
662+ var jobStatus = deployment .getStatus ().getJobStatus ();
663+ long now = System .currentTimeMillis ();
664+
665+ jobStatus .setStartTime (Long .toString (now ));
666+ jobStatus .setJobId (new JobID ().toString ());
667+
668+ // Running state, savepoint if possible
669+ jobStatus .setState (org .apache .flink .api .common .JobStatus .RUNNING .name ());
670+ var ctx = getResourceContext (deployment );
671+ var deployConf = ctx .getDeployConfig (deployment .getSpec ());
672+
673+ assertEquals (
674+ savepointsEnabled
675+ ? AbstractJobReconciler .JobUpgrade .savepoint (false )
676+ : AbstractJobReconciler .JobUpgrade .lastStateUsingCancel (),
677+ jobReconciler .getJobUpgrade (ctx , deployConf ));
678+
679+ // Not running (but cancellable)
680+ jobStatus .setState (org .apache .flink .api .common .JobStatus .RESTARTING .name ());
681+ assertEquals (
682+ AbstractJobReconciler .JobUpgrade .lastStateUsingCancel (),
683+ jobReconciler .getJobUpgrade (ctx , deployConf ));
684+
685+ // Unknown / reconciling
686+ jobStatus .setState (org .apache .flink .api .common .JobStatus .RECONCILING .name ());
687+ assertEquals (
688+ AbstractJobReconciler .JobUpgrade .pendingUpgrade (),
689+ jobReconciler .getJobUpgrade (ctx , deployConf ));
690+ }
691+
692+ private static Stream <Arguments > testLastStateCancelParams () {
617693 return Stream .of (
618694 Arguments .of (UpgradeMode .LAST_STATE , true ),
619695 Arguments .of (UpgradeMode .LAST_STATE , false ),
620696 Arguments .of (UpgradeMode .SAVEPOINT , true ),
621- Arguments .of (UpgradeMode .SAVEPOINT , false ),
622- Arguments .of (UpgradeMode .STATELESS , true ),
623- Arguments .of (UpgradeMode .STATELESS , false ));
697+ Arguments .of (UpgradeMode .SAVEPOINT , false ));
698+ }
699+
700+ @ ParameterizedTest
701+ @ MethodSource ("testLastStateCancelParams" )
702+ public void testLastStateNoHaMeta (UpgradeMode upgradeMode , boolean allowFallback )
703+ throws Exception {
704+ var jobReconciler = (ApplicationReconciler ) this .reconciler .getReconciler ();
705+ var deployment = TestUtils .buildApplicationCluster ();
706+ deployment
707+ .getSpec ()
708+ .getFlinkConfiguration ()
709+ .put (
710+ KubernetesOperatorConfigOptions
711+ .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
712+ .key (),
713+ Boolean .toString (allowFallback ));
714+ deployment .getSpec ().getFlinkConfiguration ().remove (HighAvailabilityOptions .HA_MODE .key ());
715+ deployment
716+ .getSpec ()
717+ .getFlinkConfiguration ()
718+ .put (
719+ KubernetesOperatorConfigOptions .OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB
720+ .key (),
721+ Boolean .toString (false ));
722+ deployment .getSpec ().getJob ().setUpgradeMode (upgradeMode );
723+ ReconciliationUtils .updateStatusForDeployedSpec (deployment , new Configuration ());
724+
725+ // Set job status to running
726+ var jobStatus = deployment .getStatus ().getJobStatus ();
727+ long now = System .currentTimeMillis ();
728+
729+ jobStatus .setStartTime (Long .toString (now ));
730+ jobStatus .setJobId (new JobID ().toString ());
731+
732+ // Running state, savepoint if possible
733+ jobStatus .setState (org .apache .flink .api .common .JobStatus .FAILING .name ());
734+ var ctx = getResourceContext (deployment );
735+ var deployConf = ctx .getDeployConfig (deployment .getSpec ());
736+
737+ if (upgradeMode == UpgradeMode .LAST_STATE ) {
738+ assertEquals (
739+ AbstractJobReconciler .JobUpgrade .lastStateUsingCancel (),
740+ jobReconciler .getJobUpgrade (ctx , deployConf ));
741+ } else {
742+ assertEquals (
743+ allowFallback
744+ ? AbstractJobReconciler .JobUpgrade .lastStateUsingCancel ()
745+ : AbstractJobReconciler .JobUpgrade .pendingUpgrade (),
746+ jobReconciler .getJobUpgrade (ctx , deployConf ));
747+ }
624748 }
625749
626750 private static Stream <Arguments > testUpgradeJmDeployCannotStartParams () {
0 commit comments