Skip to content

Commit 93e1954

Browse files
committed
Handling errors when fetching a savepoint.
1 parent 3bd0f18 commit 93e1954

File tree

4 files changed

+149
-26
lines changed

4 files changed

+149
-26
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,14 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
135135
}
136136

137137
setLastReconciledSpec(context);
138-
return startTransition(
139-
context, currentBlueGreenDeploymentType, currentFlinkDeployment);
138+
try {
139+
return startTransition(
140+
context, currentBlueGreenDeploymentType, currentFlinkDeployment);
141+
} catch (Exception e) {
142+
var error = "Could not start Transition. Details: " + e.getMessage();
143+
context.getDeploymentStatus().setSavepointTriggerId(null);
144+
return markDeploymentFailing(context, error);
145+
}
140146
} else {
141147
setLastReconciledSpec(context);
142148
LOG.info(
@@ -287,8 +293,16 @@ private Savepoint configureInitialSavepoint(
287293
context.getCtxFactory()
288294
.getResourceContext(currentFlinkDeployment, context.getJosdkContext());
289295

290-
String savepointTriggerId = context.getDeploymentStatus().getSavepointTriggerId();
291-
var savepointFetchResult = fetchSavepointInfo(ctx, savepointTriggerId);
296+
String triggerId = context.getDeploymentStatus().getSavepointTriggerId();
297+
var savepointFetchResult = fetchSavepointInfo(ctx, triggerId);
298+
299+
if (savepointFetchResult.getError() != null
300+
&& !savepointFetchResult.getError().isEmpty()) {
301+
throw new RuntimeException(
302+
String.format(
303+
"Could not fetch savepoint with triggerId: %s. Error: %s",
304+
triggerId, savepointFetchResult.getError()));
305+
}
292306

293307
return getSavepointObject(ctx, savepointFetchResult.getLocation());
294308
}
@@ -551,7 +565,7 @@ private UpdateControl<FlinkBlueGreenDeployment> abortDeployment(
551565
}
552566

553567
@NotNull
554-
private static UpdateControl<FlinkBlueGreenDeployment> markDeploymentFailing(
568+
public static UpdateControl<FlinkBlueGreenDeployment> markDeploymentFailing(
555569
BlueGreenContext context, String error) {
556570
LOG.error(error);
557571
return patchStatusUpdateControl(context, null, JobStatus.FAILING, error);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/SavepointingStateHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public SavepointingStateHandler(
4040
@Override
4141
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context) {
4242
BlueGreenDeploymentType currentType = getCurrentDeploymentType();
43-
var isSavepointReady = deploymentService.monitorSavepoint(context, currentType);
43+
boolean isSavepointReady = deploymentService.monitorSavepoint(context, currentType);
4444

4545
// Savepoint creation completed, transition back to active state to continue deployment
4646
if (isSavepointReady) {
@@ -51,6 +51,9 @@ public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context)
5151
return patchStatusUpdateControl(context, nextState, null, null).rescheduleAfter(0);
5252
}
5353

54+
// TODO: this waits indefinitely for the savepoint to complete. If the grace period
55+
// is exceeded, we could instead abort the transition, WITHOUT SUSPENDING the
56+
// FlinkDeployment.
5457
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate()
5558
.rescheduleAfter(getReconciliationReschedInterval(context));
5659
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public class TestingFlinkService extends AbstractFlinkService {
136136
@Setter private Exception makeItFailWith;
137137
@Setter private boolean triggerSavepointFailure = false;
138138
@Setter private Exception savepointTriggerException = null;
139+
@Setter private String savepointFetchError = null;
139140
@Setter private boolean disposeSavepointFailure = false;
140141
@Setter private Runnable sessionJobSubmittedCallback;
141142
@Setter private PodList podList = new PodList();
@@ -397,6 +398,10 @@ public String triggerCheckpoint(
397398
public SavepointFetchResult fetchSavepointInfo(
398399
String triggerId, String jobId, Configuration conf) {
399400

401+
if (savepointFetchError != null) {
402+
return SavepointFetchResult.error(savepointFetchError);
403+
}
404+
400405
if (savepointTriggers.containsKey(triggerId)) {
401406
if (savepointTriggers.get(triggerId)) {
402407
return SavepointFetchResult.completed("savepoint_" + savepointCounter++);
@@ -781,4 +786,12 @@ public void setSavepointTriggerException(Exception exception) {
781786
public void clearSavepointTriggerException() {
782787
this.savepointTriggerException = null;
783788
}
789+
790+
public void setSavepointFetchError(String error) {
791+
this.savepointFetchError = error;
792+
}
793+
794+
public void clearSavepointFetchError() {
795+
this.savepointFetchError = null;
796+
}
784797
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 113 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -150,32 +150,12 @@ public void verifyBasicTransition(
150150
private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult handleSavepoint(
151151
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs)
152152
throws Exception {
153-
return handleSavepointWithFailure(rs, null);
154-
}
155-
156-
@NotNull
157-
private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult
158-
handleSavepointWithFailure(
159-
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs,
160-
Exception expectedException)
161-
throws Exception {
162-
163-
if (expectedException != null) {
164-
flinkService.setSavepointTriggerException(expectedException);
165-
}
166153

167154
var triggers = flinkService.getSavepointTriggers();
168155
triggers.clear();
169156

170157
rs = reconcile(rs.deployment);
171158

172-
if (expectedException != null) {
173-
// Should fail immediately without entering savepointing state
174-
assertFailingJobStatus(rs);
175-
return rs;
176-
}
177-
178-
// Continue with existing successful savepoint logic...
179159
// Simulating a pending savepoint
180160
triggers.put(rs.deployment.getStatus().getSavepointTriggerId(), false);
181161

@@ -189,6 +169,9 @@ private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult
189169
// This next reconciliation should continue waiting on the pending savepoint
190170
rs = reconcile(rs.deployment);
191171

172+
// NOTE: internally, the reconcile call above invokes fetchSavepointInfo on the trigger;
173+
// the TestingFlinkService automatically sets it to "true" (completed).
174+
192175
assertTrue(rs.updateControl.isNoUpdate());
193176
assertTrue(rs.updateControl.getScheduleDelay().isPresent());
194177

@@ -525,8 +508,118 @@ public void verifySavepointFailureWithDifferentExceptionTypes(
525508
assertEquals(1, flinkDeployments.size());
526509
}
527510

511+
@ParameterizedTest
512+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
513+
public void verifySavepointFetchFailureRecovery(FlinkVersion flinkVersion) throws Exception {
514+
var blueGreenDeployment =
515+
buildSessionCluster(
516+
TEST_DEPLOYMENT_NAME,
517+
TEST_NAMESPACE,
518+
flinkVersion,
519+
null,
520+
UpgradeMode.SAVEPOINT);
521+
522+
var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null);
523+
524+
String customValue = UUID.randomUUID().toString();
525+
simulateChangeInSpec(rs.deployment, customValue, 0, null);
526+
527+
// Trigger savepoint successfully and go through savepointing flow
528+
rs = handleSavepoint(rs);
529+
530+
// Now configure service to return fetch error - this will be detected in
531+
// configureInitialSavepoint
532+
flinkService.setSavepointFetchError("Savepoint corrupted or not found");
533+
534+
// The next reconciliation should transition back to ACTIVE_BLUE and then attempt
535+
// startTransition,
536+
// which will fail in configureInitialSavepoint due to the fetch error.
537+
rs = reconcile(rs.deployment);
538+
assertFailingJobStatus(rs);
539+
assertTrue(rs.reconciledStatus.getError().contains("Could not start Transition"));
540+
assertTrue(rs.reconciledStatus.getError().contains("Savepoint corrupted or not found"));
541+
542+
// Recovery: Clear the fetch error and try again with new spec change
543+
flinkService.clearSavepointFetchError();
544+
customValue = UUID.randomUUID().toString() + "_recovery";
545+
simulateChangeInSpec(rs.deployment, customValue, ALT_DELETION_DELAY_VALUE, null);
546+
547+
// Should now succeed and complete transition properly
548+
rs = handleSavepoint(rs);
549+
550+
// Continue with successful transition - second savepoint will be "savepoint_2"
551+
testTransitionToGreen(rs, customValue, "savepoint_2");
552+
}
553+
554+
@ParameterizedTest
555+
@MethodSource("savepointFetchErrorProvider")
556+
public void verifySavepointFetchFailureWithDifferentErrors(
557+
FlinkVersion flinkVersion, String fetchError, String expectedErrorFragment)
558+
throws Exception {
559+
560+
var blueGreenDeployment =
561+
buildSessionCluster(
562+
TEST_DEPLOYMENT_NAME,
563+
TEST_NAMESPACE,
564+
flinkVersion,
565+
null,
566+
UpgradeMode.SAVEPOINT);
567+
var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null);
568+
569+
simulateChangeInSpec(rs.deployment, UUID.randomUUID().toString(), 0, null);
570+
571+
// Trigger savepoint successfully and go through savepointing flow
572+
rs = handleSavepoint(rs);
573+
574+
// Configure service to return fetch error - this will be detected in
575+
// configureInitialSavepoint
576+
flinkService.setSavepointFetchError(fetchError);
577+
578+
// The next reconciliation should transition back to ACTIVE_BLUE and then attempt
579+
// startTransition,
580+
// which will fail in configureInitialSavepoint due to the fetch error.
581+
rs = reconcile(rs.deployment);
582+
583+
assertFailingJobStatus(rs);
584+
assertTrue(rs.reconciledStatus.getError().contains("Could not start Transition"));
585+
assertTrue(rs.reconciledStatus.getError().contains(expectedErrorFragment));
586+
587+
// Should remain in ACTIVE_BLUE state after failure
588+
assertEquals(
589+
FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState());
590+
591+
// Only Blue deployment should exist (Green transition never started)
592+
var flinkDeployments = getFlinkDeployments();
593+
assertEquals(1, flinkDeployments.size());
594+
}
595+
528596
// ==================== Parameterized Test Inputs ====================
529597

598+
static Stream<Arguments> savepointFetchErrorProvider() {
599+
return TestUtils.flinkVersions()
600+
.flatMap(
601+
flinkVersionArgs -> {
602+
FlinkVersion version = (FlinkVersion) flinkVersionArgs.get()[0];
603+
return Stream.of(
604+
Arguments.of(
605+
version,
606+
"Savepoint file corrupted",
607+
"Savepoint file corrupted"),
608+
Arguments.of(
609+
version,
610+
"Storage system unavailable",
611+
"Storage system unavailable"),
612+
Arguments.of(
613+
version,
614+
"Access denied to savepoint location",
615+
"Access denied to savepoint location"),
616+
Arguments.of(
617+
version,
618+
"Savepoint metadata missing",
619+
"Savepoint metadata missing"));
620+
});
621+
}
622+
530623
static Stream<Arguments> savepointExceptionProvider() {
531624
return TestUtils.flinkVersions()
532625
.flatMap(

0 commit comments

Comments
 (0)