Skip to content

Commit e711ac0

Browse files
mateczaganygyfora
authored andcommitted
[FLINK-35265] Remove type field from CheckpointSpec
1 parent 0f39014 commit e711ac0

File tree

8 files changed

+18
-44
lines changed

8 files changed

+18
-44
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
5151
### CheckpointSpec
5252
**Class**: org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec
5353

54-
**Description**: Spec for checkpoint state snapshots.
54+
**Description**: Spec for checkpoint state snapshots. This is an empty class, used to instruct the operator to
55+
trigger a checkpoint.
5556

5657
| Parameter | Type | Docs |
5758
| ----------| ---- | ---- |
58-
| checkpointType | org.apache.flink.kubernetes.operator.api.status.CheckpointType | Type of checkpoint to take. |
5959

6060
### FlinkDeploymentSpec
6161
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/CheckpointSpec.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,19 @@
1818
package org.apache.flink.kubernetes.operator.api.spec;
1919

2020
import org.apache.flink.annotation.Experimental;
21-
import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
2221

2322
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
24-
import lombok.AllArgsConstructor;
2523
import lombok.Builder;
2624
import lombok.Data;
2725
import lombok.NoArgsConstructor;
2826

29-
/** Spec for checkpoint state snapshots. */
27+
/**
28+
* Spec for checkpoint state snapshots. This is an empty class, used to instruct the operator to
29+
* trigger a checkpoint.
30+
*/
3031
@Experimental
3132
@Data
3233
@NoArgsConstructor
33-
@AllArgsConstructor
3434
@Builder
3535
@JsonIgnoreProperties(ignoreUnknown = true)
36-
public class CheckpointSpec {
37-
/** Type of checkpoint to take. */
38-
private CheckpointType checkpointType = CheckpointType.FULL;
39-
}
36+
public class CheckpointSpec {}

flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,10 @@ public static FlinkStateSnapshot buildFlinkStateSnapshotCheckpoint(
249249
String namespace,
250250
CheckpointType checkpointType,
251251
JobReference jobReference) {
252-
var checkpointSpec = CheckpointSpec.builder().checkpointType(checkpointType).build();
253252
var spec =
254253
FlinkStateSnapshotSpec.builder()
255254
.jobReference(jobReference)
256-
.checkpoint(checkpointSpec)
255+
.checkpoint(new CheckpointSpec())
257256
.build();
258257
var snapshot = new FlinkStateSnapshot();
259258
snapshot.setSpec(spec);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -412,15 +412,15 @@ private boolean triggerSnapshotIfNeeded(FlinkResourceContext<CR> ctx, SnapshotTy
412412

413413
break;
414414
case CHECKPOINT:
415-
var checkpointType =
416-
conf.get(KubernetesOperatorConfigOptions.OPERATOR_CHECKPOINT_TYPE);
417415
if (createSnapshotResource) {
418416
FlinkStateSnapshotUtils.createCheckpointResource(
419-
ctx.getKubernetesClient(), resource, checkpointType, triggerType);
417+
ctx.getKubernetesClient(), resource, triggerType);
420418

421419
ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
422420
triggerType, resource, CHECKPOINT);
423421
} else {
422+
var checkpointType =
423+
conf.get(KubernetesOperatorConfigOptions.OPERATOR_CHECKPOINT_TYPE);
424424
var triggerId =
425425
ctx.getFlinkService()
426426
.triggerCheckpoint(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/snapshot/StateSnapshotReconciler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.kubernetes.operator.reconciler.snapshot;
1919

2020
import org.apache.flink.configuration.CheckpointingOptions;
21+
import org.apache.flink.core.execution.CheckpointType;
2122
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2223
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
2324
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
@@ -216,12 +217,7 @@ private Optional<String> triggerCheckpointOrSavepoint(
216217
throw new IllegalArgumentException(
217218
"Manual checkpoint triggering is not supported for this Flink job (requires Flink 1.17+)");
218219
}
219-
return Optional.of(
220-
flinkService.triggerCheckpoint(
221-
jobId,
222-
org.apache.flink.core.execution.CheckpointType.valueOf(
223-
spec.getCheckpoint().getCheckpointType().name()),
224-
conf));
220+
return Optional.of(flinkService.triggerCheckpoint(jobId, CheckpointType.FULL, conf));
225221
} else {
226222
throw new IllegalArgumentException(
227223
"Snapshot must specify either savepoint or checkpoint spec");

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec;
2828
import org.apache.flink.kubernetes.operator.api.spec.JobReference;
2929
import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
30-
import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
3130
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
3231
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
3332
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -176,21 +175,17 @@ public static FlinkStateSnapshot createSavepointResource(
176175
*
177176
* @param kubernetesClient kubernetes client
178177
* @param resource Flink resource associated
179-
* @param checkpointType type of checkpoint
180178
* @param triggerType trigger type
181179
* @return created snapshot
182180
*/
183181
public static FlinkStateSnapshot createCheckpointResource(
184182
KubernetesClient kubernetesClient,
185183
AbstractFlinkResource<?, ?> resource,
186-
CheckpointType checkpointType,
187184
SnapshotTriggerType triggerType) {
188-
var checkpointSpec = CheckpointSpec.builder().checkpointType(checkpointType).build();
189-
190185
var snapshotSpec =
191186
FlinkStateSnapshotSpec.builder()
192187
.jobReference(JobReference.fromFlinkResource(resource))
193-
.checkpoint(checkpointSpec)
188+
.checkpoint(new CheckpointSpec())
194189
.build();
195190

196191
var resourceName = getFlinkStateSnapshotName(CHECKPOINT, triggerType, resource);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
2828
import org.apache.flink.kubernetes.operator.api.spec.JobKind;
2929
import org.apache.flink.kubernetes.operator.api.spec.JobReference;
30-
import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
3130
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
3231
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
3332
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
@@ -173,14 +172,12 @@ public void testGetFlinkStateSnapshotsForResource() {
173172
@Test
174173
public void testCreatePeriodicCheckpointResource() {
175174
var deployment = initDeployment();
176-
var checkpointType = CheckpointType.FULL;
177175

178176
var snapshot =
179177
FlinkStateSnapshotUtils.createCheckpointResource(
180-
client, deployment, checkpointType, SnapshotTriggerType.PERIODIC);
178+
client, deployment, SnapshotTriggerType.PERIODIC);
181179

182180
assertTrue(snapshot.getSpec().isCheckpoint());
183-
assertEquals(checkpointType, snapshot.getSpec().getCheckpoint().getCheckpointType());
184181
assertEquals(
185182
deployment.getMetadata().getName(), snapshot.getSpec().getJobReference().getName());
186183
assertEquals(JobKind.FLINK_DEPLOYMENT, snapshot.getSpec().getJobReference().getKind());
@@ -238,9 +235,8 @@ public void testCreateCheckpointResource() {
238235

239236
var snapshot =
240237
FlinkStateSnapshotUtils.createCheckpointResource(
241-
client, deployment, CheckpointType.FULL, SnapshotTriggerType.MANUAL);
242-
assertCheckpointResource(
243-
snapshot, deployment, SnapshotTriggerType.MANUAL, CheckpointType.FULL);
238+
client, deployment, SnapshotTriggerType.MANUAL);
239+
assertCheckpointResource(snapshot, deployment, SnapshotTriggerType.MANUAL);
244240
}
245241

246242
@ParameterizedTest
@@ -389,13 +385,11 @@ private void assertSavepointResource(
389385
private void assertCheckpointResource(
390386
FlinkStateSnapshot snapshot,
391387
FlinkDeployment deployment,
392-
SnapshotTriggerType triggerType,
393-
CheckpointType checkpointType) {
388+
SnapshotTriggerType triggerType) {
394389
assertEquals(
395390
triggerType.name(),
396391
snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE));
397392
assertTrue(snapshot.getSpec().isCheckpoint());
398-
assertEquals(checkpointType, snapshot.getSpec().getCheckpoint().getCheckpointType());
399393

400394
assertEquals(
401395
deployment.getMetadata().getName(), snapshot.getSpec().getJobReference().getName());

helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,6 @@ spec:
3838
backoffLimit:
3939
type: integer
4040
checkpoint:
41-
properties:
42-
checkpointType:
43-
enum:
44-
- FULL
45-
- INCREMENTAL
46-
- UNKNOWN
47-
type: string
4841
type: object
4942
jobReference:
5043
properties:

0 commit comments

Comments
 (0)