Skip to content

Commit 099a463

Browse files
committed
Fix job upgrade mode switching corner cases
1 parent 4813d99 commit 099a463

File tree

2 files changed

+44
-34
lines changed

2 files changed

+44
-34
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 39 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -239,31 +239,52 @@ protected JobUpgrade getJobUpgrade(FlinkResourceContext<CR> ctx, Configuration d
239239
}
240240

241241
boolean running = ReconciliationUtils.isJobRunning(status);
242+
boolean versionChanged =
243+
flinkVersionChanged(
244+
ReconciliationUtils.getDeployedSpec(resource), resource.getSpec());
245+
242246
if (upgradeMode == UpgradeMode.SAVEPOINT) {
243247
if (running) {
244248
LOG.info("Job is in running state, ready for upgrade with savepoint");
245249
return JobUpgrade.savepoint(false);
250+
} else if (versionChanged
251+
|| deployConfig.get(
252+
KubernetesOperatorConfigOptions
253+
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)) {
254+
LOG.info("Falling back to last-state upgrade mode from savepoint");
255+
ctx.getResource()
256+
.getSpec()
257+
.getJob()
258+
.setUpgradeMode(upgradeMode = UpgradeMode.LAST_STATE);
259+
} else {
260+
LOG.info("Last-state fallback is disabled, waiting for upgradable state");
261+
return JobUpgrade.pendingUpgrade();
246262
}
247-
} else {
248-
// Last state upgrade
263+
}
264+
265+
if (upgradeMode == UpgradeMode.LAST_STATE) {
266+
if (versionChanged) {
267+
// We need some special handling in case of version upgrades where HA based
268+
// last-state upgrade is not possible
269+
boolean savepointPossible =
270+
!StringUtils.isNullOrWhitespaceOnly(
271+
ctx.getObserveConfig()
272+
.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY));
273+
if (running && savepointPossible) {
274+
LOG.info("Using savepoint to upgrade Flink version");
275+
return JobUpgrade.savepoint(false);
276+
} else if (ReconciliationUtils.isJobCancellable(resource.getStatus())) {
277+
LOG.info("Using last-state upgrade with cancellation to upgrade Flink version");
278+
return JobUpgrade.lastStateUsingCancel();
279+
} else {
280+
LOG.info(
281+
"Neither savepoint nor cancellation is possible, cannot perform stateful version upgrade");
282+
return JobUpgrade.pendingUpgrade();
283+
}
284+
}
285+
249286
boolean cancellable = allowLastStateCancel(ctx);
250287
if (running) {
251-
if (flinkVersionChanged(
252-
ReconciliationUtils.getDeployedSpec(resource), resource.getSpec())) {
253-
// We need some special handling in case of version upgrades, where we prefer
254-
// savepoints if possible conditional that savepoint directory is set
255-
boolean savepointPossible =
256-
!StringUtils.isNullOrWhitespaceOnly(
257-
ctx.getObserveConfig()
258-
.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY));
259-
if (savepointPossible) {
260-
LOG.info("Using savepoint upgrade mode to upgrade Flink version");
261-
return JobUpgrade.savepoint(false);
262-
} else if (cancellable) {
263-
LOG.info("Using cancel upgrade mode to upgrade Flink version");
264-
return JobUpgrade.lastStateUsingCancel();
265-
}
266-
}
267288
return getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
268289
}
269290

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 5 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -89,23 +89,12 @@ protected JobUpgrade getJobUpgrade(
8989
}
9090
var flinkService = ctx.getFlinkService();
9191

92-
boolean lastStateAllowed =
93-
deployment.getSpec().getJob().getUpgradeMode() == UpgradeMode.LAST_STATE
94-
|| deployConfig.getBoolean(
95-
KubernetesOperatorConfigOptions
96-
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED);
97-
98-
if (lastStateAllowed
99-
&& HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
92+
if (HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
10093
&& HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
101-
&& !flinkVersionChanged(
102-
ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
103-
104-
if (flinkService.isHaMetadataAvailable(deployConfig)) {
105-
LOG.info(
106-
"Job is not running but HA metadata is available for last state restore, ready for upgrade");
107-
return JobUpgrade.lastStateUsingHaMeta();
108-
}
94+
&& flinkService.isHaMetadataAvailable(deployConfig)) {
95+
LOG.info(
96+
"Job is not running but HA metadata is available for last state restore, ready for upgrade");
97+
return JobUpgrade.lastStateUsingHaMeta();
10998
}
11099

111100
var jmDeployStatus = status.getJobManagerDeploymentStatus();

0 commit comments

Comments
 (0)