Skip to content

Commit c178eae

Browse files
committed
Adding support for Savepointing before transItion in the case of UpgradeMode.SAVEPOINT
1 parent 5a3b802 commit c178eae

File tree

9 files changed

+372
-79
lines changed

9 files changed

+372
-79
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentState.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,10 @@ public enum FlinkBlueGreenDeploymentState {
3737

3838
/** Identifies the system is transitioning from "Blue" to "Green". */
3939
TRANSITIONING_TO_GREEN,
40+
41+
/** Identifies the system is savepointing "Blue" before it transitions to "Green". */
42+
SAVEPOINTING_BLUE,
43+
44+
/** Identifies the system is savepointing "Green" before it transitions to "Blue". */
45+
SAVEPOINTING_GREEN,
4046
}

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,7 @@ public class FlinkBlueGreenDeploymentStatus {
5252

5353
/** Timestamp when the deployment became READY/STABLE. Used to determine when to delete it. */
5454
private String deploymentReadyTimestamp;
55+
56+
/** Persisted triggerId to track transition with savepoint. Only used with UpgradeMode.SAVEPOINT */
57+
private String savepointTriggerId;
5558
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 109 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
2525
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
2626
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
27+
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
28+
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
29+
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
2730
import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
2831
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
2932
import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils;
@@ -32,6 +35,8 @@
3235

3336
import io.fabric8.kubernetes.api.model.ObjectMeta;
3437
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
38+
import lombok.AllArgsConstructor;
39+
import lombok.Getter;
3540
import org.slf4j.Logger;
3641
import org.slf4j.LoggerFactory;
3742

@@ -42,8 +47,13 @@
4247
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.isFlinkDeploymentReady;
4348
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.suspendFlinkDeployment;
4449
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.updateFlinkDeployment;
45-
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.configureSavepoint;
50+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.fetchSavepointInfo;
51+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.getLastCheckpoint;
52+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.isSavepointRequired;
53+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.lookForCheckpoint;
4654
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.prepareFlinkDeployment;
55+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.triggerSavepoint;
56+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getReconciliationReschedInterval;
4757
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.instantStrToMillis;
4858
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.millisToInstantStr;
4959

@@ -90,7 +100,7 @@ public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
90100
}
91101

92102
/**
93-
* Checks if a deployment can be initiated and initiates it if conditions are met.
103+
* Checks if a full transition can be initiated and initiates it if conditions are met.
94104
*
95105
* @param context the transition context
96106
* @param currentDeploymentType the current deployment type
@@ -101,19 +111,28 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
101111
BlueGreenDiffType specDiff = BlueGreenSpecUtils.getSpecDiff(context);
102112

103113
if (specDiff != BlueGreenDiffType.IGNORE) {
104-
BlueGreenSpecUtils.setLastReconciledSpec(context);
105-
106114
FlinkDeployment currentFlinkDeployment =
107115
context.getDeploymentByType(currentDeploymentType);
108116

109117
if (isFlinkDeploymentReady(currentFlinkDeployment)) {
110118
if (specDiff == BlueGreenDiffType.TRANSITION) {
119+
if (handleSavepoint(context, currentFlinkDeployment)) {
120+
// This is the only portion where the last reconciled spec is not set,
121+
// so we can process TRANSITION after the savepoint is done
122+
var savepointingState = calculateSavepointingState(currentDeploymentType);
123+
return patchStatusUpdateControl(context, savepointingState, null)
124+
.rescheduleAfter(getReconciliationReschedInterval(context));
125+
}
126+
127+
BlueGreenSpecUtils.setLastReconciledSpec(context);
111128
return startTransition(context, currentDeploymentType, currentFlinkDeployment);
112129
} else {
130+
BlueGreenSpecUtils.setLastReconciledSpec(context);
113131
return patchFlinkDeployment(context, currentDeploymentType, specDiff);
114132
}
115133
} else {
116134
if (context.getDeploymentStatus().getJobStatus().getState() != JobStatus.FAILING) {
135+
BlueGreenSpecUtils.setLastReconciledSpec(context);
117136
return patchStatusUpdateControl(context, null, JobStatus.FAILING);
118137
}
119138
}
@@ -150,11 +169,7 @@ private UpdateControl<FlinkBlueGreenDeployment> startTransition(
150169
FlinkDeployment currentFlinkDeployment) {
151170
DeploymentTransition transition = calculateTransition(currentDeploymentType);
152171

153-
FlinkResourceContext<FlinkDeployment> resourceContext =
154-
context.getCtxFactory()
155-
.getResourceContext(currentFlinkDeployment, context.getJosdkContext());
156-
157-
Savepoint lastCheckpoint = configureSavepoint(resourceContext);
172+
Savepoint lastCheckpoint = configureInitialSavepoint(context, currentFlinkDeployment);
158173

159174
return initiateDeployment(
160175
context,
@@ -182,6 +197,85 @@ private FlinkBlueGreenDeploymentState calculatePatchingState(DeploymentType curr
182197
}
183198
}
184199

200+
// ==================== Savepointing Methods ====================
201+
202+
public boolean monitorSavepoint(
203+
BlueGreenContext context, DeploymentType currentDeploymentType) {
204+
205+
FlinkResourceContext<FlinkDeployment> ctx =
206+
context.getCtxFactory().getResourceContext(
207+
context.getDeploymentByType(currentDeploymentType),
208+
context.getJosdkContext());
209+
210+
String savepointTriggerId = context.getDeploymentStatus().getSavepointTriggerId();
211+
var savepointFetchResult = fetchSavepointInfo(ctx, savepointTriggerId);
212+
213+
return !savepointFetchResult.isPending();
214+
}
215+
216+
private Savepoint configureInitialSavepoint(
217+
BlueGreenContext context,
218+
FlinkDeployment currentFlinkDeployment) {
219+
220+
FlinkResourceContext<FlinkDeployment> ctx =
221+
context.getCtxFactory()
222+
.getResourceContext(currentFlinkDeployment, context.getJosdkContext());
223+
224+
// If a savepoint is required we fetch it, should be ready by this point
225+
if (isSavepointRequired(context)) {
226+
String savepointTriggerId = context.getDeploymentStatus().getSavepointTriggerId();
227+
var savepointFetchResult = fetchSavepointInfo(ctx, savepointTriggerId);
228+
229+
org.apache.flink.core.execution.SavepointFormatType coreSavepointFormatType =
230+
ctx.getObserveConfig().get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
231+
232+
var savepointFormatType = SavepointFormatType.valueOf(coreSavepointFormatType.toString());
233+
234+
return Savepoint.of(savepointFetchResult.getLocation(), SnapshotTriggerType.MANUAL, savepointFormatType);
235+
}
236+
237+
// Else we start looking for the last checkpoint if needed
238+
239+
if (!lookForCheckpoint(context)) {
240+
return null;
241+
}
242+
243+
return getLastCheckpoint(ctx);
244+
}
245+
246+
private boolean handleSavepoint(
247+
BlueGreenContext context,
248+
FlinkDeployment currentFlinkDeployment) {
249+
250+
if (!isSavepointRequired(context)) {
251+
return false;
252+
}
253+
254+
FlinkResourceContext<FlinkDeployment> ctx =
255+
context.getCtxFactory()
256+
.getResourceContext(currentFlinkDeployment, context.getJosdkContext());
257+
258+
String savepointTriggerId = context.getDeploymentStatus().getSavepointTriggerId();
259+
260+
if (savepointTriggerId == null || savepointTriggerId.isEmpty()) {
261+
String triggerId = triggerSavepoint(ctx);
262+
LOG.info("Savepoint requested (triggerId: {}", triggerId);
263+
context.getDeploymentStatus().setSavepointTriggerId(triggerId);
264+
return true;
265+
}
266+
267+
LOG.debug("Savepoint previously requested (triggerId: {})", savepointTriggerId);
268+
return false;
269+
}
270+
271+
private FlinkBlueGreenDeploymentState calculateSavepointingState(DeploymentType currentType) {
272+
if (DeploymentType.BLUE == currentType) {
273+
return FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE;
274+
} else {
275+
return FlinkBlueGreenDeploymentState.SAVEPOINTING_GREEN;
276+
}
277+
}
278+
185279
// ==================== Transition Monitoring Methods ====================
186280

187281
/**
@@ -396,6 +490,7 @@ public UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
396490

397491
context.getDeploymentStatus().setDeploymentReadyTimestamp(millisToInstantStr(0));
398492
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
493+
context.getDeploymentStatus().setSavepointTriggerId(null);
399494

400495
return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING);
401496
}
@@ -423,31 +518,20 @@ public static UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(
423518
return UpdateControl.patchStatus(flinkBlueGreenDeployment);
424519
}
425520

426-
// ==================== Inner Classes ====================
521+
// ==================== DTO/Result Classes ====================
427522

523+
@Getter
524+
@AllArgsConstructor
428525
private static class DeploymentTransition {
429526
final DeploymentType nextDeploymentType;
430527
final FlinkBlueGreenDeploymentState nextState;
431-
432-
DeploymentTransition(
433-
DeploymentType nextDeploymentType, FlinkBlueGreenDeploymentState nextState) {
434-
this.nextDeploymentType = nextDeploymentType;
435-
this.nextState = nextState;
436-
}
437528
}
438529

530+
@Getter
531+
@AllArgsConstructor
439532
private static class TransitionState {
440533
final FlinkDeployment currentDeployment;
441534
final FlinkDeployment nextDeployment;
442535
final FlinkBlueGreenDeploymentState nextState;
443-
444-
TransitionState(
445-
FlinkDeployment currentDeployment,
446-
FlinkDeployment nextDeployment,
447-
FlinkBlueGreenDeploymentState nextState) {
448-
this.currentDeployment = currentDeployment;
449-
this.nextDeployment = nextDeployment;
450-
this.nextState = nextState;
451-
}
452536
}
453537
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenKubernetesService.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
package org.apache.flink.kubernetes.operator.controller.bluegreen;
1919

20-
import org.apache.flink.api.common.JobStatus;
2120
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2221
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2322
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2423
import org.apache.flink.kubernetes.operator.api.spec.JobState;
24+
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
2525

2626
import io.fabric8.kubernetes.api.model.ObjectMeta;
2727
import io.fabric8.kubernetes.api.model.OwnerReference;
@@ -67,7 +67,7 @@ public static void deployCluster(BlueGreenContext context, FlinkDeployment flink
6767
*/
6868
public static boolean isFlinkDeploymentReady(FlinkDeployment deployment) {
6969
return ResourceLifecycleState.STABLE == deployment.getStatus().getLifecycleState()
70-
&& JobStatus.RUNNING == deployment.getStatus().getJobStatus().getState();
70+
&& ReconciliationUtils.isJobRunning(deployment.getStatus());
7171
}
7272

7373
public static void suspendFlinkDeployment(
@@ -78,11 +78,21 @@ public static void suspendFlinkDeployment(
7878

7979
public static void updateFlinkDeployment(
8080
FlinkDeployment nextDeployment, BlueGreenContext context) {
81-
context.getJosdkContext().getClient().resource(nextDeployment).update();
81+
String namespace = context.getBgDeployment().getMetadata().getNamespace();
82+
context.getJosdkContext()
83+
.getClient()
84+
.resource(nextDeployment)
85+
.inNamespace(namespace)
86+
.update();
8287
}
8388

8489
public static void replaceFlinkBlueGreenDeployment(BlueGreenContext context) {
85-
context.getJosdkContext().getClient().resource(context.getBgDeployment()).replace();
90+
String namespace = context.getBgDeployment().getMetadata().getNamespace();
91+
context.getJosdkContext()
92+
.getClient()
93+
.resource(context.getBgDeployment())
94+
.inNamespace(namespace)
95+
.replace();
8696
}
8797

8898
/**

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenStateHandlerRegistry.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.ActiveStateHandler;
2222
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
2323
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.InitializingBlueStateHandler;
24+
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.SavepointingStateHandler;
2425
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.TransitioningStateHandler;
2526

2627
import java.util.Map;
2728

2829
import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
2930
import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
3031
import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
32+
import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE;
33+
import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.SAVEPOINTING_GREEN;
3134
import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE;
3235
import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN;
3336

@@ -51,7 +54,11 @@ ACTIVE_GREEN, new ActiveStateHandler(ACTIVE_GREEN, deploymentService),
5154
TRANSITIONING_TO_BLUE, deploymentService),
5255
TRANSITIONING_TO_GREEN,
5356
new TransitioningStateHandler(
54-
TRANSITIONING_TO_GREEN, deploymentService));
57+
TRANSITIONING_TO_GREEN, deploymentService),
58+
SAVEPOINTING_BLUE,
59+
new SavepointingStateHandler(SAVEPOINTING_BLUE, deploymentService),
60+
SAVEPOINTING_GREEN,
61+
new SavepointingStateHandler(SAVEPOINTING_GREEN, deploymentService));
5562
}
5663

5764
/**
@@ -68,14 +75,4 @@ public BlueGreenStateHandler getHandler(FlinkBlueGreenDeploymentState state) {
6875
}
6976
return handler;
7077
}
71-
72-
/**
73-
* Checks if a handler exists for the given state.
74-
*
75-
* @param state the Blue/Green deployment state
76-
* @return true if a handler exists, false otherwise
77-
*/
78-
public boolean hasHandler(FlinkBlueGreenDeploymentState state) {
79-
return handlers.containsKey(state);
80-
}
8178
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
2+
3+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
4+
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
5+
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
6+
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
7+
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
8+
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
9+
10+
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService.patchStatusUpdateControl;
11+
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getReconciliationReschedInterval;
12+
13+
public class SavepointingStateHandler extends AbstractBlueGreenStateHandler {
14+
15+
public SavepointingStateHandler(
16+
FlinkBlueGreenDeploymentState supportedState,
17+
BlueGreenDeploymentService deploymentService) {
18+
super(supportedState, deploymentService);
19+
}
20+
21+
@Override
22+
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context) {
23+
DeploymentType currentType = getCurrentDeploymentType();
24+
var isSavepointReady = deploymentService.monitorSavepoint(context, currentType);
25+
26+
// Savepoint complete, continue with the transition
27+
if (isSavepointReady) {
28+
var nextState = getSupportedState() == FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE ?
29+
FlinkBlueGreenDeploymentState.ACTIVE_BLUE :
30+
FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
31+
return patchStatusUpdateControl(context, nextState, null)
32+
.rescheduleAfter(500);
33+
}
34+
35+
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate()
36+
.rescheduleAfter(getReconciliationReschedInterval(context));
37+
}
38+
39+
private DeploymentType getCurrentDeploymentType() {
40+
return getSupportedState() == FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE
41+
? DeploymentType.BLUE
42+
: DeploymentType.GREEN;
43+
}
44+
}

0 commit comments

Comments
 (0)