Skip to content

Commit d1098bb

Browse files
committed
Triggering a full transition only when needed, otherwise just patch the child FlinkDeployment. Unit test added and simplified assertions.
1 parent 79d75bb commit d1098bb

File tree

6 files changed

+194
-80
lines changed

6 files changed

+194
-80
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ public enum BlueGreenDiffType {
2525
/** Changes that require a Blue/Green transition. */
2626
TRANSITION,
2727

28-
/** Changes that only affect the top-level configuration. */
29-
PATCH_TOP_LEVEL,
30-
3128
/** Changes that only affect the child FlinkDeploymentSpec. */
3229
PATCH_CHILD,
3330

31+
// TODO: the PATCH_TOP_LEVEL and PATCH_BOTH values are redundant, eliminate
32+
/** Changes that only affect the top-level configuration. */
33+
PATCH_TOP_LEVEL,
34+
3435
/** Changes that affect both top-level and child specifications. */
3536
PATCH_BOTH
3637
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,25 @@
4545
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD;
4646
import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
4747

48-
/** Controller that runs the main reconcile loop for Flink Blue/Green deployments. */
48+
/**
49+
* Controller that runs the main reconcile loop for Flink Blue/Green deployments.
50+
*
51+
* <p>State Machine Flow
52+
*
53+
* <p>Deployment States
54+
*
55+
* <p>1. INITIALIZING_BLUE - First-time deployment setup 2. ACTIVE_BLUE - Blue environment serving
56+
* traffic, monitoring for updates 3. TRANSITIONING_TO_GREEN - Deploying Green environment while
57+
* Blue serves traffic 4. ACTIVE_GREEN - Green environment serving traffic, monitoring for updates
58+
* 5. TRANSITIONING_TO_BLUE - Deploying Blue environment while Green serves traffic
59+
*
60+
* <p>Orchestration Process
61+
*
62+
* <p>FlinkBlueGreenDeploymentController.reconcile() ↓ 1. Create BlueGreenContext with current
63+
* deployment state ↓ 2. Query StateHandlerRegistry for appropriate handler ↓ 3. Delegate to
64+
* specific StateHandler.handle(context) ↓ 4. StateHandler invokes BlueGreenDeploymentService
65+
* operations ↓ 5. Return UpdateControl with next reconciliation schedule
66+
*/
4967
@ControllerConfiguration
5068
public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueGreenDeployment> {
5169

@@ -118,7 +136,6 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
118136

119137
BlueGreenStateHandler handler = handlerRegistry.getHandler(currentState);
120138
return handler.handle(context);
121-
// return stateMachine.processState(context);
122139
}
123140
}
124141

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 79 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.api.common.JobStatus;
2121
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2222
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
23+
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDiffType;
2324
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
2425
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
2526
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
@@ -29,14 +30,20 @@
2930
import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils;
3031
import org.apache.flink.util.Preconditions;
3132

33+
import io.fabric8.kubernetes.api.model.ObjectMeta;
3234
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
3335
import org.slf4j.Logger;
3436
import org.slf4j.LoggerFactory;
3537

3638
import java.time.Instant;
3739

40+
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deleteFlinkDeployment;
41+
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deployCluster;
3842
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.isFlinkDeploymentReady;
43+
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.suspendFlinkDeployment;
44+
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.updateFlinkDeployment;
3945
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.configureSavepoint;
46+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.prepareFlinkDeployment;
4047
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.instantStrToMillis;
4148
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.millisToInstantStr;
4249

@@ -64,9 +71,17 @@ public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
6471
FlinkBlueGreenDeploymentState nextState,
6572
Savepoint lastCheckpoint,
6673
boolean isFirstDeployment) {
74+
ObjectMeta bgMeta = context.getBgDeployment().getMetadata();
6775

68-
BlueGreenKubernetesService.deployCluster(
69-
context, nextDeploymentType, lastCheckpoint, isFirstDeployment);
76+
FlinkDeployment flinkDeployment =
77+
prepareFlinkDeployment(
78+
context.getBgDeployment(),
79+
nextDeploymentType,
80+
lastCheckpoint,
81+
isFirstDeployment,
82+
bgMeta);
83+
84+
deployCluster(context, flinkDeployment);
7085

7186
BlueGreenUtils.setAbortTimestamp(context);
7287

@@ -83,29 +98,20 @@ public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
8398
*/
8499
public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
85100
BlueGreenContext context, DeploymentType currentDeploymentType) {
101+
BlueGreenDiffType specDiff = BlueGreenSpecUtils.getSpecDiff(context);
86102

87-
if (BlueGreenSpecUtils.hasSpecChanged(context)) {
103+
if (specDiff != BlueGreenDiffType.IGNORE) {
88104
BlueGreenSpecUtils.setLastReconciledSpec(context);
89105

90106
FlinkDeployment currentFlinkDeployment =
91107
context.getDeploymentByType(currentDeploymentType);
92108

93-
if (BlueGreenKubernetesService.isFlinkDeploymentReady(currentFlinkDeployment)) {
94-
DeploymentTransition transition = calculateTransition(currentDeploymentType);
95-
96-
FlinkResourceContext<FlinkDeployment> resourceContext =
97-
context.getCtxFactory()
98-
.getResourceContext(
99-
currentFlinkDeployment, context.getJosdkContext());
100-
101-
Savepoint lastCheckpoint = configureSavepoint(resourceContext);
102-
103-
return initiateDeployment(
104-
context,
105-
transition.nextDeploymentType,
106-
transition.nextState,
107-
lastCheckpoint,
108-
false);
109+
if (isFlinkDeploymentReady(currentFlinkDeployment)) {
110+
if (specDiff == BlueGreenDiffType.TRANSITION) {
111+
return startTransition(context, currentDeploymentType, currentFlinkDeployment);
112+
} else {
113+
return patchFlinkDeployment(context, currentDeploymentType, specDiff);
114+
}
109115
} else {
110116
if (context.getDeploymentStatus().getJobStatus().getState() != JobStatus.FAILING) {
111117
return patchStatusUpdateControl(context, null, JobStatus.FAILING);
@@ -116,6 +122,50 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
116122
return UpdateControl.noUpdate();
117123
}
118124

125+
private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
126+
BlueGreenContext context,
127+
DeploymentType currentDeploymentType,
128+
BlueGreenDiffType specDiff) {
129+
130+
if (specDiff == BlueGreenDiffType.PATCH_BOTH || specDiff == BlueGreenDiffType.PATCH_CHILD) {
131+
FlinkDeployment nextFlinkDeployment =
132+
context.getDeploymentByType(currentDeploymentType);
133+
134+
nextFlinkDeployment.setSpec(
135+
context.getBgDeployment().getSpec().getTemplate().getSpec());
136+
137+
updateFlinkDeployment(nextFlinkDeployment, context);
138+
139+
return patchStatusUpdateControl(
140+
context,
141+
calculatePatchingState(currentDeploymentType),
142+
JobStatus.RECONCILING)
143+
.rescheduleAfter(BlueGreenUtils.getReconciliationReschedInterval(context));
144+
}
145+
146+
return UpdateControl.noUpdate();
147+
}
148+
149+
private UpdateControl<FlinkBlueGreenDeployment> startTransition(
150+
BlueGreenContext context,
151+
DeploymentType currentDeploymentType,
152+
FlinkDeployment currentFlinkDeployment) {
153+
DeploymentTransition transition = calculateTransition(currentDeploymentType);
154+
155+
FlinkResourceContext<FlinkDeployment> resourceContext =
156+
context.getCtxFactory()
157+
.getResourceContext(currentFlinkDeployment, context.getJosdkContext());
158+
159+
Savepoint lastCheckpoint = configureSavepoint(resourceContext);
160+
161+
return initiateDeployment(
162+
context,
163+
transition.nextDeploymentType,
164+
transition.nextState,
165+
lastCheckpoint,
166+
false);
167+
}
168+
119169
private DeploymentTransition calculateTransition(DeploymentType currentType) {
120170
if (DeploymentType.BLUE == currentType) {
121171
return new DeploymentTransition(
@@ -126,6 +176,14 @@ private DeploymentTransition calculateTransition(DeploymentType currentType) {
126176
}
127177
}
128178

179+
private FlinkBlueGreenDeploymentState calculatePatchingState(DeploymentType currentType) {
180+
if (DeploymentType.BLUE == currentType) {
181+
return FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE;
182+
} else {
183+
return FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN;
184+
}
185+
}
186+
129187
// ==================== Transition Monitoring Methods ====================
130188

131189
/**
@@ -244,8 +302,7 @@ private UpdateControl<FlinkBlueGreenDeployment> waitBeforeDeleting(
244302
private UpdateControl<FlinkBlueGreenDeployment> deleteDeployment(
245303
FlinkDeployment currentDeployment, BlueGreenContext context) {
246304

247-
boolean deleted =
248-
BlueGreenKubernetesService.deleteFlinkDeployment(currentDeployment, context);
305+
boolean deleted = deleteFlinkDeployment(currentDeployment, context);
249306

250307
if (!deleted) {
251308
LOG.info("FlinkDeployment '{}' not deleted, will retry", currentDeployment);
@@ -295,7 +352,7 @@ private UpdateControl<FlinkBlueGreenDeployment> abortDeployment(
295352
FlinkBlueGreenDeploymentState nextState,
296353
String deploymentName) {
297354

298-
BlueGreenKubernetesService.suspendFlinkDeployment(context, nextDeployment);
355+
suspendFlinkDeployment(context, nextDeployment);
299356

300357
FlinkBlueGreenDeploymentState previousState =
301358
getPreviousState(nextState, context.getDeployments());

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenKubernetesService.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,15 @@
2020
import org.apache.flink.api.common.JobStatus;
2121
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2222
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
23-
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
2423
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2524
import org.apache.flink.kubernetes.operator.api.spec.JobState;
26-
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
2725

2826
import io.fabric8.kubernetes.api.model.ObjectMeta;
2927
import io.fabric8.kubernetes.api.model.OwnerReference;
3028
import io.fabric8.kubernetes.api.model.StatusDetails;
3129

3230
import java.util.List;
3331

34-
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.prepareFlinkDeployment;
35-
3632
/** Utility methods for handling Kubernetes operations in Blue/Green deployments. */
3733
public class BlueGreenKubernetesService {
3834

@@ -58,21 +54,7 @@ public static ObjectMeta getDependentObjectMeta(FlinkBlueGreenDeployment bgDeplo
5854
return objectMeta;
5955
}
6056

61-
public static void deployCluster(
62-
BlueGreenContext context,
63-
DeploymentType deploymentType,
64-
Savepoint lastCheckpoint,
65-
boolean isFirstDeployment) {
66-
ObjectMeta bgMeta = context.getBgDeployment().getMetadata();
67-
68-
FlinkDeployment flinkDeployment =
69-
prepareFlinkDeployment(
70-
context.getBgDeployment(),
71-
deploymentType,
72-
lastCheckpoint,
73-
isFirstDeployment,
74-
bgMeta);
75-
57+
public static void deployCluster(BlueGreenContext context, FlinkDeployment flinkDeployment) {
7658
// Deploy
7759
context.getJosdkContext().getClient().resource(flinkDeployment).createOrReplace();
7860
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenSpecUtils.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ public static <T> T adjustNameReferences(
7878
*/
7979
public static boolean hasSpecChanged(BlueGreenContext context) {
8080

81+
BlueGreenDiffType diffType = getSpecDiff(context);
82+
83+
return diffType != BlueGreenDiffType.IGNORE;
84+
}
85+
86+
public static BlueGreenDiffType getSpecDiff(BlueGreenContext context) {
8187
FlinkBlueGreenDeploymentStatus deploymentStatus = context.getDeploymentStatus();
8288
String lastReconciledSpec = deploymentStatus.getLastReconciledSpec();
8389
FlinkBlueGreenDeploymentSpec lastSpec =
@@ -90,9 +96,7 @@ public static boolean hasSpecChanged(BlueGreenContext context) {
9096
lastSpec,
9197
context.getBgDeployment().getSpec());
9298

93-
BlueGreenDiffType diffType = diff.compare();
94-
95-
return diffType != BlueGreenDiffType.IGNORE;
99+
return diff.compare();
96100
}
97101

98102
/**

0 commit comments

Comments
 (0)