Skip to content

Commit 3b27032

Browse files
committed
[FLINK-36109][snapshot] Add labels to snapshot resources
1 parent 699acae commit 3b27032

File tree

15 files changed

+344
-25
lines changed

15 files changed

+344
-25
lines changed

docs/content/docs/custom-resource/snapshots.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ This however requires the referenced Flink resource to be alive, as this operati
127127

128128
This feature is not available for checkpoints.
129129

130-
131130
## Triggering snapshots
132131

133132
Upgrade savepoints are triggered automatically by the system during the upgrade process as we have seen in the previous sections.
@@ -208,3 +207,19 @@ Legacy savepoints found in FlinkDeployment/FlinkSessionJob CRs under the depreca
208207
- For max count and FlinkStateSnapshot resources **disabled**, it will be cleaned up when `savepointHistory` exceeds max count
209208
- For max count and FlinkStateSnapshot resources **enabled**, it will be cleaned up when `savepointHistory` + number of FlinkStateSnapshot CRs related to the job exceed max count
210209

210+
211+
## Advanced Snapshot Filtering
212+
213+
At the end of each snapshot reconciliation phase, the operator will update its labels to reflect the latest status and spec of the resources.
214+
This will allow the Kubernetes API server to filter snapshots without having to query all resources, since filtering by status or spec fields of custom resources is not supported in Kubernetes by default.
215+
Example queries with label selectors using `kubectl`:
216+
```shell
217+
# Query all checkpoints
218+
kubectl -n flink get flinksnp -l 'snapshot.type=CHECKPOINT'
219+
220+
# Query all savepoints with states
221+
kubectl -n flink get flinksnp -l 'snapshot.state in (COMPLETED,ABANDONED),snapshot.type=SAVEPOINT'
222+
223+
# Query all savepoints/checkpoints with job reference
224+
kubectl -n flink get flinksnp -l 'job-reference.kind=FlinkDeployment,job-reference.name=test-job'
225+
```

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/CrdConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,8 @@ public class CrdConstants {
3030
public static final String EPHEMERAL_STORAGE = "ephemeral-storage";
3131

3232
public static final String LABEL_SNAPSHOT_TYPE = "snapshot.type";
33+
public static final String LABEL_SNAPSHOT_TRIGGER_TYPE = "snapshot.trigger-type";
34+
public static final String LABEL_SNAPSHOT_STATE = "snapshot.state";
35+
public static final String LABEL_SNAPSHOT_JOB_REFERENCE_KIND = "job-reference.kind";
36+
public static final String LABEL_SNAPSHOT_JOB_REFERENCE_NAME = "job-reference.name";
3337
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import lombok.Getter;
3434
import lombok.RequiredArgsConstructor;
3535

36+
import java.util.Map;
3637
import java.util.Optional;
3738

3839
/** Context for reconciling a snapshot. */
@@ -42,6 +43,7 @@ public class FlinkStateSnapshotContext {
4243

4344
private final FlinkStateSnapshot resource;
4445
private final FlinkStateSnapshotStatus originalStatus;
46+
private final Map<String, String> originalLabels;
4547
private final Context<FlinkStateSnapshot> josdkContext;
4648
private final FlinkConfigManager configManager;
4749

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.slf4j.LoggerFactory;
4646

4747
import java.time.Duration;
48+
import java.util.HashMap;
4849
import java.util.Map;
4950
import java.util.Objects;
5051
import java.util.Set;
@@ -82,6 +83,8 @@ public UpdateControl<FlinkStateSnapshot> reconcile(
8283
reconciler.reconcile(ctx);
8384
}
8485

86+
updateLabels(ctx);
87+
8588
notifyListeners(ctx);
8689
return getUpdateControl(ctx);
8790
}
@@ -154,13 +157,27 @@ public Map<String, EventSource> prepareEventSources(
154157
EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context));
155158
}
156159

160+
/**
161+
* Checks whether status and/or labels were changed on this resource, and returns an
162+
* UpdateControl instance accordingly. Unless the snapshot state is terminal, the update control
163+
* will be configured to reschedule the reconciliation.
164+
*
165+
* @param ctx snapshot context
166+
* @return update control
167+
*/
157168
private UpdateControl<FlinkStateSnapshot> getUpdateControl(FlinkStateSnapshotContext ctx) {
158169
var resource = ctx.getResource();
159-
UpdateControl<FlinkStateSnapshot> updateControl;
160-
if (!ctx.getOriginalStatus().equals(resource.getStatus())) {
170+
var updateControl = UpdateControl.<FlinkStateSnapshot>noUpdate();
171+
172+
var labelsChanged = resourceLabelsChanged(ctx);
173+
var statusChanged = resourceStatusChanged(ctx);
174+
175+
if (labelsChanged && statusChanged) {
176+
updateControl = UpdateControl.updateResourceAndPatchStatus(resource);
177+
} else if (labelsChanged) {
178+
updateControl = UpdateControl.updateResource(resource);
179+
} else if (statusChanged) {
161180
updateControl = UpdateControl.patchStatus(resource);
162-
} else {
163-
updateControl = UpdateControl.noUpdate();
164181
}
165182

166183
switch (resource.getStatus().getState()) {
@@ -174,7 +191,7 @@ private UpdateControl<FlinkStateSnapshot> getUpdateControl(FlinkStateSnapshotCon
174191
}
175192

176193
private void notifyListeners(FlinkStateSnapshotContext ctx) {
177-
if (!ctx.getOriginalStatus().equals(ctx.getResource().getStatus())) {
194+
if (resourceStatusChanged(ctx)) {
178195
statusRecorder.notifyListeners(ctx.getResource(), ctx.getOriginalStatus());
179196
}
180197
}
@@ -197,4 +214,38 @@ private boolean validateSnapshot(FlinkStateSnapshotContext ctx) {
197214
}
198215
return true;
199216
}
217+
218+
/**
219+
* Updates FlinkStateSnapshot resource labels with labels that represent its current state and
220+
* spec.
221+
*
222+
* @param ctx snapshot context
223+
*/
224+
private void updateLabels(FlinkStateSnapshotContext ctx) {
225+
var labels = new HashMap<>(ctx.getResource().getMetadata().getLabels());
226+
labels.putAll(
227+
FlinkStateSnapshotUtils.getSnapshotLabels(
228+
ctx.getResource(), ctx.getSecondaryResource()));
229+
ctx.getResource().getMetadata().setLabels(labels);
230+
}
231+
232+
/**
233+
* Checks if the resource status has changed since the start of reconciliation.
234+
*
235+
* @param ctx snapshot context
236+
* @return true if resource status changed
237+
*/
238+
private boolean resourceStatusChanged(FlinkStateSnapshotContext ctx) {
239+
return !ctx.getOriginalStatus().equals(ctx.getResource().getStatus());
240+
}
241+
242+
/**
243+
* Checks if the resource labels have changed since the start of reconciliation.
244+
*
245+
* @param ctx snapshot context
246+
* @return true if resource labels changed
247+
*/
248+
private boolean resourceLabelsChanged(FlinkStateSnapshotContext ctx) {
249+
return !ctx.getOriginalLabels().equals(ctx.getResource().getMetadata().getLabels());
250+
}
200251
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Instant getLastPeriodicTriggerInstant(
8080
.getLabels()
8181
.get(
8282
CrdConstants
83-
.LABEL_SNAPSHOT_TYPE)))
83+
.LABEL_SNAPSHOT_TRIGGER_TYPE)))
8484
.map(
8585
s ->
8686
DateTimeUtils.parseKubernetes(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3636
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
3737

38+
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
39+
3840
import io.javaoperatorsdk.operator.api.reconciler.Context;
3941
import io.javaoperatorsdk.operator.processing.event.ResourceID;
4042
import org.slf4j.Logger;
@@ -76,7 +78,11 @@ public FlinkResourceContextFactory(
7678
public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
7779
FlinkStateSnapshot savepoint, Context<FlinkStateSnapshot> josdkContext) {
7880
return new FlinkStateSnapshotContext(
79-
savepoint, savepoint.getStatus().toBuilder().build(), josdkContext, configManager);
81+
savepoint,
82+
savepoint.getStatus().toBuilder().build(),
83+
ImmutableMap.copyOf(savepoint.getMetadata().getLabels()),
84+
josdkContext,
85+
configManager);
8086
}
8187

8288
public <CR extends AbstractFlinkResource<?, ?>> FlinkResourceContext<CR> getResourceContext(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class EventSourceUtils {
6161
.map(Enum::name)
6262
.collect(Collectors.joining(","));
6363
var labelSelector =
64-
String.format("%s in (%s)", CrdConstants.LABEL_SNAPSHOT_TYPE, labelFilters);
64+
String.format("%s in (%s)", CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, labelFilters);
6565
var configuration =
6666
InformerConfiguration.from(FlinkStateSnapshot.class, context)
6767
.withLabelSelector(labelSelector)

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import javax.annotation.Nullable;
4343

4444
import java.time.Instant;
45+
import java.util.HashMap;
46+
import java.util.Map;
4547
import java.util.Optional;
4648
import java.util.Set;
4749
import java.util.function.Supplier;
@@ -66,7 +68,7 @@ protected static FlinkStateSnapshot createFlinkStateSnapshot(
6668
var metadata = new ObjectMeta();
6769
metadata.setNamespace(namespace);
6870
metadata.setName(name);
69-
metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name());
71+
metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, triggerType.name());
7072

7173
var snapshot = new FlinkStateSnapshot();
7274
snapshot.setSpec(spec);
@@ -84,7 +86,7 @@ protected static FlinkStateSnapshot createFlinkStateSnapshot(
8486
*/
8587
public static SnapshotTriggerType getSnapshotTriggerType(FlinkStateSnapshot snapshot) {
8688
var triggerTypeStr =
87-
snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE);
89+
snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE);
8890
try {
8991
return SnapshotTriggerType.valueOf(triggerTypeStr);
9092
} catch (NullPointerException | IllegalArgumentException e) {
@@ -345,7 +347,9 @@ public static void snapshotSuccessful(
345347
public static void snapshotInProgress(FlinkStateSnapshot snapshot, String triggerId) {
346348
snapshot.getMetadata()
347349
.getLabels()
348-
.putIfAbsent(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.MANUAL.name());
350+
.putIfAbsent(
351+
CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE,
352+
SnapshotTriggerType.MANUAL.name());
349353
snapshot.getStatus().setState(IN_PROGRESS);
350354
snapshot.getStatus().setTriggerId(triggerId);
351355
snapshot.getStatus().setTriggerTimestamp(DateTimeUtils.kubernetes(Instant.now()));
@@ -361,6 +365,52 @@ public static void snapshotTriggerPending(FlinkStateSnapshot snapshot) {
361365
snapshot.getStatus().setState(TRIGGER_PENDING);
362366
}
363367

368+
/**
369+
* Creates a map of labels that can be applied to a snapshot resource based on its current spec
370+
* and status. As described in FLINK-36109, we should set up selectable spec fields instead of
371+
* labels once the Kubernetes feature is GA and widely supported.
372+
*
373+
* @param snapshot snapshot instance
374+
* @param secondaryResourceOpt optional referenced Flink resource
375+
* @return map of auto-generated labels
376+
*/
377+
public static Map<String, String> getSnapshotLabels(
378+
FlinkStateSnapshot snapshot,
379+
Optional<AbstractFlinkResource<?, ?>> secondaryResourceOpt) {
380+
var labels = new HashMap<String, String>();
381+
labels.put(
382+
CrdConstants.LABEL_SNAPSHOT_TYPE,
383+
snapshot.getSpec().isSavepoint()
384+
? SnapshotType.SAVEPOINT.name()
385+
: SnapshotType.CHECKPOINT.name());
386+
labels.put(
387+
CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE,
388+
snapshot.getMetadata()
389+
.getLabels()
390+
.getOrDefault(
391+
CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE,
392+
SnapshotTriggerType.MANUAL.name()));
393+
394+
Optional.ofNullable(snapshot.getStatus())
395+
.ifPresent(
396+
status ->
397+
labels.put(
398+
CrdConstants.LABEL_SNAPSHOT_STATE,
399+
status.getState().name()));
400+
401+
secondaryResourceOpt.ifPresent(
402+
secondaryResource -> {
403+
labels.put(
404+
CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_KIND,
405+
secondaryResource.getKind());
406+
labels.put(
407+
CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_NAME,
408+
secondaryResource.getMetadata().getName());
409+
});
410+
411+
return labels;
412+
}
413+
364414
/**
365415
* Extracts the namespace of the job reference from a snapshot resource. This is either
366416
* explicitly specified in the job reference, or it will fallback to the namespace of the

0 commit comments

Comments
 (0)