Skip to content

Commit 0f39014

Browse files
Mate Czaganygyfora
authored andcommitted
[FLINK-35265] Implement FlinkStateSnapshot custom resource
1 parent 7d90ce8 commit 0f39014

File tree

102 files changed

+5021
-767
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+5021
-767
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
4848

4949
## Spec
5050

51+
### CheckpointSpec
52+
**Class**: org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec
53+
54+
**Description**: Spec for checkpoint state snapshots.
55+
56+
| Parameter | Type | Docs |
57+
| ----------| ---- | ---- |
58+
| checkpointType | org.apache.flink.kubernetes.operator.api.status.CheckpointType | Type of checkpoint to take. |
59+
5160
### FlinkDeploymentSpec
5261
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec
5362

@@ -81,6 +90,29 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
8190
| flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> | Flink configuration overrides for the Flink deployment or Flink session job. |
8291
| deploymentName | java.lang.String | The name of the target session cluster deployment. |
8392

93+
### FlinkStateSnapshotReference
94+
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference
95+
96+
**Description**: Reference for a FlinkStateSnapshot.
97+
98+
| Parameter | Type | Docs |
99+
| ----------| ---- | ---- |
100+
| namespace | java.lang.String | Namespace of the snapshot resource. |
101+
| name | java.lang.String | Name of the snapshot resource. |
102+
| path | java.lang.String | If a path is given, all other fields will be ignored, and this will be used as the initial savepoint path. |
103+
104+
### FlinkStateSnapshotSpec
105+
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec
106+
107+
**Description**: Spec that describes a FlinkStateSnapshot.
108+
109+
| Parameter | Type | Docs |
110+
| ----------| ---- | ---- |
111+
| jobReference | org.apache.flink.kubernetes.operator.api.spec.JobReference | Source to take a snapshot of. Not required if it's a savepoint and alreadyExists is true. |
112+
| savepoint | org.apache.flink.kubernetes.operator.api.spec.SavepointSpec | Spec in case of savepoint. |
113+
| checkpoint | org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec | Spec in case of checkpoint. |
114+
| backoffLimit | int | Maximum number of retries before the snapshot is considered as failed. Set to -1 for unlimited or 0 for no retries. |
115+
84116
### FlinkVersion
85117
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkVersion
86118

@@ -109,6 +141,16 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
109141
| labels | java.util.Map<java.lang.String,java.lang.String> | Ingress labels. |
110142
| tls | java.util.List<io.fabric8.kubernetes.api.model.networking.v1.IngressTLS> | Ingress tls. |
111143

144+
### JobKind
145+
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobKind
146+
147+
**Description**: Describes the Kubernetes kind of job reference.
148+
149+
| Value | Docs |
150+
| ----- | ---- |
151+
| FlinkDeployment | FlinkDeployment CR kind. |
152+
| FlinkSessionJob | FlinkSessionJob CR kind. |
153+
112154
### JobManagerSpec
113155
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec
114156

@@ -120,6 +162,16 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
120162
| replicas | int | Number of JobManager replicas. Must be 1 for non-HA deployments. |
121163
| podTemplate | io.fabric8.kubernetes.api.model.PodTemplateSpec | JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. |
122164

165+
### JobReference
166+
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobReference
167+
168+
**Description**: Flink resource reference that can be a FlinkDeployment or FlinkSessionJob.
169+
170+
| Parameter | Type | Docs |
171+
| ----------| ---- | ---- |
172+
| kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the Flink resource, FlinkDeployment or FlinkSessionJob. |
173+
| name | java.lang.String | Name of the Flink resource. |
174+
123175
### JobSpec
124176
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec
125177

@@ -134,10 +186,11 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
134186
| state | org.apache.flink.kubernetes.operator.api.spec.JobState | Desired state for the job. |
135187
| savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger savepoint for the running job. In order to trigger a savepoint, change the number to a different non-null value. |
136188
| initialSavepointPath | java.lang.String | Savepoint path used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
189+
| flinkStateSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | Snapshot reference used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
137190
| checkpointTriggerNonce | java.lang.Long | Nonce used to manually trigger checkpoint for the running job. In order to trigger a checkpoint, change the number to a different non-null value. |
138191
| upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
139192
| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
140-
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
193+
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath or the path/FlinkStateSnapshot reference in flinkStateSnapshotReference. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
141194

142195
### JobState
143196
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
@@ -170,6 +223,18 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
170223
| memory | java.lang.String | Amount of memory allocated to the pod. Example: 1024m, 1g |
171224
| ephemeralStorage | java.lang.String | Amount of ephemeral storage allocated to the pod. Example: 1024m, 2G |
172225

226+
### SavepointSpec
227+
**Class**: org.apache.flink.kubernetes.operator.api.spec.SavepointSpec
228+
229+
**Description**: Spec for savepoint state snapshots.
230+
231+
| Parameter | Type | Docs |
232+
| ----------| ---- | ---- |
233+
| path | java.lang.String | Optional path for the savepoint. |
234+
| formatType | org.apache.flink.kubernetes.operator.api.status.SavepointFormatType | Savepoint format to use. |
235+
| disposeOnDelete | java.lang.Boolean | Dispose the savepoints upon CR deletion. |
236+
| alreadyExists | java.lang.Boolean | Indicates that the savepoint already exists on the given path. The Operator will not trigger any new savepoints, just update the status of the resource as a completed snapshot. |
237+
173238
### TaskManagerSpec
174239
**Class**: org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec
175240

@@ -285,6 +350,47 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
285350
| lifecycleState | org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | Lifecycle state of the Flink resource (including being rolled back, failed etc.). |
286351
| reconciliationStatus | org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus | Status of the last reconcile operation. |
287352

353+
### FlinkStateSnapshotStatus
354+
**Class**: org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus
355+
356+
**Description**: Last observed status of the Flink state snapshot.
357+
358+
| Parameter | Type | Docs |
359+
| ----------| ---- | ---- |
360+
361+
### State
362+
**Class**: org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State
363+
364+
**Description**: Describes state of a snapshot.
365+
366+
| Value | Docs |
367+
| ----- | ---- |
368+
| COMPLETED | Snapshot was successful and available. |
369+
| FAILED | Error during snapshot. |
370+
| IN_PROGRESS | Snapshot in progress. |
371+
| TRIGGER_PENDING | Not yet processed by the operator. |
372+
| ABANDONED | Snapshot abandoned due to job failure/upgrade. |
373+
| state | org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State | Current state of the snapshot. |
374+
| triggerId | java.lang.String | Trigger ID of the snapshot. |
375+
| triggerTimestamp | java.lang.String | Trigger timestamp of a pending snapshot operation. |
376+
| resultTimestamp | java.lang.String | Timestamp when the snapshot was last created/failed. |
377+
| path | java.lang.String | Final path of the snapshot. |
378+
| error | java.lang.String | Optional error information about the FlinkStateSnapshot. |
379+
| failures | int | Number of failures, used for tracking max retries. |
380+
381+
### State
382+
**Class**: org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State
383+
384+
**Description**: Describes state of a snapshot.
385+
386+
| Value | Docs |
387+
| ----- | ---- |
388+
| COMPLETED | Snapshot was successful and available. |
389+
| FAILED | Error during snapshot. |
390+
| IN_PROGRESS | Snapshot in progress. |
391+
| TRIGGER_PENDING | Not yet processed by the operator. |
392+
| ABANDONED | Snapshot abandoned due to job failure/upgrade. |
393+
288394
### JobManagerDeploymentStatus
289395
**Class**: org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus
290396

@@ -310,6 +416,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
310416
| state | java.lang.String | Last observed state of the job. |
311417
| startTime | java.lang.String | Start time of the job. |
312418
| updateTime | java.lang.String | Update time of the job. |
419+
| upgradeSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | |
313420
| savepointInfo | org.apache.flink.kubernetes.operator.api.status.SavepointInfo | Information about pending and last savepoint for the job. |
314421
| checkpointInfo | org.apache.flink.kubernetes.operator.api.status.CheckpointInfo | Information about pending and last checkpoint for the job. |
315422

@@ -356,7 +463,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
356463

357464
| Parameter | Type | Docs |
358465
| ----------| ---- | ---- |
359-
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator. |
466+
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator for manual and periodic snapshots. Only used if FlinkStateSnapshot resources are disabled. |
360467
| triggerId | java.lang.String | Trigger id of a pending savepoint operation. |
361468
| triggerTimestamp | java.lang.Long | Trigger timestamp of a pending savepoint operation. |
362469
| triggerType | org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType | Savepoint trigger mechanism. |

docs/layouts/shortcodes/generated/dynamic_section.html

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@
158158
<td>Boolean</td>
159159
<td>Whether to enable clean up of savepoint history.</td>
160160
</tr>
161+
<tr>
162+
<td><h5>kubernetes.operator.savepoint.dispose-on-delete</h5></td>
163+
<td style="word-wrap: break-word;">false</td>
164+
<td>Boolean</td>
165+
<td>Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted.</td>
166+
</tr>
161167
<tr>
162168
<td><h5>kubernetes.operator.savepoint.format.type</h5></td>
163169
<td style="word-wrap: break-word;">CANONICAL</td>
@@ -182,6 +188,12 @@
182188
<td>Duration</td>
183189
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
184190
</tr>
191+
<tr>
192+
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
193+
<td style="word-wrap: break-word;">true</td>
194+
<td>Boolean</td>
195+
<td>Create new FlinkStateSnapshot resources for storing snapshots. Disable if you wish to use the deprecated mode and save snapshot results to FlinkDeployment/FlinkSessionJob status fields. The Operator will fallback to legacy mode during runtime if the CRD is not found, even if this value is true.</td>
196+
</tr>
185197
<tr>
186198
<td><h5>kubernetes.operator.user.artifacts.http.header</h5></td>
187199
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,12 @@
356356
<td>Boolean</td>
357357
<td>Whether to enable clean up of savepoint history.</td>
358358
</tr>
359+
<tr>
360+
<td><h5>kubernetes.operator.savepoint.dispose-on-delete</h5></td>
361+
<td style="word-wrap: break-word;">false</td>
362+
<td>Boolean</td>
363+
<td>Savepoint data for FlinkStateSnapshot resources created by the operator during upgrades and periodic savepoints will be disposed of automatically when the generated Kubernetes resource is deleted.</td>
364+
</tr>
359365
<tr>
360366
<td><h5>kubernetes.operator.savepoint.format.type</h5></td>
361367
<td style="word-wrap: break-word;">CANONICAL</td>
@@ -392,6 +398,12 @@
392398
<td>Duration</td>
393399
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
394400
</tr>
401+
<tr>
402+
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
403+
<td style="word-wrap: break-word;">true</td>
404+
<td>Boolean</td>
405+
<td>Create new FlinkStateSnapshot resources for storing snapshots. Disable if you wish to use the deprecated mode and save snapshot results to FlinkDeployment/FlinkSessionJob status fields. The Operator will fallback to legacy mode during runtime if the CRD is not found, even if this value is true.</td>
406+
</tr>
395407
<tr>
396408
<td><h5>kubernetes.operator.startup.stop-on-informer-error</h5></td>
397409
<td style="word-wrap: break-word;">true</td>

e2e-tests/data/flinkdep-cr.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ spec:
3535
high-availability.storageDir: file:///opt/flink/volume/flink-ha
3636
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
3737
state.savepoints.dir: file:///opt/flink/volume/flink-sp
38+
kubernetes.operator.snapshot.resource.enabled: "false"
3839
serviceAccount: flink
3940
podTemplate:
4041
spec:

e2e-tests/data/sessionjob-cr.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ spec:
3535
high-availability.storageDir: file:///opt/flink/volume/flink-ha
3636
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
3737
state.savepoints.dir: file:///opt/flink/volume/flink-sp
38+
kubernetes.operator.snapshot.resource.enabled: "false"
3839
serviceAccount: flink
3940
podTemplate:
4041
spec:

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/DateTimeUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,15 @@ public static String readable(Instant instant, ZoneId zoneId) {
4949
ZonedDateTime dateTime = instant.atZone(zoneId);
5050
return dateTime.format(DEFAULT_FORMATTER);
5151
}
52+
53+
/**
54+
* Convert an Instant to a format that is used in Kubernetes.
55+
*
56+
* @param instant The Instant to convert.
57+
* @return The Kubernetes format in the system default zone.
58+
*/
59+
public static String kubernetes(Instant instant) {
60+
ZonedDateTime dateTime = instant.atZone(ZoneId.systemDefault());
61+
return dateTime.format(DateTimeFormatter.ISO_INSTANT);
62+
}
5263
}

flink-kubernetes-operator-api/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ under the License.
202202
<includes>
203203
<include>flinkdeployments.flink.apache.org-v1.yml</include>
204204
<include>flinksessionjobs.flink.apache.org-v1.yml</include>
205+
<include>flinkstatesnapshots.flink.apache.org-v1.yml</include>
205206
</includes>
206207
<filtering>false</filtering>
207208
</resource>

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/CrdConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ public class CrdConstants {
2323
public static final String API_VERSION = "v1beta1";
2424
public static final String KIND_SESSION_JOB = "FlinkSessionJob";
2525
public static final String KIND_FLINK_DEPLOYMENT = "FlinkDeployment";
26+
public static final String KIND_FLINK_STATE_SNAPSHOT = "FlinkStateSnapshot";
2627

2728
public static final String LABEL_TARGET_SESSION = "target.session";
2829

2930
public static final String EPHEMERAL_STORAGE = "ephemeral-storage";
31+
32+
public static final String LABEL_SNAPSHOT_TYPE = "snapshot.type";
3033
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api;
19+
20+
import org.apache.flink.annotation.Experimental;
21+
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
22+
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
23+
24+
import com.fasterxml.jackson.annotation.JsonInclude;
25+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
26+
import io.fabric8.kubernetes.api.model.Namespaced;
27+
import io.fabric8.kubernetes.client.CustomResource;
28+
import io.fabric8.kubernetes.model.annotation.Group;
29+
import io.fabric8.kubernetes.model.annotation.ShortNames;
30+
import io.fabric8.kubernetes.model.annotation.Version;
31+
32+
/** Custom resource definition for taking manual savepoints of Flink jobs. */
33+
@Experimental
34+
@JsonInclude(JsonInclude.Include.NON_NULL)
35+
@JsonDeserialize()
36+
@Group(CrdConstants.API_GROUP)
37+
@Version(CrdConstants.API_VERSION)
38+
@ShortNames({"flinksnp"})
39+
public class FlinkStateSnapshot
40+
extends CustomResource<FlinkStateSnapshotSpec, FlinkStateSnapshotStatus>
41+
implements Namespaced {
42+
43+
@Override
44+
public FlinkStateSnapshotSpec initSpec() {
45+
return new FlinkStateSnapshotSpec();
46+
}
47+
}

0 commit comments

Comments
 (0)