Skip to content

Commit 9819665

Browse files
mateczaganygyfora
authored andcommitted
[FLINK-35265] Add namespace field to JobReference
1 parent 148f835 commit 9819665

File tree

9 files changed

+68
-35
lines changed

9 files changed

+68
-35
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
171171
| ----------| ---- | ---- |
172172
| kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the Flink resource, FlinkDeployment or FlinkSessionJob. |
173173
| name | java.lang.String | Name of the Flink resource. |
174+
| namespace | java.lang.String | Namespace of the Flink resource. If empty, the operator will use the namespace of the snapshot. |
174175

175176
### JobSpec
176177
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,16 @@ public class JobReference {
4444
/** Name of the Flink resource. */
4545
private String name;
4646

47+
/**
48+
* Namespace of the Flink resource. If empty, the operator will use the namespace of the
49+
* snapshot.
50+
*/
51+
private String namespace;
52+
4753
public static JobReference fromFlinkResource(AbstractFlinkResource<?, ?> flinkResource) {
4854
var result = new JobReference();
4955
result.setName(flinkResource.getMetadata().getName());
56+
result.setNamespace(flinkResource.getMetadata().getNamespace());
5057

5158
if (flinkResource instanceof FlinkDeployment) {
5259
result.setKind(JobKind.FLINK_DEPLOYMENT);
@@ -64,6 +71,6 @@ public String toString() {
6471
} else if (kind == JobKind.FLINK_SESSION_JOB) {
6572
kindString = CrdConstants.KIND_SESSION_JOB;
6673
}
67-
return String.format("%s/%s", kindString, name);
74+
return String.format("%s/%s (%s)", namespace, name, kindString);
6875
}
6976
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,9 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources(
181181
return Set.of();
182182
}
183183
return Set.of(
184-
new ResourceID(
185-
snapshot.getSpec()
186-
.getJobReference()
187-
.getName(),
188-
snapshot.getMetadata().getNamespace()));
184+
FlinkStateSnapshotUtils
185+
.getSnapshotJobReferenceResourceId(
186+
snapshot));
189187
})
190188
.withNamespacesInheritedFromController(context)
191189
.followNamespaceChanges(true)
@@ -206,12 +204,9 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources(
206204

207205
// If FlinkSessionJob, retrieve deployment
208206
var resourceId =
209-
new ResourceID(
210-
snapshot.getSpec()
211-
.getJobReference()
212-
.getName(),
213-
snapshot.getMetadata()
214-
.getNamespace());
207+
FlinkStateSnapshotUtils
208+
.getSnapshotJobReferenceResourceId(
209+
snapshot);
215210
var flinkSessionJob =
216211
flinkSessionJobEventSource
217212
.get(resourceId)
@@ -226,11 +221,9 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources(
226221
.getNamespace()));
227222
}
228223
return Set.of(
229-
new ResourceID(
230-
snapshot.getSpec()
231-
.getJobReference()
232-
.getName(),
233-
snapshot.getMetadata().getNamespace()));
224+
FlinkStateSnapshotUtils
225+
.getSnapshotJobReferenceResourceId(
226+
snapshot));
234227
})
235228
.withNamespacesInheritedFromController(context)
236229
.followNamespaceChanges(true)

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@
3636

3737
import io.fabric8.kubernetes.api.model.ObjectMeta;
3838
import io.fabric8.kubernetes.client.KubernetesClient;
39+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3940
import org.apache.commons.lang3.StringUtils;
4041

4142
import javax.annotation.Nullable;
4243

4344
import java.time.Instant;
45+
import java.util.Optional;
4446

4547
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
4648
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
@@ -373,4 +375,19 @@ public static void snapshotInProgress(FlinkStateSnapshot snapshot, String trigge
373375
public static void snapshotTriggerPending(FlinkStateSnapshot snapshot) {
374376
snapshot.getStatus().setState(TRIGGER_PENDING);
375377
}
378+
379+
/**
380+
* Extracts the namespace of the job reference from a snapshot resource. This is either
381+
* explicitly specified in the job reference, or it will fallback to the namespace of the
382+
* snapshot.
383+
*
384+
* @param snapshot snapshot resource
385+
* @return namespace with the job reference to be found in
386+
*/
387+
public static ResourceID getSnapshotJobReferenceResourceId(FlinkStateSnapshot snapshot) {
388+
var namespace =
389+
Optional.ofNullable(snapshot.getSpec().getJobReference().getNamespace())
390+
.orElse(snapshot.getMetadata().getNamespace());
391+
return new ResourceID(snapshot.getSpec().getJobReference().getName(), namespace);
392+
}
376393
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
4949
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
5050
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
51+
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
5152
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
5253
import org.apache.flink.kubernetes.utils.Constants;
5354
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -501,8 +502,8 @@ public Optional<String> validateSessionJob(
501502

502503
@Override
503504
public Optional<String> validateStateSnapshot(
504-
FlinkStateSnapshot savepoint, Optional<AbstractFlinkResource<?, ?>> target) {
505-
var spec = savepoint.getSpec();
505+
FlinkStateSnapshot snapshot, Optional<AbstractFlinkResource<?, ?>> target) {
506+
var spec = snapshot.getSpec();
506507

507508
if ((!spec.isSavepoint() && !spec.isCheckpoint())
508509
|| (spec.isSavepoint() && spec.isCheckpoint())) {
@@ -522,13 +523,14 @@ public Optional<String> validateStateSnapshot(
522523
// If the savepoint has already been processed by the operator, we don't need to check the
523524
// job reference.
524525
if (target.isEmpty()
525-
&& (savepoint.getStatus() == null
526+
&& (snapshot.getStatus() == null
526527
|| FlinkStateSnapshotStatus.State.TRIGGER_PENDING.equals(
527-
savepoint.getStatus().getState()))) {
528+
snapshot.getStatus().getState()))) {
529+
var resourceId = FlinkStateSnapshotUtils.getSnapshotJobReferenceResourceId(snapshot);
528530
return Optional.of(
529531
String.format(
530-
"Target for snapshot (%s) in namespace %s was not found",
531-
spec.getJobReference(), savepoint.getMetadata().getNamespace()));
532+
"Target for snapshot %s/%s was not found",
533+
resourceId.getNamespace().orElse(null), resourceId.getName()));
532534
}
533535

534536
return Optional.empty();

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -514,8 +514,10 @@ public void testReconcileJobNotFound() {
514514
var snapshot = createSavepoint(deployment);
515515
var errorMessage =
516516
String.format(
517-
"Secondary resource FlinkDeployment/%s for savepoint snapshot-test was not found",
518-
deployment.getMetadata().getName());
517+
"Secondary resource %s/%s (%s) for savepoint snapshot-test was not found",
518+
deployment.getMetadata().getNamespace(),
519+
deployment.getMetadata().getName(),
520+
CrdConstants.KIND_FLINK_DEPLOYMENT);
519521

520522
// First reconcile will trigger the snapshot.
521523
controller.reconcile(snapshot, TestUtils.createSnapshotContext(client, deployment));
@@ -556,8 +558,10 @@ public void testReconcileJobNotRunning() {
556558
var snapshot = createSavepoint(deployment);
557559
var errorMessage =
558560
String.format(
559-
"Secondary resource FlinkDeployment/%s for savepoint snapshot-test is not running",
560-
deployment.getMetadata().getName());
561+
"Secondary resource %s/%s (%s) for savepoint snapshot-test is not running",
562+
deployment.getMetadata().getNamespace(),
563+
deployment.getMetadata().getName(),
564+
CrdConstants.KIND_FLINK_DEPLOYMENT);
561565

562566
controller.reconcile(snapshot, context);
563567

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,19 +1072,19 @@ public void testFlinkStateSnapshotValidator() {
10721072
null);
10731073

10741074
var refName = "does-not-exist";
1075+
var namespace = "default";
10751076
var snapshot =
10761077
TestUtils.buildFlinkStateSnapshotSavepoint(
10771078
false,
10781079
JobReference.builder()
10791080
.kind(JobKind.FLINK_DEPLOYMENT)
10801081
.name(refName)
1082+
.namespace(namespace)
10811083
.build());
10821084
testStateSnapshotValidate(
10831085
snapshot,
10841086
Optional.empty(),
1085-
String.format(
1086-
"Target for snapshot (FlinkDeployment/%s) in namespace test was not found",
1087-
refName));
1087+
String.format("Target for snapshot %s/%s was not found", namespace, refName));
10881088
}
10891089

10901090
private void testStateSnapshotValidateWithModifier(

flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
2626
import org.apache.flink.kubernetes.operator.api.spec.JobKind;
2727
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
28+
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
2829
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
2930

3031
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -101,15 +102,21 @@ private void validateSessionJob(KubernetesResource resource) {
101102
}
102103

103104
private void validateStateSnapshot(KubernetesResource resource) {
104-
FlinkStateSnapshot savepoint =
105-
objectMapper.convertValue(resource, FlinkStateSnapshot.class);
105+
FlinkStateSnapshot snapshot = objectMapper.convertValue(resource, FlinkStateSnapshot.class);
106106

107-
var namespace = savepoint.getMetadata().getNamespace();
108-
var jobRef = savepoint.getSpec().getJobReference();
107+
var jobRef = snapshot.getSpec().getJobReference();
109108

110109
AbstractFlinkResource<?, ?> targetResource = null;
111110
if (jobRef != null && jobRef.getName() != null && jobRef.getKind() != null) {
111+
var namespace =
112+
FlinkStateSnapshotUtils.getSnapshotJobReferenceResourceId(snapshot)
113+
.getNamespace()
114+
.orElseThrow(
115+
() ->
116+
new IllegalArgumentException(
117+
"Cannot determine namespace for snapshot"));
112118
var key = Cache.namespaceKeyFunc(namespace, jobRef.getName());
119+
113120
if (JobKind.FLINK_DEPLOYMENT.equals(jobRef.getKind())) {
114121
targetResource =
115122
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
@@ -124,7 +131,7 @@ private void validateStateSnapshot(KubernetesResource resource) {
124131

125132
for (FlinkResourceValidator validator : validators) {
126133
Optional<String> validationError =
127-
validator.validateStateSnapshot(savepoint, Optional.ofNullable(targetResource));
134+
validator.validateStateSnapshot(snapshot, Optional.ofNullable(targetResource));
128135
if (validationError.isPresent()) {
129136
throw new NotAllowedException(validationError.get());
130137
}

helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ spec:
4848
type: string
4949
name:
5050
type: string
51+
namespace:
52+
type: string
5153
type: object
5254
savepoint:
5355
properties:

0 commit comments

Comments
 (0)