Skip to content

Commit 4537f04

Browse files
committed
Adding extra savepoint "carry over" feature
1 parent 347117d commit 4537f04

File tree

1 file changed

+73
-22
lines changed

1 file changed

+73
-22
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 73 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
3737
import lombok.AllArgsConstructor;
3838
import lombok.Getter;
39+
import org.jetbrains.annotations.NotNull;
40+
import org.jetbrains.annotations.Nullable;
3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;
4143

@@ -129,6 +131,9 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
129131
context, currentBlueGreenDeploymentType, currentFlinkDeployment);
130132
} else {
131133
setLastReconciledSpec(context);
134+
LOG.info(
135+
"Patching FlinkDeployment '{}' during checkAndInitiateDeployment",
136+
currentFlinkDeployment.getMetadata().getName());
132137
return patchFlinkDeployment(context, currentBlueGreenDeploymentType);
133138
}
134139
} else {
@@ -150,25 +155,65 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
150155
}
151156

152157
private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
153-
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {
158+
BlueGreenContext context, BlueGreenDeploymentType blueGreenDeploymentTypeToPatch) {
154159

155160
String childDeploymentName =
156161
context.getBgDeployment().getMetadata().getName()
157162
+ "-"
158-
+ currentBlueGreenDeploymentType.toString().toLowerCase();
159-
LOG.info("Patching FlinkDeployment '{}'", childDeploymentName);
163+
+ blueGreenDeploymentTypeToPatch.toString().toLowerCase();
160164

161165
// We want to patch, therefore the transition should point to the existing deployment
162166
// details
163-
var patchingState = calculatePatchingState(currentBlueGreenDeploymentType);
167+
var patchingState = calculatePatchingState(blueGreenDeploymentTypeToPatch);
164168

165169
// If we're not transitioning between deployments, mark as a single deployment to have it
166170
// not wait for synchronization
167171
var isFirstDeployment = context.getDeployments().getNumberOfDeployments() != 2;
168172

169-
// No checkpoint will be used when patching
173+
// TODO: if the resource failed right after being deployed with an initialSavepointPath,
174+
// will it be used by this patching? otherwise this is unnecessary, keep lastSavepoint =
175+
// null.
176+
Savepoint lastSavepoint =
177+
carryOverSavepoint(context, blueGreenDeploymentTypeToPatch, childDeploymentName);
178+
170179
return initiateDeployment(
171-
context, currentBlueGreenDeploymentType, patchingState, null, isFirstDeployment);
180+
context,
181+
blueGreenDeploymentTypeToPatch,
182+
patchingState,
183+
lastSavepoint,
184+
isFirstDeployment);
185+
}
186+
187+
@Nullable
188+
private static Savepoint carryOverSavepoint(
189+
BlueGreenContext context,
190+
BlueGreenDeploymentType blueGreenDeploymentTypeToPatch,
191+
String childDeploymentName) {
192+
var deploymentToPatch = context.getDeploymentByType(blueGreenDeploymentTypeToPatch);
193+
var initialSavepointPath = deploymentToPatch.getSpec().getJob().getInitialSavepointPath();
194+
195+
if (initialSavepointPath == null || initialSavepointPath.isEmpty()) {
196+
initialSavepointPath =
197+
deploymentToPatch.getStatus().getJobStatus().getUpgradeSavepointPath();
198+
}
199+
200+
Savepoint lastSavepoint = null;
201+
if (initialSavepointPath != null && !initialSavepointPath.isEmpty()) {
202+
var ctx =
203+
context.getCtxFactory()
204+
.getResourceContext(deploymentToPatch, context.getJosdkContext());
205+
206+
lastSavepoint = getSavepointObject(ctx, initialSavepointPath);
207+
208+
LOG.info(
209+
"Patching FlinkDeployment '{}', carrying over Savepoint at: '{}'",
210+
childDeploymentName,
211+
initialSavepointPath);
212+
} else {
213+
LOG.info("Patching FlinkDeployment '{}'", childDeploymentName);
214+
}
215+
216+
return lastSavepoint;
172217
}
173218

174219
private UpdateControl<FlinkBlueGreenDeployment> startTransition(
@@ -227,28 +272,17 @@ public boolean monitorSavepoint(
227272

228273
private Savepoint configureInitialSavepoint(
229274
BlueGreenContext context, FlinkDeployment currentFlinkDeployment) {
230-
231-
FlinkResourceContext<FlinkDeployment> ctx =
232-
context.getCtxFactory()
233-
.getResourceContext(currentFlinkDeployment, context.getJosdkContext());
234-
235275
// Create savepoint for all upgrade modes except STATELESS
236276
// (originally only SAVEPOINT mode required savepoints)
237277
if (isSavepointRequired(context)) {
278+
FlinkResourceContext<FlinkDeployment> ctx =
279+
context.getCtxFactory()
280+
.getResourceContext(currentFlinkDeployment, context.getJosdkContext());
281+
238282
String savepointTriggerId = context.getDeploymentStatus().getSavepointTriggerId();
239283
var savepointFetchResult = fetchSavepointInfo(ctx, savepointTriggerId);
240284

241-
org.apache.flink.core.execution.SavepointFormatType coreSavepointFormatType =
242-
ctx.getObserveConfig()
243-
.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
244-
245-
var savepointFormatType =
246-
SavepointFormatType.valueOf(coreSavepointFormatType.toString());
247-
248-
return Savepoint.of(
249-
savepointFetchResult.getLocation(),
250-
SnapshotTriggerType.MANUAL,
251-
savepointFormatType);
285+
return getSavepointObject(ctx, savepointFetchResult.getLocation());
252286
}
253287

254288
// Currently not using last checkpoint recovery for LAST_STATE upgrade mode
@@ -262,6 +296,18 @@ private Savepoint configureInitialSavepoint(
262296
// return getLastCheckpoint(ctx);
263297
}
264298

299+
@NotNull
300+
private static Savepoint getSavepointObject(
301+
FlinkResourceContext<FlinkDeployment> ctx, String savepointLocation) {
302+
org.apache.flink.core.execution.SavepointFormatType coreSavepointFormatType =
303+
ctx.getObserveConfig()
304+
.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
305+
306+
var savepointFormatType = SavepointFormatType.valueOf(coreSavepointFormatType.toString());
307+
308+
return Savepoint.of(savepointLocation, SnapshotTriggerType.MANUAL, savepointFormatType);
309+
}
310+
265311
private boolean handleSavepoint(
266312
BlueGreenContext context, FlinkDeployment currentFlinkDeployment) {
267313

@@ -338,6 +384,11 @@ private UpdateControl<FlinkBlueGreenDeployment> handleSpecChangesDuringTransitio
338384
setLastReconciledSpec(context);
339385
var oppositeDeploymentType =
340386
context.getOppositeDeploymentType(currentBlueGreenDeploymentType);
387+
LOG.info(
388+
"Patching FlinkDeployment '{}' during handleSpecChangesDuringTransition",
389+
context.getDeploymentByType(oppositeDeploymentType)
390+
.getMetadata()
391+
.getName());
341392
return patchFlinkDeployment(context, oppositeDeploymentType);
342393
}
343394
}

0 commit comments

Comments
 (0)