Commit d1827a4

[FLINK-35414] Rework last-state upgrade mode to support job cancellation as suspend mechanism
1 parent 699acae commit d1827a4

46 files changed, 1372 additions and 896 deletions

docs/content/docs/custom-resource/job-management.md

Lines changed: 7 additions & 11 deletions
@@ -84,13 +84,13 @@ Supported values: `stateless`, `savepoint`, `last-state`

 The `upgradeMode` setting controls both the stop and restore mechanisms as detailed in the following table:

-|                        | Stateless               | Last State                                 | Savepoint                              |
-|------------------------|-------------------------|--------------------------------------------|----------------------------------------|
-| Config Requirement     | None                    | Checkpointing & HA Enabled                 | Checkpoint/Savepoint directory defined |
-| Job Status Requirement | None                    | HA metadata available                      | Job Running*                           |
-| Suspend Mechanism      | Cancel / Delete         | Delete Flink deployment (keep HA metadata) | Cancel with savepoint                  |
-| Restore Mechanism      | Deploy from empty state | Recover last state using HA metadata       | Restore From savepoint                 |
-| Production Use         | Not recommended         | Recommended                                | Recommended                            |
+|                        | Stateless       | Last State                         | Savepoint                              |
+|------------------------|-----------------|------------------------------------|----------------------------------------|
+| Config Requirement     | None            | Checkpointing Enabled              | Checkpoint/Savepoint directory defined |
+| Job Status Requirement | None            | Job or HA metadata accessible      | Job Running*                           |
+| Suspend Mechanism      | Cancel / Delete | Cancel / Delete (keep HA metadata) | Cancel with savepoint                  |
+| Restore Mechanism      | Empty state     | Use HA metadata or last cp/sp      | Restore From savepoint                 |
+| Production Use         | Not recommended | Recommended                        | Recommended                            |


 *\* When HA is enabled the `savepoint` upgrade mode may fall back to the `last-state` behaviour in cases where the job is in an unhealthy state.*
@@ -149,10 +149,6 @@ spec:
     state: running
 ```

-{{< hint warning >}}
-Last state upgrade mode is currently only supported for `FlinkDeployments`.
-{{< /hint >}}
-
 ### Application restarts without spec change

 There are cases when users would like to restart the Flink deployments to deal with some transient problem.
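
Reviewer note: the footnote under the upgrade mode table says that `savepoint` mode may fall back to `last-state` behaviour when the job is unhealthy. The sketch below is one way to picture that decision. It is not taken from the operator source: the class, enum, and parameter names are hypothetical, and returning an empty result when no fallback is possible is an assumption made for illustration.

```java
import java.util.Optional;

/** Illustrative sketch only; none of these names come from the operator code base. */
final class UpgradeModeFallbackSketch {

    enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

    /**
     * Returns the upgrade mode that could be used, or empty if the upgrade
     * cannot start yet (assumed behaviour, not confirmed by the source).
     */
    static Optional<UpgradeMode> effectiveMode(
            UpgradeMode requested,
            boolean jobRunning,
            boolean haMetadataAvailable,
            boolean lastStateFallbackEnabled) {
        // Simplification: only savepoint mode is modelled as needing a running job.
        if (requested != UpgradeMode.SAVEPOINT || jobRunning) {
            return Optional.of(requested);
        }
        // The job is not running, so no savepoint can be triggered.
        // Fall back to last-state only if allowed and HA metadata exists.
        if (lastStateFallbackEnabled && haMetadataAvailable) {
            return Optional.of(UpgradeMode.LAST_STATE);
        }
        return Optional.empty();
    }
}
```

For example, `effectiveMode(UpgradeMode.SAVEPOINT, false, true, true)` yields `LAST_STATE` in this sketch, while the same call with the fallback flag disabled yields an empty result.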

docs/content/docs/custom-resource/overview.md

Lines changed: 0 additions & 4 deletions
@@ -215,10 +215,6 @@ COPY flink-hadoop-fs-1.19-SNAPSHOT.jar $FLINK_PLUGINS_DIR/hadoop-fs/

 Alternatively, if you use helm to install flink-kubernetes-operator, it allows you to specify a postStart hook to download the required plugins.

-### Limitations
-
-- Last-state upgradeMode is currently not supported for FlinkSessionJobs
-
 ## Further information

 - [Snapshots]({{< ref "docs/custom-resource/snapshots" >}})

docs/content/docs/custom-resource/reference.md

Lines changed: 1 addition & 1 deletion
@@ -452,7 +452,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r

 | Parameter | Type | Docs |
 | ----------| ---- | ---- |
-| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator for manual and periodic snapshots. Only used if FlinkStateSnapshot resources are disabled. |
+| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator. |
 | triggerId | java.lang.String | Trigger id of a pending savepoint operation. |
 | triggerTimestamp | java.lang.Long | Trigger timestamp of a pending savepoint operation. |
 | triggerType | org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType | Savepoint trigger mechanism. |

docs/layouts/shortcodes/generated/dynamic_section.html

Lines changed: 6 additions & 0 deletions
@@ -122,6 +122,12 @@
             <td>Boolean</td>
             <td>Enables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.upgrade.last-state.job-cancel.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Cancel jobs during last-state upgrade. This config is ignored for session jobs where cancel is the only mechanism to perform this type of upgrade.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>
             <td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 6 additions & 0 deletions
@@ -212,6 +212,12 @@
             <td>Boolean</td>
             <td>Enables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.upgrade.last-state.job-cancel.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Cancel jobs during last-state upgrade. This config is ignored for session jobs where cancel is the only mechanism to perform this type of upgrade.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
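
Reviewer note: the description of `kubernetes.operator.job.upgrade.last-state.job-cancel.enabled` implies a small decision rule for how a last-state upgrade suspends the job. The following is a minimal sketch of that rule as read from the config text and the updated table ("Cancel / Delete (keep HA metadata)"). All type and method names are hypothetical, and the operator may apply further conditions that are not visible in this part of the diff.

```java
/** Illustrative sketch only; these types are not part of the operator API. */
final class LastStateSuspendSketch {

    enum ResourceKind { APPLICATION_DEPLOYMENT, SESSION_JOB }

    enum SuspendMechanism { CANCEL_JOB, DELETE_DEPLOYMENT_KEEP_HA_METADATA }

    /**
     * Picks the suspend mechanism for a last-state upgrade.
     *
     * @param kind kind of resource being upgraded
     * @param jobCancelEnabled value of the job-cancel option (documented default: false)
     */
    static SuspendMechanism choose(ResourceKind kind, boolean jobCancelEnabled) {
        // Per the option description, the flag is ignored for session jobs:
        // cancel is the only available mechanism there.
        if (kind == ResourceKind.SESSION_JOB) {
            return SuspendMechanism.CANCEL_JOB;
        }
        // Assumption: with the flag off, deployments keep the earlier behaviour
        // of deleting the deployment while retaining HA metadata.
        return jobCancelEnabled
                ? SuspendMechanism.CANCEL_JOB
                : SuspendMechanism.DELETE_DEPLOYMENT_KEEP_HA_METADATA;
    }
}
```

With the documented default of `false`, an application deployment in this sketch still suspends by deleting the deployment, so the new cancel path is opt-in except for session jobs.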

e2e-tests/test_sessionjob_operations.sh

Lines changed: 19 additions & 0 deletions
@@ -54,6 +54,7 @@ if [ "$location" == "" ];then
   exit 1
 fi

+echo "Starting sessionjob savepoint upgrade test"
 # Testing savepoint mode upgrade
 # Update the FlinkSessionJob and trigger the savepoint upgrade
 kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
@@ -67,6 +68,24 @@ assert_available_slots 1 $CLUSTER_ID

 echo "Successfully run the sessionjob savepoint upgrade test"

+flink_version=$(kubectl get $SESSION_CLUSTER_IDENTIFIER -o yaml | yq '.spec.flinkVersion')
+
+if [ "$flink_version" != "v1_16" ]; then
+  echo "Starting sessionjob last-state upgrade test"
+  # Testing last-state mode upgrade
+  # Update the FlinkSessionJob and trigger the last-state upgrade
+  kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 2, "upgradeMode": "last-state" } } }'
+
+  # Check the job was restarted with the new parallelism
+  wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' CANCELLING ${TIMEOUT} || exit 1
+  wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+  assert_available_slots 0 $CLUSTER_ID
+
+  echo "Successfully run the sessionjob last-state upgrade test"
+else
+  echo "Skipping last-state test for flink version 1.16"
+fi
+
 # Test Operator restart
 echo "Delete session job " + $SESSION_JOB_NAME
 kubectl delete flinksessionjob $SESSION_JOB_NAME

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/docs/CrdReferenceDoclet.java

Lines changed: 3 additions & 0 deletions
@@ -219,6 +219,9 @@ public Void scan(Element e, Integer depth) {
                     }
                     break;
                 case FIELD:
+                    if (e.getModifiers().contains(Modifier.STATIC)) {
+                        return null;
+                    }
                     out.println(
                             "| "
                                     + getNameOrJsonPropValue(e)
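
Reviewer note: the new `Modifier.STATIC` check keeps static members such as loggers out of the generated CRD reference table. The doclet works on `javax.lang.model` elements; the sketch below shows the same filtering idea with plain reflection instead, which is a swapped-in technique chosen so the example runs on its own. The `Example` class and the `documentedFieldNames` helper are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; uses reflection rather than the doclet API. */
final class StaticFieldFilterSketch {

    /** Hypothetical status class with one static helper and two instance fields. */
    static class Example {
        private static final String LOG = "static helper, not part of the CRD";
        private String lastSavepoint;
        private Long triggerTimestamp;
    }

    /** Collects the names of instance fields, skipping static and synthetic ones. */
    static List<String> documentedFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue; // static members do not appear in the serialized resource
            }
            names.add(field.getName());
        }
        return names;
    }
}
```

Calling `documentedFieldNames(Example.class)` returns the two instance field names and omits `LOG`. The order of the returned names is not guaranteed by `getDeclaredFields`.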

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java

Lines changed: 0 additions & 8 deletions
@@ -18,12 +18,10 @@
 package org.apache.flink.kubernetes.operator.api.status;

 import org.apache.flink.annotation.Experimental;
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;

-import com.fasterxml.jackson.annotation.JsonIgnore;
 import io.fabric8.crd.generator.annotation.PrinterColumn;
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -98,10 +96,4 @@ public ResourceLifecycleState getLifecycleState() {

         return ResourceLifecycleState.DEPLOYED;
     }
-
-    /**
-     * Internal flag to signal that due to some condition we need to schedule a new reconciliation
-     * loop immediately. For example autoscaler overrides have changed and we need to apply them.
-     */
-    @JsonIgnore @Internal private boolean immediateReconciliationNeeded = false;
 }

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java

Lines changed: 0 additions & 19 deletions
@@ -20,7 +20,6 @@
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
-import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
 import org.apache.flink.kubernetes.operator.api.utils.SpecWithMeta;

@@ -100,22 +99,4 @@ public boolean isLastReconciledSpecStable() {
     public boolean isBeforeFirstDeployment() {
         return lastReconciledSpec == null;
     }
-
-    /**
-     * This method is only here for backward compatibility reasons. The current version of the
-     * operator does not leave the resources in UPGRADING state during in-place scaling therefore
-     * this method will always return false.
-     *
-     * @return True if in-place scaling is in progress.
-     */
-    @JsonIgnore
-    @Deprecated
-    public boolean scalingInProgress() {
-        if (isBeforeFirstDeployment() || state != ReconciliationState.UPGRADING) {
-            return false;
-        }
-        var job = deserializeLastReconciledSpec().getJob();
-        // For regular full upgrades the jobstate is suspended in UPGRADING state
-        return job != null && job.getState() == JobState.RUNNING;
-    }
 }

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java

Lines changed: 10 additions & 1 deletion
@@ -24,6 +24,8 @@
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.util.ArrayList;
 import java.util.List;
@@ -40,6 +42,9 @@ public class SavepointInfo implements SnapshotInfo {
      * Last completed savepoint by the operator for manual and periodic snapshots. Only used if
      * FlinkStateSnapshot resources are disabled.
      */
+    private static final Logger LOG = LoggerFactory.getLogger(SavepointInfo.class);
+
+    /** Last completed savepoint by the operator. */
     private Savepoint lastSavepoint;

     /** Trigger id of a pending savepoint operation. */
@@ -82,7 +87,11 @@ public void resetTrigger() {
      * @param savepoint Savepoint to be added.
      */
     public void updateLastSavepoint(Savepoint savepoint) {
-        if (lastSavepoint == null || !lastSavepoint.getLocation().equals(savepoint.getLocation())) {
+        if (savepoint == null) {
+            lastSavepoint = null;
+        } else if (lastSavepoint == null
+                || !lastSavepoint.getLocation().equals(savepoint.getLocation())) {
+            LOG.debug("Updating last savepoint to {}", savepoint);
             lastSavepoint = savepoint;
             savepointHistory.add(savepoint);
             if (savepoint.getTriggerType() == SnapshotTriggerType.PERIODIC) {
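
Reviewer note: the reworked `updateLastSavepoint` now accepts `null` as a reset and only records a savepoint when its location differs from the current one. The sketch below restates that visible logic with stand-in types so it can run on its own. The nested `Savepoint` class is a simplified placeholder for the operator's type, and the logging and periodic-trigger handling from the hunk are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; a cut-down stand-in for SavepointInfo. */
final class SavepointInfoSketch {

    /** Placeholder for the operator's Savepoint type; only the location is modelled. */
    static final class Savepoint {
        final String location;

        Savepoint(String location) {
            this.location = location;
        }
    }

    Savepoint lastSavepoint;
    final List<Savepoint> savepointHistory = new ArrayList<>();

    /** Mirrors the branch structure shown in the diff above. */
    void updateLastSavepoint(Savepoint savepoint) {
        if (savepoint == null) {
            // New in this commit: null clears the last savepoint, history is untouched.
            lastSavepoint = null;
        } else if (lastSavepoint == null
                || !lastSavepoint.location.equals(savepoint.location)) {
            // A savepoint at a new location becomes the last one and is appended to history.
            lastSavepoint = savepoint;
            savepointHistory.add(savepoint);
        }
    }
}
```

One consequence visible in this sketch: after a `null` reset, passing a savepoint with a previously seen location appends it to the history again, because the comparison only looks at the current `lastSavepoint`.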
