Skip to content

Commit eb5b24f

Browse files
committed
Addressing PR comments. Adding unit test to cover for Spec changes mid-transition.
1 parent ba62f07 commit eb5b24f

File tree

4 files changed

+80
-68
lines changed

4 files changed

+80
-68
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/DeploymentType.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,15 @@
1919

2020
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2121

22-
/** Enumeration of the two possible Flink Blue/Green deployment types. */
22+
/**
23+
* Enumeration of the two possible Flink Blue/Green deployment types. Only one of each type will be
24+
* present at all times for a particular job.
25+
*/
2326
public enum DeploymentType {
27+
/** Identifier for the first or "Blue" deployment type. */
2428
BLUE,
29+
30+
/** Identifier for the second or "Green" deployment type. */
2531
GREEN;
2632

2733
public static DeploymentType fromDeployment(FlinkDeployment flinkDeployment) {

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentState.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,15 @@
1919

2020
/** Enumeration of the possible states of the blue/green transition. */
2121
public enum FlinkBlueGreenDeploymentState {
22+
/** Identifies the system is running normally with a "Blue" deployment type. */
2223
ACTIVE_BLUE,
24+
25+
/** Identifies the system is running normally with a "Green" deployment type. */
2326
ACTIVE_GREEN,
27+
28+
/** Identifies the system is transitioning from "Green" to "Blue". */
2429
TRANSITIONING_TO_BLUE,
30+
31+
/** Identifies the system is transitioning from "Blue" to "Green". */
2532
TRANSITIONING_TO_GREEN,
2633
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,7 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
101101

102102
if (deploymentStatus == null) {
103103
deploymentStatus = new FlinkBlueGreenDeploymentStatus();
104-
deploymentStatus.setLastReconciledSpec(
105-
SpecUtils.serializeObject(flinkBlueGreenDeployment.getSpec(), "spec"));
104+
setLastReconciledSpec(flinkBlueGreenDeployment, deploymentStatus);
106105
return initiateDeployment(
107106
flinkBlueGreenDeployment,
108107
deploymentStatus,
@@ -115,12 +114,6 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
115114
FlinkBlueGreenDeployments deployments =
116115
FlinkBlueGreenDeployments.fromSecondaryResources(josdkContext);
117116

118-
// TODO: if a new deployment request comes while in the middle of a transition it's
119-
// currently ignored, but the new spec remains changed, should we roll it back?
120-
// TODO: if we choose to leave a previously failed deployment 'running' for debug
121-
// purposes,
122-
// we should flag it somehow as 'ROLLED_BACK' to signal that it can be overriden by a
123-
// new deployment attempt.
124117
switch (deploymentStatus.getBlueGreenState()) {
125118
case ACTIVE_BLUE:
126119
return checkAndInitiateDeployment(
@@ -161,7 +154,21 @@ private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
161154
FlinkBlueGreenDeployments deployments,
162155
FlinkBlueGreenDeploymentStatus deploymentStatus,
163156
DeploymentType currentDeploymentType,
164-
Context<FlinkBlueGreenDeployment> josdkContext) {
157+
Context<FlinkBlueGreenDeployment> josdkContext)
158+
throws JsonProcessingException {
159+
160+
if (hasSpecChanged(bgDeployment.getSpec(), deploymentStatus, currentDeploymentType)) {
161+
// this means the spec was changed during transition,
162+
// ignore the new change, revert the spec and log as warning
163+
bgDeployment.setSpec(
164+
SpecUtils.deserializeObject(
165+
deploymentStatus.getLastReconciledSpec(),
166+
"spec",
167+
FlinkBlueGreenDeploymentSpec.class));
168+
josdkContext.getClient().resource(bgDeployment).replace();
169+
LOG.warn(
170+
"Blue/Green Spec change detected during transition, ignored and reverted to the last reconciled spec");
171+
}
165172

166173
var nextState = FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
167174
FlinkDeployment currentDeployment;
@@ -185,18 +192,8 @@ private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
185192

186193
if (isDeploymentReady(nextDeployment, josdkContext, deploymentStatus)) {
187194
return deleteAndFinalize(
188-
bgDeployment,
189-
deploymentStatus,
190-
currentDeploymentType,
191-
josdkContext,
192-
currentDeployment,
193-
nextState);
195+
bgDeployment, deploymentStatus, josdkContext, currentDeployment, nextState);
194196
} else {
195-
// This phase requires rescheduling the reconciliation because the pod initialization
196-
// could get stuck
197-
// (e.g. waiting for resources)
198-
// TODO: figure out the course of action for error/failure cases
199-
200197
int maxNumRetries = bgDeployment.getSpec().getTemplate().getMaxNumRetries();
201198
if (maxNumRetries <= 0) {
202199
maxNumRetries = DEFAULT_MAX_NUM_RETRIES;
@@ -242,7 +239,6 @@ private static int getReconciliationReschedInterval(FlinkBlueGreenDeployment bgD
242239
private UpdateControl<FlinkBlueGreenDeployment> deleteAndFinalize(
243240
FlinkBlueGreenDeployment bgDeployment,
244241
FlinkBlueGreenDeploymentStatus deploymentStatus,
245-
DeploymentType currentDeploymentType,
246242
Context<FlinkBlueGreenDeployment> josdkContext,
247243
FlinkDeployment currentDeployment,
248244
FlinkBlueGreenDeploymentState nextState) {
@@ -251,11 +247,6 @@ private UpdateControl<FlinkBlueGreenDeployment> deleteAndFinalize(
251247
deleteDeployment(currentDeployment, josdkContext);
252248
return UpdateControl.noUpdate();
253249
} else {
254-
deploymentStatus.setLastReconciledSpec(
255-
SpecUtils.serializeObject(bgDeployment.getSpec(), "spec"));
256-
257-
// TODO: Set the new child job STATUS to RUNNING too
258-
259250
return patchStatusUpdateControl(
260251
bgDeployment, deploymentStatus, nextState, JobStatus.RUNNING, false);
261252
}
@@ -272,6 +263,9 @@ private UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
272263
if (hasSpecChanged(
273264
flinkBlueGreenDeployment.getSpec(), deploymentStatus, currentDeploymentType)) {
274265

266+
// Ack the change in the spec (setLastReconciledSpec)
267+
setLastReconciledSpec(flinkBlueGreenDeployment, deploymentStatus);
268+
275269
FlinkDeployment currentFlinkDeployment =
276270
DeploymentType.BLUE == currentDeploymentType
277271
? deployments.getFlinkDeploymentBlue()
@@ -286,10 +280,6 @@ private UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
286280
FlinkResourceContext<FlinkDeployment> resourceContext =
287281
ctxFactory.getResourceContext(currentFlinkDeployment, josdkContext);
288282

289-
// TODO: this operation is already done by hasSpecChanged() above, dedup later
290-
String serializedSpec =
291-
SpecUtils.serializeObject(flinkBlueGreenDeployment.getSpec(), "spec");
292-
293283
// Updating status
294284
if (DeploymentType.BLUE == currentDeploymentType) {
295285
nextState = FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN;
@@ -323,6 +313,14 @@ private UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
323313
return UpdateControl.noUpdate();
324314
}
325315

316+
private static void setLastReconciledSpec(
317+
FlinkBlueGreenDeployment flinkBlueGreenDeployment,
318+
FlinkBlueGreenDeploymentStatus deploymentStatus) {
319+
deploymentStatus.setLastReconciledSpec(
320+
SpecUtils.serializeObject(flinkBlueGreenDeployment.getSpec(), "spec"));
321+
deploymentStatus.setLastReconciledTimestamp(System.currentTimeMillis());
322+
}
323+
326324
public void logPotentialWarnings(
327325
FlinkDeployment flinkDeployment,
328326
Context<FlinkBlueGreenDeployment> josdkContext,
@@ -345,7 +343,7 @@ public void logPotentialWarnings(
345343
LOG.warn("Deployment not healthy, some Pods have the following status: " + podPhases);
346344
}
347345

348-
List<Event> badEvents =
346+
List<Event> abnormalEvents =
349347
josdkContext
350348
.getClient()
351349
.v1()
@@ -373,8 +371,8 @@ public void logPotentialWarnings(
373371
.contains(p)))
374372
.collect(Collectors.toList());
375373

376-
if (!badEvents.isEmpty()) {
377-
LOG.warn("Bad events detected: " + badEvents);
374+
if (!abnormalEvents.isEmpty()) {
375+
LOG.warn("Abnormal events detected: " + abnormalEvents);
378376
}
379377
}
380378

@@ -393,9 +391,7 @@ private static Savepoint configureSavepoint(
393391
.getJobId()),
394392
resourceContext.getObserveConfig());
395393

396-
// TODO 1: check the last CP age with the logic from
397-
// AbstractJobReconciler.changeLastStateIfCheckpointTooOld
398-
// TODO 2: if no checkpoint is available, take a savepoint? throw error?
394+
// TODO: alternative action if no checkpoint is available?
399395
if (lastCheckpoint.isEmpty()) {
400396
throw new IllegalStateException(
401397
"Last Checkpoint for Job "
@@ -422,8 +418,6 @@ private UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
422418
josdkContext,
423419
isFirstDeployment);
424420

425-
// TODO: set child job status to JobStatus.INITIALIZING
426-
427421
return patchStatusUpdateControl(
428422
flinkBlueGreenDeployment,
429423
deploymentStatus,
@@ -439,8 +433,7 @@ private boolean isDeploymentReady(
439433
FlinkBlueGreenDeploymentStatus deploymentStatus) {
440434
if (ResourceLifecycleState.STABLE == deployment.getStatus().getLifecycleState()
441435
&& JobStatus.RUNNING == deployment.getStatus().getJobStatus().getState()) {
442-
// TODO: verify, e.g. will pods be "pending" after the FlinkDeployment is RUNNING and
443-
// STABLE?
436+
// TODO: checking for running pods seems to be redundant, check if this can be removed
444437
int notRunningPods =
445438
(int)
446439
getDeploymentPods(josdkContext, deployment)
@@ -501,7 +494,6 @@ private UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(
501494
deploymentStatus.getJobStatus().setState(jobState);
502495
}
503496

504-
deploymentStatus.setLastReconciledTimestamp(System.currentTimeMillis());
505497
flinkBlueGreenDeployment.setStatus(deploymentStatus);
506498
return UpdateControl.patchStatus(flinkBlueGreenDeployment);
507499
}
@@ -553,9 +545,6 @@ private void deploy(
553545

554546
private static void deleteDeployment(
555547
FlinkDeployment currentDeployment, Context<FlinkBlueGreenDeployment> josdkContext) {
556-
// TODO: This gets called multiple times, check to see if it's already in a TERMINATING
557-
// state
558-
// (or only execute if RUNNING)
559548
List<StatusDetails> deletedStatus =
560549
josdkContext
561550
.getClient()

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,13 @@ public void verifyBasicDeployment(FlinkVersion flinkVersion) throws Exception {
107107
var blueGreenDeployment =
108108
buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, flinkVersion);
109109

110-
// 1. Initiate the Green deployment
110+
// 1. Initiate the Blue deployment
111111
var bgSpecBefore = blueGreenDeployment.getSpec();
112112
Long minReconciliationTs = System.currentTimeMillis() - 1;
113113
var rs = reconcile(blueGreenDeployment);
114114

115-
assertSpec(rs, minReconciliationTs);
115+
assertTrue(rs.updateControl.isPatchStatus());
116+
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
116117

117118
// check the status (reconciled spec, reconciled ts, a/b state)
118119
assertEquals(
@@ -129,12 +130,10 @@ public void verifyBasicDeployment(FlinkVersion flinkVersion) throws Exception {
129130

130131
simulateSubmitAndSuccessfulJobStart(deploymentA);
131132

132-
// 2. Finalize the Green deployment
133-
minReconciliationTs = System.currentTimeMillis() - 1;
133+
// 2. Finalize the Blue deployment
134134
rs = reconcile(rs.deployment);
135135

136-
assertSpec(rs, minReconciliationTs);
137-
136+
assertTrue(rs.updateControl.isPatchStatus());
138137
assertEquals(
139138
SpecUtils.serializeObject(bgSpecBefore, "spec"),
140139
rs.reconciledStatus.getLastReconciledSpec());
@@ -182,7 +181,8 @@ private void testTransitionToGreen(
182181
var flinkDeployments = getFlinkDeployments();
183182
var greenDeploymentName = flinkDeployments.get(1).getMetadata().getName();
184183

185-
assertSpec(rs, minReconciliationTs);
184+
assertTrue(rs.updateControl.isPatchStatus());
185+
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
186186

187187
assertEquals(2, flinkDeployments.size());
188188
assertNull(flinkDeployments.get(0).getSpec().getJob().getInitialSavepointPath());
@@ -211,12 +211,9 @@ private void testTransitionToGreen(
211211
assertEquals(1, flinkDeployments.size());
212212
assertEquals(greenDeploymentName, flinkDeployments.get(0).getMetadata().getName());
213213

214-
minReconciliationTs = System.currentTimeMillis() - 1;
215214
rs = reconcile(rs.deployment);
216215

217-
// Spec should still be the new one
218-
assertSpec(rs, minReconciliationTs);
219-
216+
assertTrue(rs.updateControl.isPatchStatus());
220217
assertNotNull(rs.reconciledStatus.getLastReconciledSpec());
221218
assertEquals(
222219
SpecUtils.serializeObject(bgUpdatedSpec, "spec"),
@@ -252,7 +249,8 @@ public void verifyFailureBeforeTransition(FlinkVersion flinkVersion) throws Exce
252249
var minReconciliationTs = System.currentTimeMillis() - 1;
253250
rs = reconcile(rs.deployment);
254251

255-
assertSpec(rs, minReconciliationTs);
252+
assertTrue(rs.updateControl.isPatchStatus());
253+
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
256254

257255
// Assert job status/state is left the way it is and that the Blue job never got submitted
258256
assertEquals(JobStatus.FAILING, rs.reconciledStatus.getJobStatus().getState());
@@ -315,7 +313,6 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
315313

316314
// 4a. Simulating the Blue deployment doesn't start correctly (status will remain the same)
317315
// Asserting the status retry count is incremented by 1
318-
long lastTs = System.currentTimeMillis();
319316
for (int i = 1; i <= maxNumRetries; i++) {
320317
Thread.sleep(1);
321318
rs = reconcile(rs.deployment);
@@ -326,16 +323,13 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
326323
reconciliationReschedulingIntervalMs,
327324
rs.updateControl.getScheduleDelay().get());
328325
assertEquals(i, rs.reconciledStatus.getNumRetries());
329-
assertTrue(rs.reconciledStatus.getLastReconciledTimestamp() > lastTs);
330-
lastTs = rs.reconciledStatus.getLastReconciledTimestamp();
331326
System.out.println();
332327
}
333328

334329
// 4b. After the retries are exhausted
335-
var minReconciliationTs = System.currentTimeMillis() - 1;
336330
rs = reconcile(rs.deployment);
337331

338-
assertSpec(rs, minReconciliationTs);
332+
assertTrue(rs.updateControl.isPatchStatus());
339333

340334
// The first job should be RUNNING, the second should be SUSPENDED
341335
assertEquals(JobStatus.FAILING, rs.reconciledStatus.getJobStatus().getState());
@@ -361,17 +355,33 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
361355

362356
// 6. Initiate the redeployment
363357
var bgUpdatedSpec = rs.deployment.getSpec();
364-
minReconciliationTs = System.currentTimeMillis() - 1;
358+
var minReconciliationTs = System.currentTimeMillis() - 1;
365359
rs = reconcile(rs.deployment);
366360

367361
testTransitionToGreen(rs, minReconciliationTs, customValue, bgUpdatedSpec);
368362
}
369363

370-
private static void assertSpec(
371-
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs,
372-
long minReconciliationTs) {
373-
assertTrue(rs.updateControl.isPatchStatus());
374-
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
364+
@ParameterizedTest
365+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
366+
public void verifySpecChangeDuringTransition(FlinkVersion flinkVersion) throws Exception {
367+
var blueGreenDeployment =
368+
buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, flinkVersion);
369+
370+
// 1. Initiate the Blue deployment
371+
var originalSpec = blueGreenDeployment.getSpec();
372+
var rs = reconcile(blueGreenDeployment);
373+
374+
// 2. Job starting...
375+
simulateSubmitAndSuccessfulJobStart(getFlinkDeployments().get(0));
376+
377+
// 3. Simulate a spec change before the transition is complete
378+
simulateChangeInSpec(rs.deployment, "MODIFIED_VALUE");
379+
rs = reconcile(rs.deployment);
380+
381+
// The spec should have been reverted
382+
assertEquals(
383+
SpecUtils.serializeObject(originalSpec, "spec"),
384+
SpecUtils.serializeObject(rs.deployment.getSpec(), "spec"));
375385
}
376386

377387
private void simulateChangeInSpec(

0 commit comments

Comments
 (0)