Commit 8479f22

mateczagany authored and gyfora committed

[FLINK-35267][snapshot] FlinkStateSnapshot documentation and examples

1 parent 9819665 commit 8479f22

File tree

11 files changed: +395 −58 lines

docs/content/docs/concepts/controller-flow.md

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ It’s very important to understand that the Observer phase records a point-in-t
  The `AbstractFlinkResourceReconciler` encapsulates the core reconciliation flow for all Flink resource types. Let’s take a look at the high level flow before we go into specifics for session, application and session job resources.

  1. Check if the resource is ready for reconciliation or if there are any pending operations that should not be interrupted (manual savepoints for example)
- 2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `initialSavepointPath` provided in the spec.
+ 2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `flinkStateSnapshotReference` provided in the spec.
  3. Next we determine if the desired spec changed and the type of change: `IGNORE, SCALE, UPGRADE`. Only for scale and upgrade type changes do we need to execute further reconciliation logic.
  4. If we have upgrade/scale spec changes we execute the upgrade logic specific for the resource type
  5. If we did not receive any spec change we still have to ensure that the currently deployed resources are fully reconciled:
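
As a sketch of step 2: on the very first deployment the operator honors a snapshot reference from the spec. A hypothetical initial spec might look like the following (the resource and snapshot names are illustrative, not from this commit):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-deployment          # hypothetical name
spec:
  job:
    upgradeMode: savepoint
    flinkStateSnapshotReference:    # only consulted on the first deployment
      name: example-savepoint       # an existing FlinkStateSnapshot CR
```

On subsequent upgrades the operator uses the savepoint it triggers itself, so this reference is ignored after the initial deploy.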

docs/content/docs/concepts/overview.md

Lines changed: 2 additions & 0 deletions
@@ -56,6 +56,8 @@ Flink Kubernetes Operator aims to capture the responsibilities of a human operat
  - Collect lag and utilization metrics
  - Scale job vertices to the ideal parallelism
  - Scale up and down as the load changes
+ - [Snapshot management]({{< ref "docs/custom-resource/snapshots" >}})
+   - Manage snapshots via Kubernetes CRs

  ### Operations

  - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}})
    - Utilizes the well-established [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)

docs/content/docs/custom-resource/job-management.md

Lines changed: 5 additions & 54 deletions
@@ -167,56 +167,6 @@ For this purpose you can use the `restartNonce` top level field in the spec. Set
 
  Restarts work exactly the same way as other application upgrades and follow the semantics detailed in the previous section.
 
- ## Savepoint management
-
- Savepoints are triggered automatically by the system during the upgrade process as we have seen in the previous sections.
-
- For backup, job forking and other purposes savepoints can be triggered manually or periodically by the operator, however generally speaking these will not be used during upgrades and are not required for the correct operation.
-
- ### Manual Savepoint Triggering
-
- Users can trigger savepoints manually by defining a new (different/random) value to the variable `savepointTriggerNonce` in the job specification:
-
- ```yaml
- job:
-   ...
-   savepointTriggerNonce: 123
- ```
-
- Changing the nonce value will trigger a new savepoint. Information about pending and last savepoint is stored in the resource status.
-
- ### Periodic Savepoint Triggering
-
- The operator also supports periodic savepoint triggering through the following config option which can be configured on a per job level:
-
- ```yaml
- flinkConfiguration:
-   ...
-   kubernetes.operator.periodic.savepoint.interval: 6h
- ```
-
- There is no guarantee on the timely execution of the periodic savepoints as they might be delayed by unhealthy job status or other interfering user operation.
-
- ### Savepoint History
-
- The operator automatically keeps track of the savepoint history triggered by upgrade or manual savepoint operations.
- This is necessary so cleanup can be performed by the operator for old savepoints.
-
- Users can control the cleanup behaviour by specifying a maximum age and maximum count for the savepoints in the history.
-
- ```
- kubernetes.operator.savepoint.history.max.age: 24 h
- kubernetes.operator.savepoint.history.max.count: 5
- ```
-
- {{< hint info >}}
- Savepoint cleanup happens lazily and only when the application is running.
- It is therefore very likely that savepoints live beyond the max age configuration.
- {{< /hint >}}
-
- To disable savepoint cleanup by the operator you can set `kubernetes.operator.savepoint.cleanup.enabled: false`.
- When savepoint cleanup is disabled the operator will still collect and populate the savepoint history but not perform any dispose operations.
 
  ## Recovery of missing job deployments
 
  When HA is enabled, the operator can recover the Flink cluster deployments in cases when it was accidentally deleted
@@ -297,16 +247,17 @@ Users have two options to restore a job from a target savepoint / checkpoint
 
  ### Redeploy using the savepointRedeployNonce
 
- It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `initialSavepointPath` in the job spec:
+ It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `flinkStateSnapshotReference` in the job spec:
 
  ```yaml
  job:
-   initialSavepointPath: file://redeploy-target-savepoint
+   flinkStateSnapshotReference:
+     path: file://redeploy-target-savepoint
    # If not set previously, set to 1, otherwise increment, e.g. 2
    savepointRedeployNonce: 1
  ```
 
- When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `initialSavepointPath`. The savepoint path must not be empty.
+ When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `flinkStateSnapshotReference`. The savepoint path must not be empty.
 
  {{< hint warning >}}
  Rollbacks are not supported after redeployments.
@@ -320,7 +271,7 @@ However, this also means that savepoint history is lost and the operator won't c
  1. Locate the latest checkpoint/savepoint metafile in your configured checkpoint/savepoint directory.
  2. Delete the `FlinkDeployment` resource for your application
  3. Check that you have the current savepoint, and that your `FlinkDeployment` is deleted completely
- 4. Modify your `FlinkDeployment` JobSpec and set the `initialSavepointPath` to your last checkpoint location
+ 4. Modify your `FlinkDeployment` JobSpec and set `flinkStateSnapshotReference.path` to your last checkpoint location
  5. Recreate the deployment
 
  These steps ensure that the operator will start completely fresh from the user defined savepoint path and can hopefully fully recover.
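
Step 4 of the recovery procedure could look like the following sketch (the metafile path is illustrative; use the location found in step 1):

```yaml
spec:
  job:
    upgradeMode: savepoint
    flinkStateSnapshotReference:
      # hypothetical path to the last checkpoint metafile located in step 1
      path: /flink-data/checkpoints/<job-id>/chk-42/_metadata
```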

docs/content/docs/custom-resource/overview.md

Lines changed: 4 additions & 0 deletions
@@ -37,6 +37,9 @@ With these two Custom Resources, we can support two different operational models
  - Flink application managed by the `FlinkDeployment`
  - Empty Flink session managed by the `FlinkDeployment` + multiple jobs managed by the `FlinkSessionJobs`. The operations on the session jobs are independent of each other.
 
+ To help manage snapshots, there is another CR called FlinkStateSnapshot. It can be created by the operator in case of periodic and upgrade savepoints/checkpoints, or manually by the user to trigger a savepoint/checkpoint for a job.
+ FlinkStateSnapshots will always have a FlinkDeployment or FlinkSessionJob linked to them in their spec.
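
A minimal sketch of such a linked snapshot CR might look like this (names are illustrative; an empty `savepoint: {}` relies on the documented field defaults):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  jobReference:             # links the snapshot to a Flink resource
    kind: FlinkDeployment   # or FlinkSessionJob
    name: example-deployment
  savepoint: {}             # trigger a savepoint with default settings
```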
 
  ## FlinkDeployment
 
  FlinkDeployment objects are defined in YAML format by the user and must contain the following required fields:
@@ -218,6 +221,7 @@ Alternatively, if you use helm to install flink-kubernetes-operator, it allows y
 
  ## Further information
 
+ - [Snapshots]({{< ref "docs/custom-resource/snapshots" >}})
  - [Job Management and Stateful upgrades]({{< ref "docs/custom-resource/job-management" >}})
  - [Deployment customization and pod templates]({{< ref "docs/custom-resource/pod-template" >}})
  - [Full Reference]({{< ref "docs/custom-resource/reference" >}})
docs/content/docs/custom-resource/snapshots.md (new file)

Lines changed: 195 additions & 0 deletions
@@ -0,0 +1,195 @@
---
title: "Snapshots"
weight: 3
type: docs
aliases:
- /custom-resource/snapshots.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Snapshots

To create, list and delete snapshots you can use the custom resource called FlinkStateSnapshot.
The operator will use the same controller flow as in the case of FlinkDeployment and FlinkSessionJob to trigger the savepoint/checkpoint and observe its status.

This feature deprecates the old `savepointInfo` and `checkpointInfo` fields found in the Flink resource CR status, along with the spec fields `initialSavepointPath`, `savepointTriggerNonce` and `checkpointTriggerNonce`.
It is enabled by default via the configuration option `kubernetes.operator.snapshot.resource.enabled`.
If you set this to false, the operator will keep using the deprecated status fields to track snapshots.

## Overview

To create a savepoint or checkpoint, exactly one of the spec fields `savepoint` or `checkpoint` must be present.
Furthermore, in the case of a savepoint you can signal to the operator that the savepoint already exists using the `alreadyExists` field, and the operator will mark it as a successful snapshot in the next reconciliation phase.

You can also instruct the operator to start a new FlinkDeployment/FlinkSessionJob from an existing snapshot CR by using `flinkStateSnapshotReference` in the job spec.

## Examples

### Savepoint

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  backoffLimit: 1  # retry count, -1 for infinite, 0 for no retries (default: -1)
  jobReference:
    kind: FlinkDeployment  # FlinkDeployment or FlinkSessionJob
    name: example-deployment  # name of the resource
  savepoint:
    alreadyExists: false  # optional (default: false), if true, the path is considered to already exist and state will be COMPLETED on first reconciliation
    disposeOnDelete: true  # optional (default: true), dispose of savepoint when this FlinkStateSnapshot is removed, job needs to be running
    formatType: CANONICAL  # optional (default: CANONICAL), format type of savepoint
    path: /flink-data/savepoints-custom  # optional (default: job savepoint path)
```

### Checkpoint

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-checkpoint
spec:
  backoffLimit: 1
  jobReference:
    kind: FlinkDeployment
    name: example-deployment
  checkpoint: {}
```

### Start job from existing snapshot

```yaml
job:
  flinkStateSnapshotReference:
    namespace: flink  # not required if it's in the same namespace
    name: example-savepoint
```

{{< hint warning >}}
While it is possible to start a job from a FlinkStateSnapshot with checkpoint type, checkpoint data is owned by Flink and might be deleted by Flink at any time after triggering the checkpoint.
{{< /hint >}}


## Snapshot CR lifecycle

### Snapshot creation

When a new FlinkStateSnapshot CR is created, in the first reconciliation phase the operator will trigger the savepoint/checkpoint for the linked deployment via the Flink REST API.
The resulting trigger ID will be added to the CR status.

In the next observation phase the operator will check all the in-progress snapshots and query their state.
If the snapshot was successful, the path will be added to the CR status.

If the triggered snapshot is a savepoint and `spec.savepoint.alreadyExists` is set to true, on the first reconciliation the operator will populate its `status` fields with `COMPLETED` state, and copy the savepoint path found in the spec to `status.path`.
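
For instance, registering a savepoint that was already taken outside the operator could look like this sketch (the path and names are illustrative):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: preexisting-savepoint       # hypothetical name
spec:
  jobReference:
    kind: FlinkDeployment
    name: example-deployment
  savepoint:
    alreadyExists: true             # no savepoint is triggered; the CR is marked COMPLETED
    path: /flink-data/savepoints/savepoint-1234  # hypothetical existing savepoint path
```

This is useful for bringing externally created savepoints under operator management without triggering a new one.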

### Snapshot errors

If the operator encounters any errors during snapshot observation/reconciliation, the `error` field will be populated in the CR status and the `failures` field will be incremented by 1.
If the backoff limit specified in the spec is reached, the snapshot will enter a `FAILED` state and won't be retried.
If it's not reached, the operator will retry the snapshot with exponential backoff (10s, 20s, 40s, ...).

In case of any error there will also be a new Event generated for the snapshot resource containing the error message.

{{< hint info >}}
For checkpoints, after the operator has ensured that the checkpoint was successful, it will attempt to fetch its final path via the Flink REST API.
Any errors experienced during this step will generate a Kubernetes event, but will not populate the `error` field, and the checkpoint will still be marked `COMPLETED`.
The `path` field will stay empty though.
{{< /hint >}}

### Snapshot abandonment

If the referenced Flink job can't be found or is stopped after triggering a snapshot, the state of the snapshot will be `ABANDONED` and it won't be retried.

### Savepoint disposal on deletion

In the case of savepoints, if `spec.savepoint.disposeOnDelete` is true, the operator will automatically dispose of the savepoint on the filesystem when the CR gets deleted.
This however requires the referenced Flink resource to be alive, as this operation is done using the Flink REST API.

This feature is not available for checkpoints.


## Triggering snapshots

Upgrade savepoints are triggered automatically by the system during the upgrade process, as we have seen in the previous sections.
In this case, the savepoint path will also be recorded in the `upgradeSnapshotReference` job status field, which the operator will use when restarting the job.

For backup, job forking and other purposes, savepoints and checkpoints can be triggered manually or periodically by the operator; however, generally speaking these will not be used during upgrades and are not required for correct operation.

### Manual Snapshot Triggering

Users can trigger snapshots manually by assigning a new (different/random) value to the `savepointTriggerNonce` or `checkpointTriggerNonce` variable in the job specification:

```yaml
job:
  ...
  savepointTriggerNonce: 123
  checkpointTriggerNonce: 123
  ...
```

Changing the nonce value will trigger a new snapshot. If FlinkStateSnapshot resources are enabled, a new snapshot CR will be automatically created.
If disabled, information about pending and last snapshots is stored in the FlinkDeployment/FlinkSessionJob CR status.

### Periodic Snapshot Triggering

The operator also supports periodic snapshot triggering through the following config options, which can be configured on a per-job level:

```yaml
flinkConfiguration:
  ...
  kubernetes.operator.periodic.savepoint.interval: 6h
  kubernetes.operator.periodic.checkpoint.interval: 6h
```

There is no guarantee on the timely execution of the periodic snapshots as they might be delayed by unhealthy job status or other interfering user operations.

### Snapshot History

The operator automatically keeps track of the snapshot history triggered by upgrade, manual and periodic snapshot operations.
This is necessary so cleanup can be performed by the operator for old snapshots.

Users can control the cleanup behaviour by specifying a maximum age and maximum count for the savepoint and checkpoint resources in the history.

```
kubernetes.operator.savepoint.history.max.age: 24 h
kubernetes.operator.savepoint.history.max.count: 5

kubernetes.operator.checkpoint.history.max.age: 24 h
kubernetes.operator.checkpoint.history.max.count: 5
```

{{< hint warning >}}
Checkpoint history cleanup is only supported if FlinkStateSnapshot resources are enabled.
This operation will only delete the FlinkStateSnapshot CR, and will never delete any checkpoint data on the filesystem.
{{< /hint >}}

{{< hint info >}}
Savepoint cleanup happens lazily and only when the Flink resource associated with the snapshot is running.
It is therefore very likely that savepoints live beyond the max age configuration.
{{< /hint >}}

To also dispose of savepoint data on savepoint cleanup, set `kubernetes.operator.savepoint.dispose-on-delete: true`.
This config will set `spec.savepoint.disposeOnDelete` to true for FlinkStateSnapshot CRs created by periodic savepoints and manual ones created using `savepointTriggerNonce`.

To disable savepoint/checkpoint cleanup by the operator you can set `kubernetes.operator.savepoint.cleanup.enabled: false` and `kubernetes.operator.checkpoint.cleanup.enabled: false`.
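
Putting the cleanup-related options together, a job-level configuration sketch might look like this (pick one of the two behaviours, shown here side by side only for illustration):

```yaml
flinkConfiguration:
  # dispose of savepoint data on the filesystem when history cleanup removes the CR:
  kubernetes.operator.savepoint.dispose-on-delete: "true"
  # ...or, alternatively, disable operator-side cleanup entirely:
  kubernetes.operator.savepoint.cleanup.enabled: "false"
  kubernetes.operator.checkpoint.cleanup.enabled: "false"
```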

docs/content/docs/operations/upgrade.md

Lines changed: 4 additions & 3 deletions
@@ -148,19 +148,20 @@ Here is a reference example of upgrading a `basic-checkpoint-ha-example` deploym
  ```
  5. Restore the job:
 
-    Deploy the previously deleted job using this [FlinkDeployment](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.initialSavepointPath` to the savepoint location obtained from step 1.
+    Deploy the previously deleted job using this [FlinkDeployment](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.flinkStateSnapshotReference.path` to the savepoint location obtained from step 1.
 
     ```
     spec:
       ...
       job:
-        initialSavepointPath: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
+        flinkStateSnapshotReference:
+          path: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
         upgradeMode: savepoint
         ...
     ```
     Alternatively, we may use this command to edit and deploy the manifest:
     ```sh
-    wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml | yq w - "spec.job.initialSavepointPath" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata" | kubectl apply -f -
+    wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml | yq w - "spec.job.flinkStateSnapshotReference.path" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata" | kubectl apply -f -
     ```
     Finally, verify that `deploy/basic-checkpoint-ha-example` log has:
     ```

examples/snapshot/checkpoint.yaml

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-checkpoint
spec:
  backoffLimit: 0
  jobReference:
    kind: FlinkDeployment
    name: example-deployment
  checkpoint: {} # This will specify that we want a checkpoint
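
A savepoint counterpart to this example file (not shown in this excerpt of the commit; the fields mirror the savepoint example in the snapshots documentation) might look like:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  backoffLimit: 1
  jobReference:
    kind: FlinkDeployment
    name: example-deployment
  savepoint:
    disposeOnDelete: true   # clean up the savepoint data when this CR is deleted
    formatType: CANONICAL
```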
