Skip to content

Commit 0ef627e

Browse files
authored
[FLINK-36110][snapshot] Store periodic snapshot trigger timestamps in memory
1 parent a10fb45 commit 0ef627e

File tree

9 files changed

+370
-98
lines changed

9 files changed

+370
-98
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import java.time.Duration;
5050
import java.time.Instant;
5151
import java.util.Collection;
52-
import java.util.Collections;
5352
import java.util.Comparator;
5453
import java.util.HashSet;
5554
import java.util.Set;
@@ -241,14 +240,7 @@ private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job
241240

242241
/** Clean up and dispose savepoints according to the configured max size/age. */
243242
private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) {
244-
Set<FlinkStateSnapshot> snapshots = Collections.emptySet();
245-
if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled(
246-
ctx.getOperatorConfig(), ctx.getObserveConfig())) {
247-
snapshots = ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class);
248-
if (snapshots == null) {
249-
snapshots = Set.of();
250-
}
251-
}
243+
var snapshots = FlinkStateSnapshotUtils.getFlinkStateSnapshotsSupplier(ctx).get();
252244

253245
cleanupSavepointHistoryLegacy(ctx, snapshots);
254246

@@ -301,7 +293,10 @@ Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(
301293

302294
var lastCompleteSnapshot =
303295
snapshotList.stream()
304-
.filter(s -> COMPLETED.equals(s.getStatus().getState()))
296+
.filter(
297+
s ->
298+
s.getStatus() != null
299+
&& COMPLETED.equals(s.getStatus().getState()))
305300
.max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
306301
.orElse(null);
307302

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.reconciler;
19+
20+
import org.apache.flink.autoscaler.utils.DateTimeUtils;
21+
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
22+
import org.apache.flink.kubernetes.operator.api.CrdConstants;
23+
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
24+
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
25+
26+
import io.fabric8.kubernetes.api.model.HasMetadata;
27+
import lombok.RequiredArgsConstructor;
28+
29+
import java.time.Instant;
30+
import java.util.Comparator;
31+
import java.util.Map;
32+
import java.util.Set;
33+
import java.util.concurrent.ConcurrentHashMap;
34+
import java.util.function.Supplier;
35+
36+
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
37+
import static org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType.PERIODIC;
38+
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
39+
40+
/** Class used to store latest timestamps of periodic checkpoint/savepoint. */
41+
@RequiredArgsConstructor
42+
public class SnapshotTriggerTimestampStore {
43+
private final ConcurrentHashMap<String, Instant> checkpointsLastTriggeredCache =
44+
new ConcurrentHashMap<>();
45+
private final ConcurrentHashMap<String, Instant> savepointsLastTriggeredCache =
46+
new ConcurrentHashMap<>();
47+
48+
/**
49+
* Returns the time a periodic snapshot was last triggered for this resource. This is stored in
50+
* memory, on operator start it will use the latest completed FlinkStateSnapshot CR creation
51+
* timestamp. If none found, the return value will be the max of the resource creation timestamp
52+
* and the latest triggered legacy snapshot, and in this case the memory store will also be
53+
* updated with this value.
54+
*
55+
* @param resource Flink resource
56+
* @param snapshotsSupplier supplies related snapshot resources
57+
* @return instant of last trigger
58+
*/
59+
public Instant getLastPeriodicTriggerInstant(
60+
AbstractFlinkResource<?, ?> resource,
61+
SnapshotType snapshotType,
62+
Supplier<Set<FlinkStateSnapshot>> snapshotsSupplier) {
63+
var cache = getCacheForSnapshotType(snapshotType);
64+
if (cache.containsKey(resource.getMetadata().getUid())) {
65+
return cache.get(resource.getMetadata().getUid());
66+
}
67+
68+
var instantOpt =
69+
snapshotsSupplier.get().stream()
70+
.filter(
71+
s ->
72+
s.getStatus() != null
73+
&& COMPLETED.equals(s.getStatus().getState()))
74+
.filter(s -> (snapshotType == SAVEPOINT) == s.getSpec().isSavepoint())
75+
.filter(
76+
s ->
77+
PERIODIC.name()
78+
.equals(
79+
s.getMetadata()
80+
.getLabels()
81+
.get(
82+
CrdConstants
83+
.LABEL_SNAPSHOT_TYPE)))
84+
.map(
85+
s ->
86+
DateTimeUtils.parseKubernetes(
87+
s.getMetadata().getCreationTimestamp()))
88+
.max(Comparator.naturalOrder());
89+
if (instantOpt.isPresent()) {
90+
return instantOpt.get();
91+
}
92+
93+
var legacyInstant = getLegacyTimestamp(resource, snapshotType);
94+
var creationInstant = Instant.parse(resource.getMetadata().getCreationTimestamp());
95+
var maxInstant =
96+
legacyInstant.compareTo(creationInstant) > 0 ? legacyInstant : creationInstant;
97+
98+
updateLastPeriodicTriggerTimestamp(resource, snapshotType, maxInstant);
99+
return maxInstant;
100+
}
101+
102+
/**
103+
* Updates the time a periodic snapshot was last triggered for this resource.
104+
*
105+
* @param resource Kubernetes resource
106+
* @param instant new timestamp
107+
*/
108+
public void updateLastPeriodicTriggerTimestamp(
109+
HasMetadata resource, SnapshotType snapshotType, Instant instant) {
110+
getCacheForSnapshotType(snapshotType).put(resource.getMetadata().getUid(), instant);
111+
}
112+
113+
private Map<String, Instant> getCacheForSnapshotType(SnapshotType snapshotType) {
114+
switch (snapshotType) {
115+
case SAVEPOINT:
116+
return savepointsLastTriggeredCache;
117+
case CHECKPOINT:
118+
return checkpointsLastTriggeredCache;
119+
default:
120+
throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
121+
}
122+
}
123+
124+
private Instant getLegacyTimestamp(
125+
AbstractFlinkResource<?, ?> resource, SnapshotType snapshotType) {
126+
SnapshotInfo snapshotInfo;
127+
switch (snapshotType) {
128+
case SAVEPOINT:
129+
snapshotInfo = resource.getStatus().getJobStatus().getSavepointInfo();
130+
break;
131+
case CHECKPOINT:
132+
snapshotInfo = resource.getStatus().getJobStatus().getCheckpointInfo();
133+
break;
134+
default:
135+
throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
136+
}
137+
138+
return Instant.ofEpochMilli(snapshotInfo.getLastPeriodicTriggerTimestamp());
139+
}
140+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
3838
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
3939
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
40+
import org.apache.flink.kubernetes.operator.reconciler.SnapshotTriggerTimestampStore;
4041
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
4142
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
4243
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -74,6 +75,9 @@ public abstract class AbstractJobReconciler<
7475

7576
public static final String LAST_STATE_DUMMY_SP_PATH = "KUBERNETES_OPERATOR_LAST_STATE";
7677

78+
private final SnapshotTriggerTimestampStore snapshotTriggerTimestampStore =
79+
new SnapshotTriggerTimestampStore();
80+
7781
public AbstractJobReconciler(
7882
EventRecorder eventRecorder,
7983
StatusRecorder<CR, STATUS> statusRecorder,
@@ -374,13 +378,24 @@ private boolean triggerSnapshotIfNeeded(FlinkResourceContext<CR> ctx, SnapshotTy
374378
var resource = ctx.getResource();
375379
var conf = ctx.getObserveConfig();
376380

377-
Optional<SnapshotTriggerType> triggerOpt =
378-
SnapshotUtils.shouldTriggerSnapshot(resource, conf, snapshotType);
381+
var lastTrigger =
382+
snapshotTriggerTimestampStore.getLastPeriodicTriggerInstant(
383+
resource,
384+
snapshotType,
385+
FlinkStateSnapshotUtils.getFlinkStateSnapshotsSupplier(ctx));
386+
387+
var triggerOpt =
388+
SnapshotUtils.shouldTriggerSnapshot(resource, conf, snapshotType, lastTrigger);
379389
if (triggerOpt.isEmpty()) {
380390
return false;
381391
}
382392
var triggerType = triggerOpt.get();
383393

394+
if (SnapshotTriggerType.PERIODIC.equals(triggerType)) {
395+
snapshotTriggerTimestampStore.updateLastPeriodicTriggerTimestamp(
396+
resource, snapshotType, Instant.now());
397+
}
398+
384399
var createSnapshotResource =
385400
FlinkStateSnapshotUtils.isSnapshotResourceEnabled(ctx.getOperatorConfig(), conf);
386401

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,22 @@
3131
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
3232
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
3333
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
34+
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
3435
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
3536
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
3637

3738
import io.fabric8.kubernetes.api.model.ObjectMeta;
3839
import io.fabric8.kubernetes.client.KubernetesClient;
3940
import io.javaoperatorsdk.operator.processing.event.ResourceID;
41+
import org.apache.commons.lang3.ObjectUtils;
4042
import org.apache.commons.lang3.StringUtils;
4143

4244
import javax.annotation.Nullable;
4345

4446
import java.time.Instant;
4547
import java.util.Optional;
48+
import java.util.Set;
49+
import java.util.function.Supplier;
4650

4751
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
4852
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
@@ -295,6 +299,27 @@ public static FlinkStateSnapshotReference createReferenceForUpgradeSavepoint(
295299
return FlinkStateSnapshotReference.fromResource(snapshot);
296300
}
297301

302+
/**
303+
* Returns a supplier of all relevant FlinkStateSnapshot resources for a given Flink resource.
304+
* If FlinkStateSnapshot resources are disabled, the supplier returns an empty set.
305+
*
306+
* @param ctx resource context
307+
* @return supplier for FlinkStateSnapshot resources
308+
*/
309+
public static Supplier<Set<FlinkStateSnapshot>> getFlinkStateSnapshotsSupplier(
310+
FlinkResourceContext<?> ctx) {
311+
return () -> {
312+
if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled(
313+
ctx.getOperatorConfig(), ctx.getObserveConfig())) {
314+
return ObjectUtils.firstNonNull(
315+
ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class),
316+
Set.of());
317+
} else {
318+
return Set.of();
319+
}
320+
};
321+
}
322+
298323
/**
299324
* Abandons a FlinkStateSnapshot resource if the referenced job is not found or not running.
300325
*

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,10 @@ public static SnapshotStatus getLastSnapshotStatus(
136136
*/
137137
@VisibleForTesting
138138
public static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
139-
AbstractFlinkResource<?, ?> resource, Configuration conf, SnapshotType snapshotType) {
139+
AbstractFlinkResource<?, ?> resource,
140+
Configuration conf,
141+
SnapshotType snapshotType,
142+
Instant lastTrigger) {
140143

141144
var status = resource.getStatus();
142145
var jobStatus = status.getJobStatus();
@@ -153,23 +156,20 @@ public static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
153156
Long triggerNonce;
154157
Long reconciledTriggerNonce;
155158
boolean inProgress;
156-
SnapshotInfo snapshotInfo;
157159
String automaticTriggerExpression;
158160

159161
switch (snapshotType) {
160162
case SAVEPOINT:
161163
triggerNonce = jobSpec.getSavepointTriggerNonce();
162164
reconciledTriggerNonce = reconciledJobSpec.getSavepointTriggerNonce();
163165
inProgress = savepointInProgress(jobStatus);
164-
snapshotInfo = jobStatus.getSavepointInfo();
165166
automaticTriggerExpression =
166167
conf.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
167168
break;
168169
case CHECKPOINT:
169170
triggerNonce = jobSpec.getCheckpointTriggerNonce();
170171
reconciledTriggerNonce = reconciledJobSpec.getCheckpointTriggerNonce();
171172
inProgress = checkpointInProgress(jobStatus);
172-
snapshotInfo = jobStatus.getCheckpointInfo();
173173
automaticTriggerExpression =
174174
conf.get(KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
175175
break;
@@ -193,14 +193,6 @@ public static Optional<SnapshotTriggerType> shouldTriggerSnapshot(
193193
}
194194
}
195195

196-
var lastTriggerTs = snapshotInfo.getLastPeriodicTriggerTimestamp();
197-
// When the resource is first created/periodic snapshotting enabled we have to compare
198-
// against the creation timestamp for triggering the first periodic savepoint
199-
var lastTrigger =
200-
lastTriggerTs == 0
201-
? Instant.parse(resource.getMetadata().getCreationTimestamp())
202-
: Instant.ofEpochMilli(lastTriggerTs);
203-
204196
if (shouldTriggerAutomaticSnapshot(snapshotType, automaticTriggerExpression, lastTrigger)) {
205197
if (snapshotType == CHECKPOINT && !isSnapshotTriggeringSupported(conf)) {
206198
LOG.warn(

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@
2525
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
2626
import org.apache.flink.kubernetes.operator.api.spec.JobReference;
2727
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
28-
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
2928
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
30-
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
31-
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
3229
import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
3330
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
3431
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
@@ -408,33 +405,17 @@ public static void reconcileSpec(FlinkDeployment deployment) {
408405
* Sets up an active cron trigger by ensuring that the latest successful snapshot happened
409406
* earlier than the scheduled trigger.
410407
*/
411-
public static void setupCronTrigger(SnapshotType snapshotType, FlinkDeployment deployment) {
408+
public static Instant setupCronTrigger(SnapshotType snapshotType, FlinkDeployment deployment) {
412409

413410
Calendar calendar = Calendar.getInstance();
414411
calendar.set(2022, Calendar.JUNE, 5, 11, 0);
415-
long lastCheckpointTimestamp = calendar.getTimeInMillis();
416412

417413
String cronOptionKey;
418-
419414
switch (snapshotType) {
420415
case SAVEPOINT:
421-
Savepoint lastSavepoint =
422-
Savepoint.of("", lastCheckpointTimestamp, SnapshotTriggerType.PERIODIC);
423-
deployment
424-
.getStatus()
425-
.getJobStatus()
426-
.getSavepointInfo()
427-
.updateLastSavepoint(lastSavepoint);
428416
cronOptionKey = KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key();
429417
break;
430418
case CHECKPOINT:
431-
Checkpoint lastCheckpoint =
432-
Checkpoint.of(lastCheckpointTimestamp, SnapshotTriggerType.PERIODIC);
433-
deployment
434-
.getStatus()
435-
.getJobStatus()
436-
.getCheckpointInfo()
437-
.updateLastCheckpoint(lastCheckpoint);
438419
cronOptionKey = KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL.key();
439420
break;
440421
default:
@@ -443,6 +424,7 @@ public static void setupCronTrigger(SnapshotType snapshotType, FlinkDeployment d
443424

444425
deployment.getSpec().getFlinkConfiguration().put(cronOptionKey, "0 0 12 5 6 ? 2022");
445426
reconcileSpec(deployment);
427+
return calendar.toInstant();
446428
}
447429

448430
/** Testing ResponseProvider. */

0 commit comments

Comments
 (0)