Skip to content

Commit 3690b0d

Browse files
committed
Addressing edge case restarting after the first deployment fails.
1 parent 1df7747 commit 3690b0d

File tree

7 files changed

+300
-179
lines changed

7 files changed

+300
-179
lines changed

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ public void setup() {
8383
stateStore = new InMemoryAutoScalerStateStore<>();
8484
}
8585

86-
@Test
8786
void testMetricReporting() throws Exception {
8887
JobVertexID jobVertexID = new JobVertexID();
8988
JobTopology jobTopology = new JobTopology(new VertexInfo(jobVertexID, Map.of(), 1, 10));

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentState.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919

2020
/** Enumeration of the possible states of the blue/green transition. */
2121
public enum FlinkBlueGreenDeploymentState {
22+
23+
/**
24+
* We use this state while initializing for the first time, always with a "Blue" deployment
25+
* type.
26+
*/
27+
INITIALIZING_BLUE,
28+
2229
/** Identifies the system is running normally with a "Blue" deployment type. */
2330
ACTIVE_BLUE,
2431

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 57 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -102,45 +102,41 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
102102

103103
if (deploymentStatus == null) {
104104
deploymentStatus = new FlinkBlueGreenDeploymentStatus();
105-
setLastReconciledSpec(bgDeployment, deploymentStatus);
106-
return initiateDeployment(
107-
bgDeployment,
108-
deploymentStatus,
109-
DeploymentType.BLUE,
110-
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
111-
null,
112-
josdkContext,
113-
true);
105+
return patchStatusUpdateControl(
106+
bgDeployment,
107+
deploymentStatus,
108+
FlinkBlueGreenDeploymentState.INITIALIZING_BLUE,
109+
null)
110+
.rescheduleAfter(100);
114111
} else {
115-
FlinkBlueGreenDeployments deployments =
116-
FlinkBlueGreenDeployments.fromSecondaryResources(josdkContext);
117-
118112
switch (deploymentStatus.getBlueGreenState()) {
113+
case INITIALIZING_BLUE:
114+
return checkFirstDeployment(bgDeployment, josdkContext, deploymentStatus);
119115
case ACTIVE_BLUE:
120116
return checkAndInitiateDeployment(
121117
bgDeployment,
122-
deployments,
118+
FlinkBlueGreenDeployments.fromSecondaryResources(josdkContext),
123119
deploymentStatus,
124120
DeploymentType.BLUE,
125121
josdkContext);
126122
case ACTIVE_GREEN:
127123
return checkAndInitiateDeployment(
128124
bgDeployment,
129-
deployments,
125+
FlinkBlueGreenDeployments.fromSecondaryResources(josdkContext),
130126
deploymentStatus,
131127
DeploymentType.GREEN,
132128
josdkContext);
133129
case TRANSITIONING_TO_BLUE:
134130
return monitorTransition(
135131
bgDeployment,
136-
deployments,
132+
FlinkBlueGreenDeployments.fromSecondaryResources(josdkContext),
137133
deploymentStatus,
138134
DeploymentType.GREEN,
139135
josdkContext);
140136
case TRANSITIONING_TO_GREEN:
141137
return monitorTransition(
142138
bgDeployment,
143-
deployments,
139+
FlinkBlueGreenDeployments.fromSecondaryResources(josdkContext),
144140
deploymentStatus,
145141
DeploymentType.BLUE,
146142
josdkContext);
@@ -150,6 +146,28 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
150146
}
151147
}
152148

149+
private UpdateControl<FlinkBlueGreenDeployment> checkFirstDeployment(
150+
FlinkBlueGreenDeployment bgDeployment,
151+
Context<FlinkBlueGreenDeployment> josdkContext,
152+
FlinkBlueGreenDeploymentStatus deploymentStatus)
153+
throws JsonProcessingException {
154+
if (deploymentStatus.getLastReconciledSpec() == null
155+
|| hasSpecChanged(bgDeployment.getSpec(), deploymentStatus)) {
156+
// Ack the change in the spec (setLastReconciledSpec)
157+
setLastReconciledSpec(bgDeployment, deploymentStatus);
158+
return initiateDeployment(
159+
bgDeployment,
160+
deploymentStatus,
161+
DeploymentType.BLUE,
162+
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
163+
null,
164+
josdkContext,
165+
true);
166+
} else {
167+
return UpdateControl.noUpdate();
168+
}
169+
}
170+
153171
private static void setAbortTimestamp(
154172
FlinkBlueGreenDeployment bgDeployment,
155173
FlinkBlueGreenDeploymentStatus deploymentStatus) {
@@ -204,7 +222,12 @@ private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
204222
bgDeployment, deploymentStatus, josdkContext, currentDeployment, nextState);
205223
} else {
206224
return shouldAbort(
207-
bgDeployment, deploymentStatus, josdkContext, nextDeployment, nextState);
225+
bgDeployment,
226+
deploymentStatus,
227+
josdkContext,
228+
nextDeployment,
229+
nextState,
230+
deployments);
208231
}
209232
}
210233

@@ -246,7 +269,8 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldAbort(
246269
FlinkBlueGreenDeploymentStatus deploymentStatus,
247270
Context<FlinkBlueGreenDeployment> josdkContext,
248271
FlinkDeployment nextDeployment,
249-
FlinkBlueGreenDeploymentState nextState) {
272+
FlinkBlueGreenDeploymentState nextState,
273+
FlinkBlueGreenDeployments deployments) {
250274

251275
String deploymentName = nextDeployment.getMetadata().getName();
252276
long abortTimestamp = deploymentStatus.getAbortTimestamp();
@@ -263,10 +287,17 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldAbort(
263287

264288
// We indicate this Blue/Green deployment is no longer Transitioning
265289
// and rollback the state value
266-
var previousState =
267-
nextState == FlinkBlueGreenDeploymentState.ACTIVE_BLUE
268-
? FlinkBlueGreenDeploymentState.ACTIVE_GREEN
269-
: FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
290+
FlinkBlueGreenDeploymentState previousState;
291+
if (deployments.getNumberOfDeployments() == 1) {
292+
previousState = FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
293+
} else if (deployments.getNumberOfDeployments() == 2) {
294+
previousState =
295+
nextState == FlinkBlueGreenDeploymentState.ACTIVE_BLUE
296+
? FlinkBlueGreenDeploymentState.ACTIVE_GREEN
297+
: FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
298+
} else {
299+
throw new IllegalStateException("No blue/green FlinkDeployments found!");
300+
}
270301

271302
deploymentStatus.setBlueGreenState(previousState);
272303

@@ -447,7 +478,7 @@ public void logPotentialWarnings(
447478

448479
private static Savepoint configureSavepoint(
449480
FlinkResourceContext<FlinkDeployment> resourceContext) throws Exception {
450-
// TODO: if the user specified an initialSavepointPath, use it and skip this
481+
// TODO: if the user specified an initialSavepointPath, use it and skip this?
451482
Optional<Savepoint> lastCheckpoint =
452483
resourceContext
453484
.getFlinkService()
@@ -460,7 +491,7 @@ private static Savepoint configureSavepoint(
460491
.getJobId()),
461492
resourceContext.getObserveConfig());
462493

463-
// TODO: alternative action if no checkpoint is available?
494+
// Alternative action if no checkpoint is available?
464495
if (lastCheckpoint.isEmpty()) {
465496
throw new IllegalStateException(
466497
"Last Checkpoint for Job "
@@ -484,7 +515,8 @@ private UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
484515

485516
setAbortTimestamp(bgDeployment, deploymentStatus);
486517

487-
return patchStatusUpdateControl(bgDeployment, deploymentStatus, nextState, null)
518+
return patchStatusUpdateControl(
519+
bgDeployment, deploymentStatus, nextState, JobStatus.RECONCILING)
488520
.rescheduleAfter(getReconciliationReschedInterval(bgDeployment));
489521
}
490522

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,17 @@ class FlinkBlueGreenDeployments {
3434
private FlinkDeployment flinkDeploymentBlue;
3535
private FlinkDeployment flinkDeploymentGreen;
3636

37+
public int getNumberOfDeployments() {
38+
var counter = 0;
39+
if (flinkDeploymentBlue != null) {
40+
counter++;
41+
}
42+
if (flinkDeploymentGreen != null) {
43+
counter++;
44+
}
45+
return counter;
46+
}
47+
3748
static FlinkBlueGreenDeployments fromSecondaryResources(
3849
Context<FlinkBlueGreenDeployment> context) {
3950
Set<FlinkDeployment> secondaryResources =

0 commit comments

Comments
 (0)