Skip to content

Commit 087c211

Browse files
committed
Adding an error property to the FlinkBlueGreenDeploymentStatus to capture failure descriptions.
1 parent 4537f04 commit 087c211

File tree

6 files changed

+52 -25 lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,7 @@ public class FlinkBlueGreenDeploymentStatus {
5757
* Persisted triggerId to track transition with savepoint. Only used with UpgradeMode.SAVEPOINT
5858
*/
5959
private String savepointTriggerId;
60+
61+
/** Error information about the FlinkBlueGreenDeployment. */
62+
private String error;
6063
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
111111
null,
112112
ctxFactory);
113113
return BlueGreenDeploymentService.patchStatusUpdateControl(
114-
context, INITIALIZING_BLUE, null)
114+
context, INITIALIZING_BLUE, null, null)
115115
.rescheduleAfter(0);
116116
} else {
117117
FlinkBlueGreenDeploymentState currentState = deploymentStatus.getBlueGreenState();

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
9696

9797
BlueGreenUtils.setAbortTimestamp(context);
9898

99-
return patchStatusUpdateControl(context, nextState, JobStatus.RECONCILING)
99+
return patchStatusUpdateControl(context, nextState, JobStatus.RECONCILING, null)
100100
.rescheduleAfter(BlueGreenUtils.getReconciliationReschedInterval(context));
101101
}
102102

@@ -122,7 +122,7 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
122122
// reprocessing the TRANSITION once savepoint creation completes
123123
var savepointingState =
124124
calculateSavepointingState(currentBlueGreenDeploymentType);
125-
return patchStatusUpdateControl(context, savepointingState, null)
125+
return patchStatusUpdateControl(context, savepointingState, null, null)
126126
.rescheduleAfter(getReconciliationReschedInterval(context));
127127
}
128128

@@ -138,15 +138,16 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
138138
}
139139
} else {
140140
if (context.getDeploymentStatus().getJobStatus().getState() != JobStatus.FAILING) {
141-
LOG.warn(
142-
"Transition to {} not possible, current Flink Deployment '{}' is not READY. FAILING '{}'",
143-
calculateTransition(currentBlueGreenDeploymentType)
144-
.nextBlueGreenDeploymentType,
145-
currentFlinkDeployment.getMetadata().getName(),
146-
context.getBgDeployment().getMetadata().getName());
147-
141+
var error =
142+
String.format(
143+
"Transition to %s not possible, current Flink Deployment '%s' is not READY. FAILING '%s'",
144+
calculateTransition(currentBlueGreenDeploymentType)
145+
.nextBlueGreenDeploymentType,
146+
currentFlinkDeployment.getMetadata().getName(),
147+
context.getBgDeployment().getMetadata().getName());
148+
LOG.error(error);
148149
setLastReconciledSpec(context);
149-
return patchStatusUpdateControl(context, null, JobStatus.FAILING);
150+
return patchStatusUpdateControl(context, null, JobStatus.FAILING, error);
150151
}
151152
}
152153
}
@@ -450,7 +451,7 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldWeDelete(
450451
deploymentDeletionDelayMs / 1000);
451452

452453
deploymentStatus.setDeploymentReadyTimestamp(Instant.now().toString());
453-
return patchStatusUpdateControl(context, null, null)
454+
return patchStatusUpdateControl(context, null, null, null)
454455
.rescheduleAfter(deploymentDeletionDelayMs);
455456
}
456457

@@ -520,7 +521,7 @@ private UpdateControl<FlinkBlueGreenDeployment> retryDeployment(
520521
deploymentName,
521522
delay / 1000);
522523

523-
return patchStatusUpdateControl(context, null, null).rescheduleAfter(delay);
524+
return patchStatusUpdateControl(context, null, null, null).rescheduleAfter(delay);
524525
}
525526

526527
private UpdateControl<FlinkBlueGreenDeployment> abortDeployment(
@@ -535,12 +536,13 @@ private UpdateControl<FlinkBlueGreenDeployment> abortDeployment(
535536
getPreviousState(nextState, context.getDeployments());
536537
context.getDeploymentStatus().setBlueGreenState(previousState);
537538

538-
LOG.warn(
539-
"Aborting deployment '{}', rolling B/G deployment back to {}",
540-
deploymentName,
541-
previousState);
539+
var error =
540+
String.format(
541+
"Aborting deployment '%s', rolling B/G deployment back to %s",
542+
deploymentName, previousState);
543+
LOG.warn(error);
542544

543-
return patchStatusUpdateControl(context, null, JobStatus.FAILING);
545+
return patchStatusUpdateControl(context, null, JobStatus.FAILING, error);
544546
}
545547

546548
private static FlinkBlueGreenDeploymentState getPreviousState(
@@ -577,15 +579,16 @@ public UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
577579
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
578580
context.getDeploymentStatus().setSavepointTriggerId(null);
579581

580-
return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING);
582+
return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, null);
581583
}
582584

583585
// ==================== Common Utility Methods ====================
584586

585587
public static UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(
586588
BlueGreenContext context,
587589
FlinkBlueGreenDeploymentState deploymentState,
588-
JobStatus jobState) {
590+
JobStatus jobState,
591+
String error) {
589592

590593
var deploymentStatus = context.getDeploymentStatus();
591594
var flinkBlueGreenDeployment = context.getBgDeployment();
@@ -598,6 +601,14 @@ public static UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(
598601
deploymentStatus.getJobStatus().setState(jobState);
599602
}
600603

604+
if (jobState == JobStatus.FAILING) {
605+
deploymentStatus.setError(error);
606+
}
607+
608+
if (jobState == JobStatus.RECONCILING || jobState == JobStatus.RUNNING) {
609+
deploymentStatus.setError(null);
610+
}
611+
601612
deploymentStatus.setLastReconciledTimestamp(java.time.Instant.now().toString());
602613
flinkBlueGreenDeployment.setStatus(deploymentStatus);
603614
return UpdateControl.patchStatus(flinkBlueGreenDeployment);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/SavepointingStateHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context)
4848
getSupportedState() == FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE
4949
? FlinkBlueGreenDeploymentState.ACTIVE_BLUE
5050
: FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
51-
return patchStatusUpdateControl(context, nextState, null).rescheduleAfter(500);
51+
return patchStatusUpdateControl(context, nextState, null, null).rescheduleAfter(0);
5252
}
5353

5454
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate()

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,8 @@ public void verifyFailureBeforeTransition(FlinkVersion flinkVersion) throws Exce
212212
minReconciliationTs
213213
< instantStrToMillis(rs.reconciledStatus.getLastReconciledTimestamp()));
214214

215-
// Assert job status/state is left the way it is and that the Blue job never got submitted
216-
assertEquals(JobStatus.FAILING, rs.reconciledStatus.getJobStatus().getState());
215+
assertFailingJobStatus(rs);
216+
217217
var flinkDeployments = getFlinkDeployments();
218218
assertEquals(1, flinkDeployments.size());
219219
assertEquals(
@@ -297,7 +297,7 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
297297
assertTrue(rs.updateControl.isPatchStatus());
298298

299299
// The first job should be RUNNING, the second should be SUSPENDED
300-
assertEquals(JobStatus.FAILING, rs.reconciledStatus.getJobStatus().getState());
300+
assertFailingJobStatus(rs);
301301
// No longer TRANSITIONING_TO_GREEN and rolled back to ACTIVE_BLUE
302302
assertEquals(
303303
FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState());
@@ -377,7 +377,8 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
377377

378378
rs = reconcile(rs.deployment);
379379

380-
assertEquals(JobStatus.FAILING, rs.reconciledStatus.getJobStatus().getState());
380+
assertFailingJobStatus(rs);
381+
381382
// No longer TRANSITIONING_TO_GREEN and rolled back to INITIALIZING_BLUE
382383
assertEquals(
383384
FlinkBlueGreenDeploymentState.INITIALIZING_BLUE,
@@ -397,6 +398,8 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
397398
// Resubmitting should re-start the Initialization to Blue
398399
rs = reconcile(rs.deployment);
399400

401+
// Any error should've been cleaned up
402+
assertNull(rs.reconciledStatus.getError());
400403
assertTrue(rs.updateControl.isPatchStatus());
401404
assertTrue(
402405
rs.updateControl.getScheduleDelay().isPresent()
@@ -678,6 +681,12 @@ private void assertTransitioningState(
678681
assertEquals(JobStatus.RECONCILING, rs.reconciledStatus.getJobStatus().getState());
679682
}
680683

684+
private static void assertFailingJobStatus(
685+
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs) {
686+
assertEquals(JobStatus.FAILING, rs.reconciledStatus.getJobStatus().getState());
687+
assertNotNull(rs.reconciledStatus.getError());
688+
}
689+
681690
private void assertFinalized(
682691
long minReconciliationTs,
683692
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs,
@@ -812,6 +821,8 @@ private void testTransitionToGreen(
812821
var flinkDeployments = getFlinkDeployments();
813822
var greenDeploymentName = flinkDeployments.get(1).getMetadata().getName();
814823

824+
// Any error should've been cleaned up
825+
assertNull(rs.reconciledStatus.getError());
815826
assertTrue(rs.updateControl.isPatchStatus());
816827
assertTrue(
817828
minReconciliationTs

helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10770,6 +10770,8 @@ spec:
1077010770
type: string
1077110771
deploymentReadyTimestamp:
1077210772
type: string
10773+
error:
10774+
type: string
1077310775
jobStatus:
1077410776
properties:
1077510777
checkpointInfo:

0 commit comments

Comments
 (0)