5252import org .junit .jupiter .params .provider .Arguments ;
5353import org .junit .jupiter .params .provider .MethodSource ;
5454
55- import java .io .IOException ;
5655import java .time .Instant ;
5756import java .util .HashMap ;
5857import java .util .List ;
@@ -456,12 +455,11 @@ public void verifySavepointFailureRecovery(FlinkVersion flinkVersion) throws Exc
456455 new IllegalStateException ("Job not in valid state for savepoint" ));
457456
458457 String customValue = UUID .randomUUID ().toString ();
459- simulateChangeInSpec (rs .deployment , customValue , 0 , null );
458+ simulateSpecChange (rs .deployment , customValue );
460459
461460 // Should fail with savepoint error
462461 rs = reconcile (rs .deployment );
463- assertFailingJobStatus (rs );
464- assertTrue (rs .reconciledStatus .getError ().contains ("Job not in valid state for savepoint" ));
462+ assertFailingWithError (rs , "Job not in valid state for savepoint" );
465463
466464 // Recovery: Clear the exception and try again with new spec change
467465 flinkService .clearSavepointTriggerException ();
@@ -475,42 +473,11 @@ public void verifySavepointFailureRecovery(FlinkVersion flinkVersion) throws Exc
475473 testTransitionToGreen (rs , customValue , "savepoint_1" );
476474 }
477475
478- @ ParameterizedTest
479- @ MethodSource ("savepointExceptionProvider" )
480- public void verifySavepointFailureWithDifferentExceptionTypes (
481- FlinkVersion flinkVersion , Exception savepointException , String expectedErrorFragment )
482- throws Exception {
483-
484- var blueGreenDeployment =
485- buildSessionCluster (
486- TEST_DEPLOYMENT_NAME ,
487- TEST_NAMESPACE ,
488- flinkVersion ,
489- null ,
490- UpgradeMode .SAVEPOINT );
491- var rs = executeBasicDeployment (flinkVersion , blueGreenDeployment , false , null );
492-
493- flinkService .setSavepointTriggerException (savepointException );
494- simulateChangeInSpec (rs .deployment , UUID .randomUUID ().toString (), 0 , null );
495-
496- rs = reconcile (rs .deployment );
497-
498- assertFailingJobStatus (rs );
499- assertTrue (rs .reconciledStatus .getError ().contains ("Could not trigger Savepoint" ));
500- assertTrue (rs .reconciledStatus .getError ().contains (expectedErrorFragment ));
501-
502- // Should remain in ACTIVE_BLUE state (no transition started)
503- assertEquals (
504- FlinkBlueGreenDeploymentState .ACTIVE_BLUE , rs .reconciledStatus .getBlueGreenState ());
505-
506- // Verify only Blue deployment exists (Green was never created)
507- var flinkDeployments = getFlinkDeployments ();
508- assertEquals (1 , flinkDeployments .size ());
509- }
510-
511476 @ ParameterizedTest
512477 @ MethodSource ("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions" )
513478 public void verifySavepointFetchFailureRecovery (FlinkVersion flinkVersion ) throws Exception {
479+ String error = "Savepoint corrupted or not found" ;
480+
514481 var blueGreenDeployment =
515482 buildSessionCluster (
516483 TEST_DEPLOYMENT_NAME ,
@@ -522,22 +489,17 @@ public void verifySavepointFetchFailureRecovery(FlinkVersion flinkVersion) throw
522489 var rs = executeBasicDeployment (flinkVersion , blueGreenDeployment , false , null );
523490
524491 String customValue = UUID .randomUUID ().toString ();
525- simulateChangeInSpec (rs .deployment , customValue , 0 , null );
492+ simulateSpecChange (rs .deployment , customValue );
526493
527494 // Trigger savepoint successfully and go through savepointing flow
528495 rs = handleSavepoint (rs );
529496
530- // Now configure service to return fetch error - this will be detected in
531- // configureInitialSavepoint
532- flinkService .setSavepointFetchError ("Savepoint corrupted or not found" );
497+ // Configure service to return fetch error
498+ flinkService .setSavepointFetchError (error );
533499
534- // The next reconciliation should transition back to ACTIVE_BLUE and then try
535- // startTransition
536- // which will fail in configureInitialSavepoint due to fetch error
500+ // The next reconciliation will fail in configureInitialSavepoint due to fetch error
537501 rs = reconcile (rs .deployment );
538- assertFailingJobStatus (rs );
539- assertTrue (rs .reconciledStatus .getError ().contains ("Could not start Transition" ));
540- assertTrue (rs .reconciledStatus .getError ().contains ("Savepoint corrupted or not found" ));
502+ assertFailingWithError (rs , "Could not start Transition" , error );
541503
542504 // Recovery: Clear the fetch error and try again with new spec change
543505 flinkService .clearSavepointFetchError ();
@@ -552,10 +514,9 @@ public void verifySavepointFetchFailureRecovery(FlinkVersion flinkVersion) throw
552514 }
553515
554516 @ ParameterizedTest
555- @ MethodSource ("savepointFetchErrorProvider " )
517+ @ MethodSource ("savepointErrorProvider " )
556518 public void verifySavepointFetchFailureWithDifferentErrors (
557- FlinkVersion flinkVersion , String fetchError , String expectedErrorFragment )
558- throws Exception {
519+ FlinkVersion flinkVersion , String errorMessage , boolean isFetchError ) throws Exception {
559520
560521 var blueGreenDeployment =
561522 buildSessionCluster (
@@ -566,23 +527,25 @@ public void verifySavepointFetchFailureWithDifferentErrors(
566527 UpgradeMode .SAVEPOINT );
567528 var rs = executeBasicDeployment (flinkVersion , blueGreenDeployment , false , null );
568529
569- simulateChangeInSpec (rs .deployment , UUID .randomUUID ().toString (), 0 , null );
530+ simulateSpecChange (rs .deployment , UUID .randomUUID ().toString ());
570531
571- // Trigger savepoint successfully and go through savepointing flow
572- rs = handleSavepoint (rs );
532+ if (isFetchError ) {
533+ // Trigger savepoint successfully and go through savepointing flow
534+ rs = handleSavepoint (rs );
573535
574- // Configure service to return fetch error - this will be detected in
575- // configureInitialSavepoint
576- flinkService .setSavepointFetchError (fetchError );
536+ // Configure service to return fetch error
537+ flinkService .setSavepointFetchError (errorMessage );
577538
578- // The next reconciliation should transition back to ACTIVE_BLUE and then try
579- // startTransition
580- // which will fail in configureInitialSavepoint due to fetch error
581- rs = reconcile (rs .deployment );
539+ // The next reconciliation will fail in configureInitialSavepoint due to fetch error
540+ rs = reconcile (rs .deployment );
541+ assertFailingWithError (rs , "Could not start Transition" , errorMessage );
542+ } else {
543+ // Configure service to throw trigger exception
544+ flinkService .setSavepointTriggerException (new RuntimeException (errorMessage ));
582545
583- assertFailingJobStatus (rs );
584- assertTrue (rs . reconciledStatus . getError (). contains ( "Could not start Transition" ) );
585- assertTrue ( rs . reconciledStatus . getError (). contains ( expectedErrorFragment ));
546+ rs = reconcile (rs . deployment );
547+ assertFailingWithError (rs , "Could not trigger Savepoint" , errorMessage );
548+ }
586549
587550 // Should remain in ACTIVE_BLUE state after failure
588551 assertEquals (
@@ -595,53 +558,23 @@ public void verifySavepointFetchFailureWithDifferentErrors(
595558
596559 // ==================== Parameterized Test Inputs ====================
597560
598- static Stream <Arguments > savepointFetchErrorProvider () {
561+ static Stream <Arguments > savepointErrorProvider () {
599562 return TestUtils .flinkVersions ()
600563 .flatMap (
601564 flinkVersionArgs -> {
602565 FlinkVersion version = (FlinkVersion ) flinkVersionArgs .get ()[0 ];
603566 return Stream .of (
567+ // Fetch errors
568+ Arguments .of (version , "Savepoint file corrupted" , true ),
569+ Arguments .of (version , "Storage system unavailable" , true ),
604570 Arguments .of (
605- version ,
606- "Savepoint file corrupted" ,
607- "Savepoint file corrupted" ),
608- Arguments .of (
609- version ,
610- "Storage system unavailable" ,
611- "Storage system unavailable" ),
612- Arguments .of (
613- version ,
614- "Access denied to savepoint location" ,
615- "Access denied to savepoint location" ),
616- Arguments .of (
617- version ,
618- "Savepoint metadata missing" ,
619- "Savepoint metadata missing" ));
620- });
621- }
622-
623- static Stream <Arguments > savepointExceptionProvider () {
624- return TestUtils .flinkVersions ()
625- .flatMap (
626- flinkVersionArgs -> {
627- FlinkVersion version = (FlinkVersion ) flinkVersionArgs .get ()[0 ];
628- return Stream .of (
629- Arguments .of (
630- version ,
631- new IOException ("Network timeout" ),
632- "Network timeout" ),
633- Arguments .of (
634- version ,
635- new IllegalStateException ("Job not running" ),
636- "Job not running" ),
637- Arguments .of (
638- version ,
639- new RuntimeException ("Service unavailable" ),
640- "Service unavailable" ),
641- Arguments .of (
642- version ,
643- new Exception ("Generic error" ),
644- "Generic error" ));
571+ version , "Access denied to savepoint location" , true ),
572+ Arguments .of (version , "Savepoint metadata missing" , true ),
573+ // Trigger exceptions
574+ Arguments .of (version , "Network timeout" , false ),
575+ Arguments .of (version , "Job not running" , false ),
576+ Arguments .of (version , "Service unavailable" , false ),
577+ Arguments .of (version , "Generic error" , false ));
645578 });
646579 }
647580
@@ -897,6 +830,15 @@ private static void assertFailingJobStatus(
897830 assertNotNull (rs .reconciledStatus .getError ());
898831 }
899832
833+ private static void assertFailingWithError (
834+ TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs ,
835+ String ... expectedErrorFragments ) {
836+ assertFailingJobStatus (rs );
837+ for (String fragment : expectedErrorFragments ) {
838+ assertTrue (rs .reconciledStatus .getError ().contains (fragment ));
839+ }
840+ }
841+
900842 private void assertFinalized (
901843 long minReconciliationTs ,
902844 TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs ,
@@ -1102,6 +1044,10 @@ private void simulateChangeInSpec(
11021044 kubernetesClient .resource (blueGreenDeployment ).createOrReplace ();
11031045 }
11041046
1047+ private void simulateSpecChange (FlinkBlueGreenDeployment deployment , String customValue ) {
1048+ simulateChangeInSpec (deployment , customValue , 0 , null );
1049+ }
1050+
11051051 /*
11061052 Convenience function to reconcile and get the frequently used `BlueGreenReconciliationResult`
11071053 */
0 commit comments