Skip to content

Commit 287b1c4

Browse files
committed
Taking savepoint also for LAST-STATE, removed the last checkpoint usage.
1 parent 929f3d0 commit 287b1c4

File tree

3 files changed

+16
-14
lines changed

3 files changed

+16
-14
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,9 @@
4747
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.suspendFlinkDeployment;
4848
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.updateFlinkDeployment;
4949
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.fetchSavepointInfo;
50-
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.getLastCheckpoint;
5150
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.getSpecDiff;
5251
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.hasSpecChanged;
5352
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.isSavepointRequired;
54-
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.lookForCheckpoint;
5553
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.prepareFlinkDeployment;
5654
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.setLastReconciledSpec;
5755
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenSpecUtils.triggerSavepoint;
@@ -224,7 +222,7 @@ private Savepoint configureInitialSavepoint(
224222
context.getCtxFactory()
225223
.getResourceContext(currentFlinkDeployment, context.getJosdkContext());
226224

227-
// If a savepoint is required we fetch it, should be ready by this point
225+
// For now UpgradeMode != STATELESS will use a savepoint, originally only SAVEPOINT
228226
if (isSavepointRequired(context)) {
229227
String savepointTriggerId = context.getDeploymentStatus().getSavepointTriggerId();
230228
var savepointFetchResult = fetchSavepointInfo(ctx, savepointTriggerId);
@@ -242,13 +240,15 @@ private Savepoint configureInitialSavepoint(
242240
savepointFormatType);
243241
}
244242

245-
// Else we start looking for the last checkpoint if needed
243+
// The logic below looked for the last checkpoint in case upgradeMode = LAST_STATE
244+
// We don't want to rely on last checkpoint for now.
245+
return null;
246246

247-
if (!lookForCheckpoint(context)) {
248-
return null;
249-
}
250-
251-
return getLastCheckpoint(ctx);
247+
// if (!lookForCheckpoint(context)) {
248+
// return null;
249+
// }
250+
//
251+
// return getLastCheckpoint(ctx);
252252
}
253253

254254
private boolean handleSavepoint(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenSpecUtils.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,16 @@ public static BlueGreenDiffType getSpecDiff(BlueGreenContext context) {
106106
}
107107

108108
public static boolean isSavepointRequired(BlueGreenContext context) {
109-
return UpgradeMode.SAVEPOINT
110-
== context.getBgDeployment()
109+
UpgradeMode upgradeMode =
110+
context.getBgDeployment()
111111
.getSpec()
112112
.getTemplate()
113113
.getSpec()
114114
.getJob()
115115
.getUpgradeMode();
116+
// return UpgradeMode.SAVEPOINT == upgradeMode;
117+
// For now we're taking savepoints in STATELESS or LAST-STATE
118+
return UpgradeMode.STATELESS != upgradeMode;
116119
}
117120

118121
public static boolean lookForCheckpoint(BlueGreenContext context) {

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,9 @@ public void verifyBasicTransition(
131131
simulateChangeInSpec(
132132
rs.deployment, customValue, ALT_DELETION_DELAY_VALUE, initialSavepointPath);
133133

134-
var expectedSavepointPath =
135-
upgradeMode == UpgradeMode.LAST_STATE ? TEST_CHECKPOINT_PATH : initialSavepointPath;
134+
var expectedSavepointPath = initialSavepointPath;
136135

137-
if (upgradeMode == UpgradeMode.SAVEPOINT) {
136+
if (upgradeMode != UpgradeMode.STATELESS) {
138137
// In this case there will ALWAYS be a savepoint generated with this value,
139138
// regardless of the initialSavepointPath
140139
expectedSavepointPath = "savepoint_1";

0 commit comments

Comments
 (0)