|
52 | 52 | import org.junit.jupiter.params.provider.Arguments; |
53 | 53 | import org.junit.jupiter.params.provider.MethodSource; |
54 | 54 |
|
| 55 | +import java.io.IOException; |
55 | 56 | import java.time.Instant; |
56 | 57 | import java.util.HashMap; |
57 | 58 | import java.util.List; |
@@ -473,6 +474,40 @@ public void verifySavepointFailureRecovery(FlinkVersion flinkVersion) throws Exc |
473 | 474 | testTransitionToGreen(rs, customValue, "savepoint_1"); |
474 | 475 | } |
475 | 476 |
|
| 477 | + @ParameterizedTest |
| 478 | + @MethodSource("savepointExceptionProvider") |
| 479 | + public void verifySavepointFailureWithDifferentExceptionTypes( |
| 480 | + FlinkVersion flinkVersion, Exception savepointException) throws Exception { |
| 481 | + |
| 482 | + String expectedErrorFragment = savepointException.getMessage(); |
| 483 | + |
| 484 | + var blueGreenDeployment = |
| 485 | + buildSessionCluster( |
| 486 | + TEST_DEPLOYMENT_NAME, |
| 487 | + TEST_NAMESPACE, |
| 488 | + flinkVersion, |
| 489 | + null, |
| 490 | + UpgradeMode.SAVEPOINT); |
| 491 | + var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null); |
| 492 | + |
| 493 | + flinkService.setSavepointTriggerException(savepointException); |
| 494 | + simulateChangeInSpec(rs.deployment, UUID.randomUUID().toString(), 0, null); |
| 495 | + |
| 496 | + rs = reconcile(rs.deployment); |
| 497 | + |
| 498 | + assertFailingJobStatus(rs); |
| 499 | + assertTrue(rs.reconciledStatus.getError().contains("Could not trigger Savepoint")); |
| 500 | + assertTrue(rs.reconciledStatus.getError().contains(expectedErrorFragment)); |
| 501 | + |
| 502 | + // Should remain in ACTIVE_BLUE state (no transition started) |
| 503 | + assertEquals( |
| 504 | + FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState()); |
| 505 | + |
| 506 | + // Verify only Blue deployment exists (Green was never created) |
| 507 | + var flinkDeployments = getFlinkDeployments(); |
| 508 | + assertEquals(1, flinkDeployments.size()); |
| 509 | + } |
| 510 | + |
476 | 511 | @ParameterizedTest |
477 | 512 | @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") |
478 | 513 | public void verifySavepointFetchFailureRecovery(FlinkVersion flinkVersion) throws Exception { |
@@ -578,6 +613,21 @@ static Stream<Arguments> savepointErrorProvider() { |
578 | 613 | }); |
579 | 614 | } |
580 | 615 |
|
| 616 | + static Stream<Arguments> savepointExceptionProvider() { |
| 617 | + return TestUtils.flinkVersions() |
| 618 | + .flatMap( |
| 619 | + flinkVersionArgs -> { |
| 620 | + FlinkVersion version = (FlinkVersion) flinkVersionArgs.get()[0]; |
| 621 | + return Stream.of( |
| 622 | + Arguments.of(version, new IOException("Network timeout")), |
| 623 | + Arguments.of( |
| 624 | + version, new IllegalStateException("Job not running")), |
| 625 | + Arguments.of( |
| 626 | + version, new RuntimeException("Service unavailable")), |
| 627 | + Arguments.of(version, new Exception("Generic error"))); |
| 628 | + }); |
| 629 | + } |
| 630 | + |
581 | 631 | static Stream<Arguments> patchScenarioProvider() { |
582 | 632 | // Extract FlinkVersions from TestUtils and combine with PatchTypes |
583 | 633 | return TestUtils.flinkVersions() |
|
0 commit comments