Skip to content

Commit f46f248

Browse files
committed
If a spec change comes in mid transition, we apply it right away.
1 parent ec120f4 commit f46f248

File tree

3 files changed

+42
-21
lines changed

3 files changed

+42
-21
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
3030
import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
3131
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
32-
import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils;
3332
import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils;
3433
import org.apache.flink.util.Preconditions;
3534

@@ -49,13 +48,17 @@
4948
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.updateFlinkDeployment;
5049
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.fetchSavepointInfo;
5150
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.getLastCheckpoint;
51+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.getSpecDiff;
52+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.hasSpecChanged;
5253
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.isSavepointRequired;
5354
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.lookForCheckpoint;
5455
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.prepareFlinkDeployment;
56+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.setLastReconciledSpec;
5557
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.triggerSavepoint;
5658
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getReconciliationReschedInterval;
5759
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.instantStrToMillis;
5860
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.millisToInstantStr;
61+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.setAbortTimestamp;
5962

6063
/** Consolidated service for all Blue/Green deployment operations. */
6164
public class BlueGreenDeploymentService {
@@ -108,7 +111,7 @@ public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
108111
*/
109112
public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
110113
BlueGreenContext context, DeploymentType currentDeploymentType) {
111-
BlueGreenDiffType specDiff = BlueGreenSpecUtils.getSpecDiff(context);
114+
BlueGreenDiffType specDiff = getSpecDiff(context);
112115

113116
if (specDiff != BlueGreenDiffType.IGNORE) {
114117
FlinkDeployment currentFlinkDeployment =
@@ -124,15 +127,15 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
124127
.rescheduleAfter(getReconciliationReschedInterval(context));
125128
}
126129

127-
BlueGreenSpecUtils.setLastReconciledSpec(context);
130+
setLastReconciledSpec(context);
128131
return startTransition(context, currentDeploymentType, currentFlinkDeployment);
129132
} else {
130-
BlueGreenSpecUtils.setLastReconciledSpec(context);
133+
setLastReconciledSpec(context);
131134
return patchFlinkDeployment(context, currentDeploymentType, specDiff);
132135
}
133136
} else {
134137
if (context.getDeploymentStatus().getJobStatus().getState() != JobStatus.FAILING) {
135-
BlueGreenSpecUtils.setLastReconciledSpec(context);
138+
setLastReconciledSpec(context);
136139
return patchStatusUpdateControl(context, null, JobStatus.FAILING);
137140
}
138141
}
@@ -292,10 +295,10 @@ private FlinkBlueGreenDeploymentState calculateSavepointingState(DeploymentType
292295
public UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
293296
BlueGreenContext context, DeploymentType currentDeploymentType) {
294297

295-
handleSpecChangesDuringTransition(context);
296-
297298
TransitionState transitionState = determineTransitionState(context, currentDeploymentType);
298299

300+
handleSpecChangesDuringTransition(context, transitionState);
301+
299302
if (isFlinkDeploymentReady(transitionState.nextDeployment)) {
300303
return shouldWeDelete(
301304
context,
@@ -308,11 +311,27 @@ public UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
308311
}
309312
}
310313

311-
private void handleSpecChangesDuringTransition(BlueGreenContext context) {
312-
if (BlueGreenSpecUtils.hasSpecChanged(context)) {
313-
BlueGreenSpecUtils.revertToLastSpec(context);
314-
LOG.warn(
315-
"Blue/Green Spec change detected during transition, ignored and reverted to the last reconciled spec");
314+
private void handleSpecChangesDuringTransition(
315+
BlueGreenContext context, TransitionState transitionState) {
316+
if (hasSpecChanged(context)) {
317+
BlueGreenDiffType diffType = getSpecDiff(context);
318+
319+
FlinkDeployment incomingFlinkDeployment = transitionState.nextDeployment;
320+
321+
if (diffType != BlueGreenDiffType.IGNORE) {
322+
// Apply the new spec changes to the currently active deployment
323+
incomingFlinkDeployment.setSpec(
324+
context.getBgDeployment().getSpec().getTemplate().getSpec());
325+
// Resetting the abort grace period
326+
setAbortTimestamp(context);
327+
// Ack'ing the spec change
328+
setLastReconciledSpec(context);
329+
updateFlinkDeployment(incomingFlinkDeployment, context);
330+
331+
LOG.info(
332+
"Blue/Green Spec change detected during transition, patching incoming '{}' deployment with new changes",
333+
incomingFlinkDeployment.getMetadata().getName());
334+
}
316335
}
317336
}
318337

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,15 @@ public BlueGreenDiffType compare() {
6565

6666
BlueGreenDiffType childDiffType = getChildSpecDiffType();
6767

68-
// If nested spec has SCALE or UPGRADE differences, return TRANSITION regardless
68+
// If nested spec has SCALE or UPGRADE differences, return TRANSITION
6969
if (childDiffType == BlueGreenDiffType.TRANSITION) {
7070
return BlueGreenDiffType.TRANSITION;
7171
}
7272

73-
// Determine result based on where differences are found
74-
boolean hasChildDiff = childDiffType != BlueGreenDiffType.IGNORE;
75-
76-
if (hasChildDiff) {
73+
if (childDiffType != BlueGreenDiffType.IGNORE) {
7774
// Child spec changes take precedence, return the child diff type
7875
return BlueGreenDiffType.PATCH_CHILD;
79-
}
80-
else {
76+
} else {
8177
return BlueGreenDiffType.IGNORE;
8278
}
8379
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.instantStrToMillis;
6969
import static org.junit.jupiter.api.Assertions.assertEquals;
7070
import static org.junit.jupiter.api.Assertions.assertFalse;
71+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
7172
import static org.junit.jupiter.api.Assertions.assertNotNull;
7273
import static org.junit.jupiter.api.Assertions.assertNull;
7374
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -344,12 +345,17 @@ public void verifySpecChangeDuringTransition(FlinkVersion flinkVersion) throws E
344345

345346
// Simulate a spec change before the transition is complete
346347
simulateChangeInSpec(rs.deployment, "MODIFIED_VALUE", 0, null);
348+
var moddedSpec = rs.deployment.getSpec();
347349
rs = reconcile(rs.deployment);
348350

349-
// The spec should have been reverted
350-
assertEquals(
351+
// The spec change should have been preserved
352+
assertNotEquals(
351353
SpecUtils.writeSpecAsJSON(originalSpec, "spec"),
352354
SpecUtils.writeSpecAsJSON(rs.deployment.getSpec(), "spec"));
355+
356+
assertEquals(
357+
SpecUtils.writeSpecAsJSON(moddedSpec, "spec"),
358+
rs.deployment.getStatus().getLastReconciledSpec(), "spec");
353359
}
354360

355361
@ParameterizedTest

0 commit comments

Comments
 (0)