Skip to content

Commit d2c0173

Browse files
mateczaganygyfora
authored andcommitted
[FLINK-36109][snapshot] Add labels to snapshot resources
1 parent 28fb289 commit d2c0173

File tree

15 files changed

+344
-25
lines changed

15 files changed

+344
-25
lines changed

docs/content/docs/custom-resource/snapshots.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ This however requires the referenced Flink resource to be alive, as this operati
127127

128128
This feature is not available for checkpoints.
129129

130-
131130
## Triggering snapshots
132131

133132
Upgrade savepoints are triggered automatically by the system during the upgrade process as we have seen in the previous sections.
@@ -208,3 +207,19 @@ Legacy savepoints found in FlinkDeployment/FlinkSessionJob CRs under the depreca
208207
- For max count and FlinkStateSnapshot resources **disabled**, it will be cleaned up when `savepointHistory` exceeds max count
209208
- For max count and FlinkStateSnapshot resources **enabled**, it will be cleaned up when `savepointHistory` + number of FlinkStateSnapshot CRs related to the job exceed max count
210209

210+
211+
## Advanced Snapshot Filtering
212+
213+
At the end of each snapshot reconciliation phase, the operator will update its labels to reflect the latest status and spec of the resources.
214+
This will allow the Kubernetes API server to filter snapshots without having to query all resources, since filtering by status or spec fields of custom resources is not supported in Kubernetes by default.
215+
Example queries with label selectors using `kubectl`:
216+
```shell
217+
# Query all checkpoints
218+
kubectl -n flink get flinksnp -l 'snapshot.type=CHECKPOINT'
219+
220+
# Query all savepoints with states
221+
kubectl -n flink get flinksnp -l 'snapshot.state in (COMPLETED,ABANDONED),snapshot.type=SAVEPOINT'
222+
223+
# Query all savepoints/checkpoints with job reference
224+
kubectl -n flink get flinksnp -l 'job-reference.kind=FlinkDeployment,job-reference.name=test-job'
225+
```

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/CrdConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,8 @@ public class CrdConstants {
3030
public static final String EPHEMERAL_STORAGE = "ephemeral-storage";
3131

3232
public static final String LABEL_SNAPSHOT_TYPE = "snapshot.type";
33+
public static final String LABEL_SNAPSHOT_TRIGGER_TYPE = "snapshot.trigger-type";
34+
public static final String LABEL_SNAPSHOT_STATE = "snapshot.state";
35+
public static final String LABEL_SNAPSHOT_JOB_REFERENCE_KIND = "job-reference.kind";
36+
public static final String LABEL_SNAPSHOT_JOB_REFERENCE_NAME = "job-reference.name";
3337
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotContext.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import lombok.Getter;
3434
import lombok.RequiredArgsConstructor;
3535

36+
import java.util.Map;
3637
import java.util.Optional;
3738

3839
/** Context for reconciling a snapshot. */
@@ -42,6 +43,7 @@ public class FlinkStateSnapshotContext {
4243

4344
private final FlinkStateSnapshot resource;
4445
private final FlinkStateSnapshotStatus originalStatus;
46+
private final Map<String, String> originalLabels;
4547
private final Context<FlinkStateSnapshot> josdkContext;
4648
private final FlinkConfigManager configManager;
4749

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.slf4j.LoggerFactory;
4747

4848
import java.time.Duration;
49+
import java.util.HashMap;
4950
import java.util.Map;
5051
import java.util.Objects;
5152
import java.util.Set;
@@ -84,6 +85,8 @@ public UpdateControl<FlinkStateSnapshot> reconcile(
8485
reconciler.reconcile(ctx);
8586
}
8687

88+
updateLabels(ctx);
89+
8790
notifyListenersAndMetricManager(ctx);
8891
return getUpdateControl(ctx);
8992
}
@@ -157,13 +160,27 @@ public Map<String, EventSource> prepareEventSources(
157160
EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context));
158161
}
159162

163+
/**
164+
* Checks whether status and/or labels were changed on this resource, and returns an
165+
* UpdateControl instance accordingly. Unless the snapshot state is terminal, the update control
166+
* will be configured to reschedule the reconciliation.
167+
*
168+
* @param ctx snapshot context
169+
* @return update control
170+
*/
160171
private UpdateControl<FlinkStateSnapshot> getUpdateControl(FlinkStateSnapshotContext ctx) {
161172
var resource = ctx.getResource();
162-
UpdateControl<FlinkStateSnapshot> updateControl;
163-
if (!ctx.getOriginalStatus().equals(resource.getStatus())) {
173+
var updateControl = UpdateControl.<FlinkStateSnapshot>noUpdate();
174+
175+
var labelsChanged = resourceLabelsChanged(ctx);
176+
var statusChanged = resourceStatusChanged(ctx);
177+
178+
if (labelsChanged && statusChanged) {
179+
updateControl = UpdateControl.updateResourceAndPatchStatus(resource);
180+
} else if (labelsChanged) {
181+
updateControl = UpdateControl.updateResource(resource);
182+
} else if (statusChanged) {
164183
updateControl = UpdateControl.patchStatus(resource);
165-
} else {
166-
updateControl = UpdateControl.noUpdate();
167184
}
168185

169186
switch (resource.getStatus().getState()) {
@@ -177,7 +194,7 @@ private UpdateControl<FlinkStateSnapshot> getUpdateControl(FlinkStateSnapshotCon
177194
}
178195

179196
private void notifyListenersAndMetricManager(FlinkStateSnapshotContext ctx) {
180-
if (!ctx.getOriginalStatus().equals(ctx.getResource().getStatus())) {
197+
if (resourceStatusChanged(ctx)) {
181198
statusRecorder.notifyListeners(ctx.getResource(), ctx.getOriginalStatus());
182199
}
183200
metricManager.onUpdate(ctx.getResource());
@@ -201,4 +218,38 @@ private boolean validateSnapshot(FlinkStateSnapshotContext ctx) {
201218
}
202219
return true;
203220
}
221+
222+
/**
223+
* Updates FlinkStateSnapshot resource labels with labels that represent its current state and
224+
* spec.
225+
*
226+
* @param ctx snapshot context
227+
*/
228+
private void updateLabels(FlinkStateSnapshotContext ctx) {
229+
var labels = new HashMap<>(ctx.getResource().getMetadata().getLabels());
230+
labels.putAll(
231+
FlinkStateSnapshotUtils.getSnapshotLabels(
232+
ctx.getResource(), ctx.getSecondaryResource()));
233+
ctx.getResource().getMetadata().setLabels(labels);
234+
}
235+
236+
/**
237+
* Checks if the resource status has changed since the start of reconciliation.
238+
*
239+
* @param ctx snapshot context
240+
* @return true if resource status changed
241+
*/
242+
private boolean resourceStatusChanged(FlinkStateSnapshotContext ctx) {
243+
return !ctx.getOriginalStatus().equals(ctx.getResource().getStatus());
244+
}
245+
246+
/**
247+
* Checks if the resource labels have changed since the start of reconciliation.
248+
*
249+
* @param ctx snapshot context
250+
* @return true if resource labels changed
251+
*/
252+
private boolean resourceLabelsChanged(FlinkStateSnapshotContext ctx) {
253+
return !ctx.getOriginalLabels().equals(ctx.getResource().getMetadata().getLabels());
254+
}
204255
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Instant getLastPeriodicTriggerInstant(
8080
.getLabels()
8181
.get(
8282
CrdConstants
83-
.LABEL_SNAPSHOT_TYPE)))
83+
.LABEL_SNAPSHOT_TRIGGER_TYPE)))
8484
.map(
8585
s ->
8686
DateTimeUtils.parseKubernetes(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3636
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
3737

38+
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
39+
3840
import io.javaoperatorsdk.operator.api.reconciler.Context;
3941
import io.javaoperatorsdk.operator.processing.event.ResourceID;
4042
import org.slf4j.Logger;
@@ -76,7 +78,11 @@ public FlinkResourceContextFactory(
7678
public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
7779
FlinkStateSnapshot savepoint, Context<FlinkStateSnapshot> josdkContext) {
7880
return new FlinkStateSnapshotContext(
79-
savepoint, savepoint.getStatus().toBuilder().build(), josdkContext, configManager);
81+
savepoint,
82+
savepoint.getStatus().toBuilder().build(),
83+
ImmutableMap.copyOf(savepoint.getMetadata().getLabels()),
84+
josdkContext,
85+
configManager);
8086
}
8187

8288
public <CR extends AbstractFlinkResource<?, ?>> FlinkResourceContext<CR> getResourceContext(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class EventSourceUtils {
6161
.map(Enum::name)
6262
.collect(Collectors.joining(","));
6363
var labelSelector =
64-
String.format("%s in (%s)", CrdConstants.LABEL_SNAPSHOT_TYPE, labelFilters);
64+
String.format("%s in (%s)", CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, labelFilters);
6565
var configuration =
6666
InformerConfiguration.from(FlinkStateSnapshot.class, context)
6767
.withLabelSelector(labelSelector)

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import javax.annotation.Nullable;
4343

4444
import java.time.Instant;
45+
import java.util.HashMap;
46+
import java.util.Map;
4547
import java.util.Optional;
4648
import java.util.Set;
4749
import java.util.function.Supplier;
@@ -66,7 +68,7 @@ protected static FlinkStateSnapshot createFlinkStateSnapshot(
6668
var metadata = new ObjectMeta();
6769
metadata.setNamespace(namespace);
6870
metadata.setName(name);
69-
metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name());
71+
metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE, triggerType.name());
7072

7173
var snapshot = new FlinkStateSnapshot();
7274
snapshot.setSpec(spec);
@@ -84,7 +86,7 @@ protected static FlinkStateSnapshot createFlinkStateSnapshot(
8486
*/
8587
public static SnapshotTriggerType getSnapshotTriggerType(FlinkStateSnapshot snapshot) {
8688
var triggerTypeStr =
87-
snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE);
89+
snapshot.getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE);
8890
try {
8991
return SnapshotTriggerType.valueOf(triggerTypeStr);
9092
} catch (NullPointerException | IllegalArgumentException e) {
@@ -345,7 +347,9 @@ public static void snapshotSuccessful(
345347
public static void snapshotInProgress(FlinkStateSnapshot snapshot, String triggerId) {
346348
snapshot.getMetadata()
347349
.getLabels()
348-
.putIfAbsent(CrdConstants.LABEL_SNAPSHOT_TYPE, SnapshotTriggerType.MANUAL.name());
350+
.putIfAbsent(
351+
CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE,
352+
SnapshotTriggerType.MANUAL.name());
349353
snapshot.getStatus().setState(IN_PROGRESS);
350354
snapshot.getStatus().setTriggerId(triggerId);
351355
snapshot.getStatus().setTriggerTimestamp(DateTimeUtils.kubernetes(Instant.now()));
@@ -361,6 +365,52 @@ public static void snapshotTriggerPending(FlinkStateSnapshot snapshot) {
361365
snapshot.getStatus().setState(TRIGGER_PENDING);
362366
}
363367

368+
/**
369+
* Creates a map of labels that can be applied to a snapshot resource based on its current spec
370+
* and status. As described in FLINK-36109, we should set up selectable spec fields instead of
371+
* labels once the Kubernetes feature is GA and widely supported.
372+
*
373+
* @param snapshot snapshot instance
374+
* @param secondaryResourceOpt optional referenced Flink resource
375+
* @return map of auto-generated labels
376+
*/
377+
public static Map<String, String> getSnapshotLabels(
378+
FlinkStateSnapshot snapshot,
379+
Optional<AbstractFlinkResource<?, ?>> secondaryResourceOpt) {
380+
var labels = new HashMap<String, String>();
381+
labels.put(
382+
CrdConstants.LABEL_SNAPSHOT_TYPE,
383+
snapshot.getSpec().isSavepoint()
384+
? SnapshotType.SAVEPOINT.name()
385+
: SnapshotType.CHECKPOINT.name());
386+
labels.put(
387+
CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE,
388+
snapshot.getMetadata()
389+
.getLabels()
390+
.getOrDefault(
391+
CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE,
392+
SnapshotTriggerType.MANUAL.name()));
393+
394+
Optional.ofNullable(snapshot.getStatus())
395+
.ifPresent(
396+
status ->
397+
labels.put(
398+
CrdConstants.LABEL_SNAPSHOT_STATE,
399+
status.getState().name()));
400+
401+
secondaryResourceOpt.ifPresent(
402+
secondaryResource -> {
403+
labels.put(
404+
CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_KIND,
405+
secondaryResource.getKind());
406+
labels.put(
407+
CrdConstants.LABEL_SNAPSHOT_JOB_REFERENCE_NAME,
408+
secondaryResource.getMetadata().getName());
409+
});
410+
411+
return labels;
412+
}
413+
364414
/**
365415
* Extracts the namespace of the job reference from a snapshot resource. This is either
366416
* explicitly specified in the job reference, or it will fallback to the namespace of the

0 commit comments

Comments
 (0)