@@ -396,6 +396,16 @@ public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toM
396396 }
397397 }
398398
399+ private static Stream <Arguments > testInitialJmDeployCannotStartParams () {
400+ return Stream .of (
401+ Arguments .of (UpgradeMode .LAST_STATE , true ),
402+ Arguments .of (UpgradeMode .LAST_STATE , false ),
403+ Arguments .of (UpgradeMode .SAVEPOINT , true ),
404+ Arguments .of (UpgradeMode .SAVEPOINT , false ),
405+ Arguments .of (UpgradeMode .STATELESS , true ),
406+ Arguments .of (UpgradeMode .STATELESS , false ));
407+ }
408+
399409 @ ParameterizedTest
400410 @ MethodSource ("testInitialJmDeployCannotStartParams" )
401411 public void testInitialJmDeployCannotStartLegacy (UpgradeMode upgradeMode , boolean initSavepoint )
@@ -573,14 +583,128 @@ public void testLastStateMaxCheckpointAge(boolean cancellable) throws Exception
573583 jobReconciler .getJobUpgrade (ctx , deployConf ));
574584 }
575585
576- private static Stream <Arguments > testInitialJmDeployCannotStartParams () {
586+ private static Stream <Arguments > testVersionUpgradeTestParams () {
587+ return Stream .of (
588+ Arguments .of (UpgradeMode .LAST_STATE , true , true ),
589+ Arguments .of (UpgradeMode .LAST_STATE , true , false ),
590+ Arguments .of (UpgradeMode .LAST_STATE , false , true ),
591+ Arguments .of (UpgradeMode .LAST_STATE , false , false ),
592+ Arguments .of (UpgradeMode .SAVEPOINT , true , true ),
593+ Arguments .of (UpgradeMode .SAVEPOINT , true , false ));
594+ }
595+
596+ @ ParameterizedTest
597+ @ MethodSource ("testVersionUpgradeTestParams" )
598+ public void testFlinkVersionSwitching (
599+ UpgradeMode upgradeMode , boolean savepointsEnabled , boolean allowFallback )
600+ throws Exception {
601+ var jobReconciler = (ApplicationReconciler ) this .reconciler .getReconciler ();
602+ var deployment = TestUtils .buildApplicationCluster (FlinkVersion .v1_18 );
603+ if (!savepointsEnabled ) {
604+ deployment
605+ .getSpec ()
606+ .getFlinkConfiguration ()
607+ .remove (CheckpointingOptions .SAVEPOINT_DIRECTORY .key ());
608+ }
609+ deployment
610+ .getSpec ()
611+ .getFlinkConfiguration ()
612+ .put (
613+ KubernetesOperatorConfigOptions
614+ .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
615+ .key (),
616+ Boolean .toString (allowFallback ));
617+ deployment .getSpec ().getJob ().setUpgradeMode (upgradeMode );
618+ ReconciliationUtils .updateStatusForDeployedSpec (deployment , new Configuration ());
619+ deployment .getSpec ().setFlinkVersion (FlinkVersion .v1_19 );
620+
621+ // Set job status to running
622+ var jobStatus = deployment .getStatus ().getJobStatus ();
623+ long now = System .currentTimeMillis ();
624+
625+ jobStatus .setStartTime (Long .toString (now ));
626+ jobStatus .setJobId (new JobID ().toString ());
627+
628+ // Running state, savepoint if possible
629+ jobStatus .setState (org .apache .flink .api .common .JobStatus .RUNNING .name ());
630+ var ctx = getResourceContext (deployment );
631+ var deployConf = ctx .getDeployConfig (deployment .getSpec ());
632+
633+ assertEquals (
634+ savepointsEnabled
635+ ? AbstractJobReconciler .JobUpgrade .savepoint (false )
636+ : AbstractJobReconciler .JobUpgrade .lastStateUsingCancel (),
637+ jobReconciler .getJobUpgrade (ctx , deployConf ));
638+
639+ // Not running (but cancellable)
640+ jobStatus .setState (org .apache .flink .api .common .JobStatus .RESTARTING .name ());
641+ assertEquals (
642+ AbstractJobReconciler .JobUpgrade .lastStateUsingCancel (),
643+ jobReconciler .getJobUpgrade (ctx , deployConf ));
644+
645+ // Unknown / reconciling
646+ jobStatus .setState (org .apache .flink .api .common .JobStatus .RECONCILING .name ());
647+ assertEquals (
648+ AbstractJobReconciler .JobUpgrade .pendingUpgrade (),
649+ jobReconciler .getJobUpgrade (ctx , deployConf ));
650+ }
651+
652+ private static Stream <Arguments > testLastStateCancelParams () {
577653 return Stream .of (
578654 Arguments .of (UpgradeMode .LAST_STATE , true ),
579655 Arguments .of (UpgradeMode .LAST_STATE , false ),
580656 Arguments .of (UpgradeMode .SAVEPOINT , true ),
581- Arguments .of (UpgradeMode .SAVEPOINT , false ),
582- Arguments .of (UpgradeMode .STATELESS , true ),
583- Arguments .of (UpgradeMode .STATELESS , false ));
657+ Arguments .of (UpgradeMode .SAVEPOINT , false ));
658+ }
659+
660+ @ ParameterizedTest
661+ @ MethodSource ("testLastStateCancelParams" )
662+ public void testLastStateNoHaMeta (UpgradeMode upgradeMode , boolean allowFallback )
663+ throws Exception {
664+ var jobReconciler = (ApplicationReconciler ) this .reconciler .getReconciler ();
665+ var deployment = TestUtils .buildApplicationCluster ();
666+ deployment
667+ .getSpec ()
668+ .getFlinkConfiguration ()
669+ .put (
670+ KubernetesOperatorConfigOptions
671+ .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
672+ .key (),
673+ Boolean .toString (allowFallback ));
674+ deployment .getSpec ().getFlinkConfiguration ().remove (HighAvailabilityOptions .HA_MODE .key ());
675+ deployment
676+ .getSpec ()
677+ .getFlinkConfiguration ()
678+ .put (
679+ KubernetesOperatorConfigOptions .OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB
680+ .key (),
681+ Boolean .toString (false ));
682+ deployment .getSpec ().getJob ().setUpgradeMode (upgradeMode );
683+ ReconciliationUtils .updateStatusForDeployedSpec (deployment , new Configuration ());
684+
685+ // Set job status to running
686+ var jobStatus = deployment .getStatus ().getJobStatus ();
687+ long now = System .currentTimeMillis ();
688+
689+ jobStatus .setStartTime (Long .toString (now ));
690+ jobStatus .setJobId (new JobID ().toString ());
691+
692+ // Running state, savepoint if possible
693+ jobStatus .setState (org .apache .flink .api .common .JobStatus .FAILING .name ());
694+ var ctx = getResourceContext (deployment );
695+ var deployConf = ctx .getDeployConfig (deployment .getSpec ());
696+
697+ if (upgradeMode == UpgradeMode .LAST_STATE ) {
698+ assertEquals (
699+ AbstractJobReconciler .JobUpgrade .lastStateUsingCancel (),
700+ jobReconciler .getJobUpgrade (ctx , deployConf ));
701+ } else {
702+ assertEquals (
703+ allowFallback
704+ ? AbstractJobReconciler .JobUpgrade .lastStateUsingCancel ()
705+ : AbstractJobReconciler .JobUpgrade .pendingUpgrade (),
706+ jobReconciler .getJobUpgrade (ctx , deployConf ));
707+ }
584708 }
585709
586710 private static Stream <Arguments > testUpgradeJmDeployCannotStartParams () {
0 commit comments