Skip to content

Commit 3bd0f18

Browse files
committed
Handling exceptions when triggering a savepoint.
1 parent 087c211 commit 3bd0f18

File tree

4 files changed

+149
-9
lines changed

4 files changed

+149
-9
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 18 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -117,7 +117,15 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
117117

118118
if (isFlinkDeploymentReady(currentFlinkDeployment)) {
119119
if (specDiff == BlueGreenDiffType.TRANSITION) {
120-
if (handleSavepoint(context, currentFlinkDeployment)) {
120+
boolean savepointTriggered = false;
121+
try {
122+
savepointTriggered = handleSavepoint(context, currentFlinkDeployment);
123+
} catch (Exception e) {
124+
var error = "Could not trigger Savepoint. Details: " + e.getMessage();
125+
return markDeploymentFailing(context, error);
126+
}
127+
128+
if (savepointTriggered) {
121129
// Spec is intentionally not marked as reconciled here to allow
122130
// reprocessing the TRANSITION once savepoint creation completes
123131
var savepointingState =
@@ -138,16 +146,15 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
138146
}
139147
} else {
140148
if (context.getDeploymentStatus().getJobStatus().getState() != JobStatus.FAILING) {
149+
setLastReconciledSpec(context);
141150
var error =
142151
String.format(
143152
"Transition to %s not possible, current Flink Deployment '%s' is not READY. FAILING '%s'",
144153
calculateTransition(currentBlueGreenDeploymentType)
145154
.nextBlueGreenDeploymentType,
146155
currentFlinkDeployment.getMetadata().getName(),
147156
context.getBgDeployment().getMetadata().getName());
148-
LOG.error(error);
149-
setLastReconciledSpec(context);
150-
return patchStatusUpdateControl(context, null, JobStatus.FAILING, error);
157+
return markDeploymentFailing(context, error);
151158
}
152159
}
153160
}
@@ -310,7 +317,7 @@ private static Savepoint getSavepointObject(
310317
}
311318

312319
private boolean handleSavepoint(
313-
BlueGreenContext context, FlinkDeployment currentFlinkDeployment) {
320+
BlueGreenContext context, FlinkDeployment currentFlinkDeployment) throws Exception {
314321

315322
if (!isSavepointRequired(context)) {
316323
return false;
@@ -540,8 +547,13 @@ private UpdateControl<FlinkBlueGreenDeployment> abortDeployment(
540547
String.format(
541548
"Aborting deployment '%s', rolling B/G deployment back to %s",
542549
deploymentName, previousState);
543-
LOG.warn(error);
550+
return markDeploymentFailing(context, error);
551+
}
544552

553+
/**
 * Logs the given error message and reports the Blue/Green deployment as FAILING.
 *
 * <p>Shared exit path for the failure cases visible in this change: a savepoint that could not
 * be triggered, a transition requested while the current deployment is not READY, and an
 * aborted deployment.
 *
 * @param context the Blue/Green context forwarded unchanged to {@code patchStatusUpdateControl}
 * @param error the message that is both logged and passed on as the status error
 * @return whatever {@code patchStatusUpdateControl} returns for {@code JobStatus.FAILING}
 */
@NotNull
554+
private static UpdateControl<FlinkBlueGreenDeployment> markDeploymentFailing(
555+
BlueGreenContext context, String error) {
556+
// Only the message text is logged here; callers hold the originating exception, if any.
LOG.error(error);
545557
// The second argument is passed as null; the job status is set to FAILING with the message.
return patchStatusUpdateControl(context, null, JobStatus.FAILING, error);
546558
}
547559

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,6 @@
3939
import org.apache.flink.util.Preconditions;
4040

4141
import io.fabric8.kubernetes.api.model.ObjectMeta;
42-
import lombok.SneakyThrows;
4342
import org.slf4j.Logger;
4443
import org.slf4j.LoggerFactory;
4544

@@ -259,8 +258,8 @@ public static boolean lookForCheckpoint(BlueGreenContext context) {
259258
return previousUpgradeMode == nextUpgradeMode && nextUpgradeMode == UpgradeMode.LAST_STATE;
260259
}
261260

262-
@SneakyThrows
263-
public static String triggerSavepoint(FlinkResourceContext<FlinkDeployment> ctx) {
261+
public static String triggerSavepoint(FlinkResourceContext<FlinkDeployment> ctx)
262+
throws Exception {
264263
var jobId = ctx.getResource().getStatus().getJobStatus().getJobId();
265264
var conf = ctx.getObserveConfig();
266265
var savepointFormatType =

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -135,6 +135,7 @@ public class TestingFlinkService extends AbstractFlinkService {
135135
@Setter private boolean deployFailure = false;
136136
@Setter private Exception makeItFailWith;
137137
@Setter private boolean triggerSavepointFailure = false;
138+
@Setter private Exception savepointTriggerException = null;
138139
@Setter private boolean disposeSavepointFailure = false;
139140
@Setter private Runnable sessionJobSubmittedCallback;
140141
@Setter private PodList podList = new PodList();
@@ -370,6 +371,9 @@ public String triggerSavepoint(
370371
String savepointDirectory,
371372
Configuration conf)
372373
throws Exception {
374+
if (savepointTriggerException != null) {
375+
throw savepointTriggerException;
376+
}
373377
if (triggerSavepointFailure) {
374378
throw new Exception(SNAPSHOT_ERROR_MESSAGE);
375379
}
@@ -769,4 +773,12 @@ public void addExceptionHistory(
769773
new JobExceptionsInfoWithHistory(exceptionHistory);
770774
jobExceptionsMap.put(jobId, newExceptionHistory);
771775
}
776+
777+
/**
 * Injects the exception that {@code triggerSavepoint} throws.
 *
 * <p>While the stored value is non-null, {@code triggerSavepoint} throws it before evaluating
 * the {@code triggerSavepointFailure} flag.
 *
 * <p>NOTE(review): the {@code savepointTriggerException} field is also annotated with Lombok
 * {@code @Setter}, so this hand-written setter looks redundant - confirm and drop one of them.
 *
 * @param exception the exception to throw on the next savepoint trigger; {@code null} disables
 *     the injection
 */
public void setSavepointTriggerException(Exception exception) {
778+
this.savepointTriggerException = exception;
779+
}
780+
781+
/**
 * Removes any injected savepoint-trigger exception by resetting the field to {@code null}, so
 * {@code triggerSavepoint} no longer throws it.
 */
public void clearSavepointTriggerException() {
782+
this.savepointTriggerException = null;
783+
}
772784
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 117 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@
5252
import org.junit.jupiter.params.provider.Arguments;
5353
import org.junit.jupiter.params.provider.MethodSource;
5454

55+
import java.io.IOException;
5556
import java.time.Instant;
5657
import java.util.HashMap;
5758
import java.util.List;
@@ -149,11 +150,32 @@ public void verifyBasicTransition(
149150
/**
 * Success-path savepoint helper: delegates to {@code handleSavepointWithFailure} with no
 * expected exception, so no failure is injected into the Flink service.
 *
 * @param rs the reconciliation result whose deployment is reconciled next
 * @return the reconciliation result produced by {@code handleSavepointWithFailure}
 * @throws Exception propagated from {@code handleSavepointWithFailure}
 */
private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult handleSavepoint(
150151
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs)
151152
throws Exception {
153+
// null = "no exception expected".
return handleSavepointWithFailure(rs, null);
154+
}
155+
156+
@NotNull
157+
private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult
158+
handleSavepointWithFailure(
159+
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs,
160+
Exception expectedException)
161+
throws Exception {
162+
163+
if (expectedException != null) {
164+
flinkService.setSavepointTriggerException(expectedException);
165+
}
166+
152167
var triggers = flinkService.getSavepointTriggers();
153168
triggers.clear();
154169

155170
rs = reconcile(rs.deployment);
156171

172+
if (expectedException != null) {
173+
// Should fail immediately without entering savepointing state
174+
assertFailingJobStatus(rs);
175+
return rs;
176+
}
177+
178+
// Continue with existing successful savepoint logic...
157179
// Simulating a pending savepoint
158180
triggers.put(rs.deployment.getStatus().getSavepointTriggerId(), false);
159181

@@ -433,8 +455,103 @@ public void verifyPatchScenario(FlinkVersion flinkVersion, PatchTestCase testCas
433455
}
434456
}
435457

458+
/**
 * Verifies that a failed savepoint trigger does not block a later transition.
 *
 * <p>Flow: deploy, inject an {@code IllegalStateException} into the Flink service, change the
 * spec and reconcile, then check that the status is failing and that its error contains the
 * injected message. Afterwards the injection is cleared, the spec is changed again, and the
 * regular savepoint and transition helpers are run.
 *
 * @param flinkVersion the Flink version supplied by {@code TestUtils#flinkVersions}
 * @throws Exception if any helper invoked by the test throws
 */
@ParameterizedTest
459+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
460+
public void verifySavepointFailureRecovery(FlinkVersion flinkVersion) throws Exception {
461+
var blueGreenDeployment =
462+
buildSessionCluster(
463+
TEST_DEPLOYMENT_NAME,
464+
TEST_NAMESPACE,
465+
flinkVersion,
466+
null,
467+
UpgradeMode.LAST_STATE);
468+
469+
var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null);
470+
471+
// First attempt: make the testing Flink service throw when a savepoint is triggered
472+
flinkService.setSavepointTriggerException(
473+
new IllegalStateException("Job not in valid state for savepoint"));
474+
475+
String customValue = UUID.randomUUID().toString();
476+
simulateChangeInSpec(rs.deployment, customValue, 0, null);
477+
478+
// The reconcile must report a failing status whose error carries the injected message
479+
rs = reconcile(rs.deployment);
480+
assertFailingJobStatus(rs);
481+
assertTrue(rs.reconciledStatus.getError().contains("Job not in valid state for savepoint"));
482+
483+
// Recovery: remove the injected exception and submit a second, different spec change
484+
flinkService.clearSavepointTriggerException();
485+
customValue = UUID.randomUUID().toString() + "_recovery";
486+
simulateChangeInSpec(rs.deployment, customValue, ALT_DELETION_DELAY_VALUE, null);
487+
488+
// With the injection cleared, the regular savepoint helper is expected to pass
489+
rs = handleSavepoint(rs);
490+
491+
// Finish the transition, checking against the recovery spec value and "savepoint_1"
492+
testTransitionToGreen(rs, customValue, "savepoint_1");
493+
}
494+
495+
/**
 * Verifies that each exception type injected into the savepoint trigger produces a failing
 * status and starts no transition.
 *
 * <p>For every argument set the test deploys with {@code UpgradeMode.SAVEPOINT}, injects the
 * exception, changes the spec and reconciles once. It then asserts that the status error
 * contains both the "Could not trigger Savepoint" prefix and the expected fragment, that the
 * state is still {@code ACTIVE_BLUE}, and that exactly one Flink deployment exists.
 *
 * @param flinkVersion the Flink version under test
 * @param savepointException the exception the testing Flink service throws on savepoint trigger
 * @param expectedErrorFragment text that must appear in the reconciled status error
 * @throws Exception if any helper invoked by the test throws
 */
@ParameterizedTest
496+
@MethodSource("savepointExceptionProvider")
497+
public void verifySavepointFailureWithDifferentExceptionTypes(
498+
FlinkVersion flinkVersion, Exception savepointException, String expectedErrorFragment)
499+
throws Exception {
500+
501+
var blueGreenDeployment =
502+
buildSessionCluster(
503+
TEST_DEPLOYMENT_NAME,
504+
TEST_NAMESPACE,
505+
flinkVersion,
506+
null,
507+
UpgradeMode.SAVEPOINT);
508+
var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null);
509+
510+
// Inject the failure before the spec change so the next reconcile hits it
flinkService.setSavepointTriggerException(savepointException);
511+
simulateChangeInSpec(rs.deployment, UUID.randomUUID().toString(), 0, null);
512+
513+
rs = reconcile(rs.deployment);
514+
515+
// The error must carry both the fixed prefix and the exception-specific message
assertFailingJobStatus(rs);
516+
assertTrue(rs.reconciledStatus.getError().contains("Could not trigger Savepoint"));
517+
assertTrue(rs.reconciledStatus.getError().contains(expectedErrorFragment));
518+
519+
// The Blue/Green state must still be ACTIVE_BLUE, i.e. no transition was started
520+
assertEquals(
521+
FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState());
522+
523+
// Exactly one Flink deployment must exist (Blue only; Green was never created)
524+
var flinkDeployments = getFlinkDeployments();
525+
assertEquals(1, flinkDeployments.size());
526+
}
527+
436528
// ==================== Parameterized Test Inputs ====================
437529

530+
/**
 * Argument source for {@code verifySavepointFailureWithDifferentExceptionTypes}.
 *
 * <p>Expands every Flink version from {@code TestUtils.flinkVersions()} into four argument sets
 * of (version, exception to inject, expected error fragment). The exceptions are an
 * {@code IOException}, an {@code IllegalStateException}, a {@code RuntimeException} and a plain
 * {@code Exception}; each expected fragment equals the message of its exception.
 *
 * @return a stream with four argument sets per Flink version
 */
static Stream<Arguments> savepointExceptionProvider() {
531+
return TestUtils.flinkVersions()
532+
.flatMap(
533+
flinkVersionArgs -> {
534+
// The first element of each upstream argument set is read as the FlinkVersion.
FlinkVersion version = (FlinkVersion) flinkVersionArgs.get()[0];
535+
return Stream.of(
536+
Arguments.of(
537+
version,
538+
new IOException("Network timeout"),
539+
"Network timeout"),
540+
Arguments.of(
541+
version,
542+
new IllegalStateException("Job not running"),
543+
"Job not running"),
544+
Arguments.of(
545+
version,
546+
new RuntimeException("Service unavailable"),
547+
"Service unavailable"),
548+
Arguments.of(
549+
version,
550+
new Exception("Generic error"),
551+
"Generic error"));
552+
});
553+
}
554+
438555
static Stream<Arguments> patchScenarioProvider() {
439556
// Extract FlinkVersions from TestUtils and combine with PatchTypes
440557
return TestUtils.flinkVersions()

0 commit comments

Comments
 (0)