@@ -456,12 +456,11 @@ public void verifySavepointFailureRecovery(FlinkVersion flinkVersion) throws Exc
456456 new IllegalStateException ("Job not in valid state for savepoint" ));
457457
458458 String customValue = UUID .randomUUID ().toString ();
459- simulateChangeInSpec (rs .deployment , customValue , 0 , null );
459+ simulateSpecChange (rs .deployment , customValue );
460460
461461 // Should fail with savepoint error
462462 rs = reconcile (rs .deployment );
463- assertFailingJobStatus (rs );
464- assertTrue (rs .reconciledStatus .getError ().contains ("Job not in valid state for savepoint" ));
463+ assertFailingWithError (rs , "Job not in valid state for savepoint" );
465464
466465 // Recovery: Clear the exception and try again with new spec change
467466 flinkService .clearSavepointTriggerException ();
@@ -478,8 +477,9 @@ public void verifySavepointFailureRecovery(FlinkVersion flinkVersion) throws Exc
478477 @ ParameterizedTest
479478 @ MethodSource ("savepointExceptionProvider" )
480479 public void verifySavepointFailureWithDifferentExceptionTypes (
481- FlinkVersion flinkVersion , Exception savepointException , String expectedErrorFragment )
482- throws Exception {
480+ FlinkVersion flinkVersion , Exception savepointException ) throws Exception {
481+
482+ String expectedErrorFragment = savepointException .getMessage ();
483483
484484 var blueGreenDeployment =
485485 buildSessionCluster (
@@ -511,6 +511,8 @@ public void verifySavepointFailureWithDifferentExceptionTypes(
511511 @ ParameterizedTest
512512 @ MethodSource ("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions" )
513513 public void verifySavepointFetchFailureRecovery (FlinkVersion flinkVersion ) throws Exception {
514+ String error = "Savepoint corrupted or not found" ;
515+
514516 var blueGreenDeployment =
515517 buildSessionCluster (
516518 TEST_DEPLOYMENT_NAME ,
@@ -522,22 +524,17 @@ public void verifySavepointFetchFailureRecovery(FlinkVersion flinkVersion) throw
522524 var rs = executeBasicDeployment (flinkVersion , blueGreenDeployment , false , null );
523525
524526 String customValue = UUID .randomUUID ().toString ();
525- simulateChangeInSpec (rs .deployment , customValue , 0 , null );
527+ simulateSpecChange (rs .deployment , customValue );
526528
527529 // Trigger savepoint successfully and go through savepointing flow
528530 rs = handleSavepoint (rs );
529531
530- // Now configure service to return fetch error - this will be detected in
531- // configureInitialSavepoint
532- flinkService .setSavepointFetchError ("Savepoint corrupted or not found" );
532+ // Configure service to return fetch error
533+ flinkService .setSavepointFetchError (error );
533534
534- // The next reconciliation should transition back to ACTIVE_BLUE and then try
535- // startTransition
536- // which will fail in configureInitialSavepoint due to fetch error
535+ // The next reconciliation will fail in configureInitialSavepoint due to fetch error
537536 rs = reconcile (rs .deployment );
538- assertFailingJobStatus (rs );
539- assertTrue (rs .reconciledStatus .getError ().contains ("Could not start Transition" ));
540- assertTrue (rs .reconciledStatus .getError ().contains ("Savepoint corrupted or not found" ));
537+ assertFailingWithError (rs , "Could not start Transition" , error );
541538
542539 // Recovery: Clear the fetch error and try again with new spec change
543540 flinkService .clearSavepointFetchError ();
@@ -552,10 +549,9 @@ public void verifySavepointFetchFailureRecovery(FlinkVersion flinkVersion) throw
552549 }
553550
554551 @ ParameterizedTest
555- @ MethodSource ("savepointFetchErrorProvider " )
552+ @ MethodSource ("savepointErrorProvider " )
556553 public void verifySavepointFetchFailureWithDifferentErrors (
557- FlinkVersion flinkVersion , String fetchError , String expectedErrorFragment )
558- throws Exception {
554+ FlinkVersion flinkVersion , String errorMessage , boolean isFetchError ) throws Exception {
559555
560556 var blueGreenDeployment =
561557 buildSessionCluster (
@@ -566,23 +562,25 @@ public void verifySavepointFetchFailureWithDifferentErrors(
566562 UpgradeMode .SAVEPOINT );
567563 var rs = executeBasicDeployment (flinkVersion , blueGreenDeployment , false , null );
568564
569- simulateChangeInSpec (rs .deployment , UUID .randomUUID ().toString (), 0 , null );
565+ simulateSpecChange (rs .deployment , UUID .randomUUID ().toString ());
570566
571- // Trigger savepoint successfully and go through savepointing flow
572- rs = handleSavepoint (rs );
567+ if (isFetchError ) {
568+ // Trigger savepoint successfully and go through savepointing flow
569+ rs = handleSavepoint (rs );
573570
574- // Configure service to return fetch error - this will be detected in
575- // configureInitialSavepoint
576- flinkService .setSavepointFetchError (fetchError );
571+ // Configure service to return fetch error
572+ flinkService .setSavepointFetchError (errorMessage );
577573
578- // The next reconciliation should transition back to ACTIVE_BLUE and then try
579- // startTransition
580- // which will fail in configureInitialSavepoint due to fetch error
581- rs = reconcile (rs .deployment );
574+ // The next reconciliation will fail in configureInitialSavepoint due to fetch error
575+ rs = reconcile (rs .deployment );
576+ assertFailingWithError (rs , "Could not start Transition" , errorMessage );
577+ } else {
578+ // Configure service to throw trigger exception
579+ flinkService .setSavepointTriggerException (new RuntimeException (errorMessage ));
582580
583- assertFailingJobStatus (rs );
584- assertTrue (rs . reconciledStatus . getError (). contains ( "Could not start Transition" ) );
585- assertTrue ( rs . reconciledStatus . getError (). contains ( expectedErrorFragment ));
581+ rs = reconcile (rs . deployment );
582+ assertFailingWithError (rs , "Could not trigger Savepoint" , errorMessage );
583+ }
586584
587585 // Should remain in ACTIVE_BLUE state after failure
588586 assertEquals (
@@ -595,28 +593,23 @@ public void verifySavepointFetchFailureWithDifferentErrors(
595593
596594 // ==================== Parameterized Test Inputs ====================
597595
598- static Stream <Arguments > savepointFetchErrorProvider () {
596+ static Stream <Arguments > savepointErrorProvider () {
599597 return TestUtils .flinkVersions ()
600598 .flatMap (
601599 flinkVersionArgs -> {
602600 FlinkVersion version = (FlinkVersion ) flinkVersionArgs .get ()[0 ];
603601 return Stream .of (
602+ // Fetch errors
603+ Arguments .of (version , "Savepoint file corrupted" , true ),
604+ Arguments .of (version , "Storage system unavailable" , true ),
604605 Arguments .of (
605- version ,
606- "Savepoint file corrupted" ,
607- "Savepoint file corrupted" ),
608- Arguments .of (
609- version ,
610- "Storage system unavailable" ,
611- "Storage system unavailable" ),
612- Arguments .of (
613- version ,
614- "Access denied to savepoint location" ,
615- "Access denied to savepoint location" ),
616- Arguments .of (
617- version ,
618- "Savepoint metadata missing" ,
619- "Savepoint metadata missing" ));
606+ version , "Access denied to savepoint location" , true ),
607+ Arguments .of (version , "Savepoint metadata missing" , true ),
608+ // Trigger exceptions
609+ Arguments .of (version , "Network timeout" , false ),
610+ Arguments .of (version , "Job not running" , false ),
611+ Arguments .of (version , "Service unavailable" , false ),
612+ Arguments .of (version , "Generic error" , false ));
620613 });
621614 }
622615
@@ -626,22 +619,12 @@ static Stream<Arguments> savepointExceptionProvider() {
626619 flinkVersionArgs -> {
627620 FlinkVersion version = (FlinkVersion ) flinkVersionArgs .get ()[0 ];
628621 return Stream .of (
622+ Arguments .of (version , new IOException ("Network timeout" )),
629623 Arguments .of (
630- version ,
631- new IOException ("Network timeout" ),
632- "Network timeout" ),
624+ version , new IllegalStateException ("Job not running" )),
633625 Arguments .of (
634- version ,
635- new IllegalStateException ("Job not running" ),
636- "Job not running" ),
637- Arguments .of (
638- version ,
639- new RuntimeException ("Service unavailable" ),
640- "Service unavailable" ),
641- Arguments .of (
642- version ,
643- new Exception ("Generic error" ),
644- "Generic error" ));
626+ version , new RuntimeException ("Service unavailable" )),
627+ Arguments .of (version , new Exception ("Generic error" )));
645628 });
646629 }
647630
@@ -897,6 +880,15 @@ private static void assertFailingJobStatus(
897880 assertNotNull (rs .reconciledStatus .getError ());
898881 }
899882
883+ private static void assertFailingWithError (
884+ TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs ,
885+ String ... expectedErrorFragments ) {
886+ assertFailingJobStatus (rs );
887+ for (String fragment : expectedErrorFragments ) {
888+ assertTrue (rs .reconciledStatus .getError ().contains (fragment ));
889+ }
890+ }
891+
900892 private void assertFinalized (
901893 long minReconciliationTs ,
902894 TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs ,
@@ -1102,6 +1094,10 @@ private void simulateChangeInSpec(
11021094 kubernetesClient .resource (blueGreenDeployment ).createOrReplace ();
11031095 }
11041096
1097+ private void simulateSpecChange (FlinkBlueGreenDeployment deployment , String customValue ) {
1098+ simulateChangeInSpec (deployment , customValue , 0 , null );
1099+ }
1100+
11051101 /*
11061102 Convenience function to reconcile and get the frequently used `BlueGreenReconciliationResult`
11071103 */
0 commit comments