Skip to content

Commit b2dea76

Browse files
committed
Fix job upgrade mode switching corner cases
1 parent 8834197 commit b2dea76

File tree

2 files changed

+44
-34
lines changed

2 files changed

+44
-34
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -241,31 +241,52 @@ protected JobUpgrade getJobUpgrade(FlinkResourceContext<CR> ctx, Configuration d
241241
}
242242

243243
boolean running = ReconciliationUtils.isJobRunning(status);
244+
boolean versionChanged =
245+
flinkVersionChanged(
246+
ReconciliationUtils.getDeployedSpec(resource), resource.getSpec());
247+
244248
if (upgradeMode == UpgradeMode.SAVEPOINT) {
245249
if (running) {
246250
LOG.info("Job is in running state, ready for upgrade with savepoint");
247251
return JobUpgrade.savepoint(false);
252+
} else if (versionChanged
253+
|| deployConfig.get(
254+
KubernetesOperatorConfigOptions
255+
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)) {
256+
LOG.info("Falling back to last-state upgrade mode from savepoint");
257+
ctx.getResource()
258+
.getSpec()
259+
.getJob()
260+
.setUpgradeMode(upgradeMode = UpgradeMode.LAST_STATE);
261+
} else {
262+
LOG.info("Last-state fallback is disabled, waiting for upgradable state");
263+
return JobUpgrade.pendingUpgrade();
248264
}
249-
} else {
250-
// Last state upgrade
265+
}
266+
267+
if (upgradeMode == UpgradeMode.LAST_STATE) {
268+
if (versionChanged) {
269+
// We need some special handling in case of version upgrades where HA based
270+
// last-state upgrade is not possible
271+
boolean savepointPossible =
272+
!StringUtils.isNullOrWhitespaceOnly(
273+
ctx.getObserveConfig()
274+
.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY));
275+
if (running && savepointPossible) {
276+
LOG.info("Using savepoint to upgrade Flink version");
277+
return JobUpgrade.savepoint(false);
278+
} else if (ReconciliationUtils.isJobCancellable(resource.getStatus())) {
279+
LOG.info("Using last-state upgrade with cancellation to upgrade Flink version");
280+
return JobUpgrade.lastStateUsingCancel();
281+
} else {
282+
LOG.info(
283+
"Neither savepoint nor cancellation is possible, cannot perform stateful version upgrade");
284+
return JobUpgrade.pendingUpgrade();
285+
}
286+
}
287+
251288
boolean cancellable = allowLastStateCancel(ctx);
252289
if (running) {
253-
if (flinkVersionChanged(
254-
ReconciliationUtils.getDeployedSpec(resource), resource.getSpec())) {
255-
// We need some special handling in case of version upgrades, where we prefer
256-
// savepoints if possible conditional that savepoint directory is set
257-
boolean savepointPossible =
258-
!StringUtils.isNullOrWhitespaceOnly(
259-
ctx.getObserveConfig()
260-
.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY));
261-
if (savepointPossible) {
262-
LOG.info("Using savepoint upgrade mode to upgrade Flink version");
263-
return JobUpgrade.savepoint(false);
264-
} else if (cancellable) {
265-
LOG.info("Using cancel upgrade mode to upgrade Flink version");
266-
return JobUpgrade.lastStateUsingCancel();
267-
}
268-
}
269290
return getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
270291
}
271292

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,23 +90,12 @@ protected JobUpgrade getJobUpgrade(
9090
}
9191
var flinkService = ctx.getFlinkService();
9292

93-
boolean lastStateAllowed =
94-
deployment.getSpec().getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
95-
|| deployConfig.getBoolean(
96-
KubernetesOperatorConfigOptions
97-
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED);
98-
99-
if (lastStateAllowed
100-
&& HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
93+
if (HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
10194
&& HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
102-
&& !flinkVersionChanged(
103-
ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
104-
105-
if (flinkService.isHaMetadataAvailable(deployConfig)) {
106-
LOG.info(
107-
"Job is not running but HA metadata is available for last state restore, ready for upgrade");
108-
return JobUpgrade.lastStateUsingHaMeta();
109-
}
95+
&& flinkService.isHaMetadataAvailable(deployConfig)) {
96+
LOG.info(
97+
"Job is not running but HA metadata is available for last state restore, ready for upgrade");
98+
return JobUpgrade.lastStateUsingHaMeta();
11099
}
111100

112101
var jmDeployStatus = status.getJobManagerDeploymentStatus();

0 commit comments

Comments
 (0)