Skip to content

Commit 5e304b0

Browse files
jiangzhodongjoon-hyun
authored andcommitted
[SPARK-52173] Fix OwnerReference Patching for Spark Application Driver Resources
### What changes were proposed in this pull request? This PR fixes the logic to patch ownerReference on pre-resources for application. It would decorate the proposed spec instead of created resource with generated fields. ### Why are the changes needed? At initialization phase, operator would request pre-resources for driver first, then apply the owner references on these resources with created driver pod uid. The latter patching is currently failing as it's decorating on created resource (which have managed fields set). ``` Message: metadata.managedFields must be nil. ``` > ... when using the Apply operation you cannot define managedFields in the body of the request that you submit. https://kubernetes.io/docs/reference/using-api/server-side-apply/ The patch body shall not include generated `managedFields` . ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests added ### Was this patch authored or co-authored using generative AI tooling? No Closes #204 from jiangzho/cleanup. Authored-by: Zhou JIANG <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 24b2abd commit 5e304b0

File tree

2 files changed

+193
-8
lines changed
  • spark-operator/src

2 files changed

+193
-8
lines changed

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import java.time.Duration;
2828
import java.time.Instant;
29-
import java.util.ArrayList;
3029
import java.util.List;
3130
import java.util.Optional;
3231

@@ -70,13 +69,11 @@ public ReconcileProgress reconcile(
7069
}
7170
}
7271
try {
73-
List<HasMetadata> createdPreResources = new ArrayList<>();
74-
for (HasMetadata resource : context.getDriverPreResourcesSpec()) {
72+
List<HasMetadata> preResourcesSpec = context.getDriverPreResourcesSpec();
73+
for (HasMetadata resource : preResourcesSpec) {
7574
Optional<HasMetadata> createdResource =
7675
ReconcilerUtils.getOrCreateSecondaryResource(context.getClient(), resource);
77-
if (createdResource.isPresent()) {
78-
createdPreResources.add(createdResource.get());
79-
} else {
76+
if (createdResource.isEmpty()) {
8077
return appendStateAndImmediateRequeue(
8178
context, statusRecorder, creationFailureState(resource));
8279
}
@@ -86,8 +83,8 @@ public ReconcileProgress reconcile(
8683
context.getClient(), context.getDriverPodSpec());
8784
if (driverPod.isPresent()) {
8885
DriverResourceDecorator decorator = new DriverResourceDecorator(driverPod.get());
89-
createdPreResources.forEach(decorator::decorate);
90-
context.getClient().resourceList(createdPreResources).forceConflicts().serverSideApply();
86+
preResourcesSpec.forEach(decorator::decorate);
87+
context.getClient().resourceList(preResourcesSpec).forceConflicts().serverSideApply();
9188
List<HasMetadata> driverResources = context.getDriverResourcesSpec();
9289
driverResources.forEach(decorator::decorate);
9390
for (HasMetadata resource : driverResources) {
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.k8s.operator.reconciler.reconcilesteps;
21+
22+
import static org.mockito.ArgumentMatchers.anyList;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.verify;
25+
import static org.mockito.Mockito.when;
26+
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
32+
import io.fabric8.kubernetes.api.model.ConfigMap;
33+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
34+
import io.fabric8.kubernetes.api.model.FieldsV1;
35+
import io.fabric8.kubernetes.api.model.ManagedFieldsEntry;
36+
import io.fabric8.kubernetes.api.model.ObjectMeta;
37+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
38+
import io.fabric8.kubernetes.api.model.Pod;
39+
import io.fabric8.kubernetes.api.model.PodBuilder;
40+
import io.fabric8.kubernetes.client.KubernetesClient;
41+
import io.fabric8.kubernetes.client.dsl.NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable;
42+
import io.fabric8.kubernetes.client.dsl.NamespaceableResource;
43+
import io.fabric8.kubernetes.client.dsl.ServerSideApplicable;
44+
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
45+
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
46+
import org.junit.jupiter.api.Assertions;
47+
import org.junit.jupiter.api.Test;
48+
import org.mockito.ArgumentCaptor;
49+
50+
import org.apache.spark.k8s.operator.SparkApplication;
51+
import org.apache.spark.k8s.operator.context.SparkAppContext;
52+
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
53+
54+
@EnableKubernetesMockClient(crud = true)
55+
@SuppressWarnings("PMD")
56+
@SuppressFBWarnings(
57+
value = {"UWF_UNWRITTEN_FIELD", "NP_UNWRITTEN_FIELD", "UUF_UNUSED_FIELD"},
58+
justification = "Unwritten fields are covered by Kubernetes mock client")
59+
class AppInitStepTest {
60+
private KubernetesMockServer mockServer;
61+
private KubernetesClient kubernetesClient;
62+
63+
private final ConfigMap preResourceConfigMapSpec =
64+
new ConfigMapBuilder()
65+
.withNewMetadata()
66+
.withName("pre-configmap")
67+
.withNamespace("default")
68+
.endMetadata()
69+
.withData(Map.of("foo1", "bar1"))
70+
.build();
71+
72+
private final ConfigMap resourceConfigMapSpec =
73+
new ConfigMapBuilder()
74+
.withNewMetadata()
75+
.withName("resource-configmap")
76+
.withNamespace("default")
77+
.endMetadata()
78+
.withData(Map.of("foo", "bar"))
79+
.build();
80+
private final Pod driverPodSpec =
81+
new PodBuilder()
82+
.withNewMetadata()
83+
.withName("driver-pod")
84+
.withNamespace("default")
85+
.endMetadata()
86+
.editOrNewSpec()
87+
.addNewContainer()
88+
.withName("driver-container")
89+
.withImage("spark")
90+
.endContainer()
91+
.endSpec()
92+
.build();
93+
94+
private final ObjectMeta applicationMetadata =
95+
new ObjectMetaBuilder().withName("sparkapp1").withNamespace("default").build();
96+
97+
@Test
98+
void driverResourcesHaveOwnerReferencesToDriver() {
99+
AppInitStep appInitStep = new AppInitStep();
100+
SparkAppContext mocksparkAppContext = mock(SparkAppContext.class);
101+
SparkAppStatusRecorder recorder = mock(SparkAppStatusRecorder.class);
102+
SparkApplication application = new SparkApplication();
103+
application.setMetadata(applicationMetadata);
104+
when(mocksparkAppContext.getResource()).thenReturn(application);
105+
when(mocksparkAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
106+
when(mocksparkAppContext.getDriverPodSpec()).thenReturn(driverPodSpec);
107+
when(mocksparkAppContext.getDriverResourcesSpec())
108+
.thenReturn(Collections.singletonList(resourceConfigMapSpec));
109+
when(mocksparkAppContext.getClient()).thenReturn(kubernetesClient);
110+
appInitStep.reconcile(mocksparkAppContext, recorder);
111+
Pod createdPod = kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
112+
ConfigMap createCM =
113+
kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
114+
Assertions.assertNotNull(createCM);
115+
Assertions.assertNotNull(createdPod);
116+
Assertions.assertEquals(1, createCM.getMetadata().getOwnerReferences().size());
117+
Assertions.assertEquals(
118+
createdPod.getMetadata().getName(),
119+
createCM.getMetadata().getOwnerReferences().get(0).getName());
120+
Assertions.assertEquals(
121+
createdPod.getMetadata().getUid(),
122+
createCM.getMetadata().getOwnerReferences().get(0).getUid());
123+
Assertions.assertEquals(
124+
createdPod.getKind(), createCM.getMetadata().getOwnerReferences().get(0).getKind());
125+
}
126+
127+
@Test
128+
void createdPreResourcesPatchedWithOwnerReferencesToDriver() {
129+
AppInitStep appInitStep = new AppInitStep();
130+
SparkAppContext mocksparkAppContext = mock(SparkAppContext.class);
131+
SparkAppStatusRecorder recorder = mock(SparkAppStatusRecorder.class);
132+
SparkApplication application = new SparkApplication();
133+
application.setMetadata(applicationMetadata);
134+
when(mocksparkAppContext.getResource()).thenReturn(application);
135+
when(mocksparkAppContext.getDriverPreResourcesSpec())
136+
.thenReturn(Collections.singletonList(preResourceConfigMapSpec));
137+
when(mocksparkAppContext.getDriverPodSpec()).thenReturn(driverPodSpec);
138+
when(mocksparkAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
139+
140+
KubernetesClient mockClient = mock(KubernetesClient.class);
141+
when(mocksparkAppContext.getClient()).thenReturn(mockClient);
142+
143+
ConfigMap createdConfigMap =
144+
new ConfigMapBuilder(preResourceConfigMapSpec)
145+
.editOrNewMetadata()
146+
.withManagedFields(
147+
new ManagedFieldsEntry(
148+
"v1", "FieldsV1", new FieldsV1(), "foo", "foo", "foo", "foo"))
149+
.endMetadata()
150+
.build();
151+
Pod createdPod =
152+
new PodBuilder(driverPodSpec).editOrNewMetadata().withUid("foobar").endMetadata().build();
153+
154+
NamespaceableResource<ConfigMap> mockCreatedNamespaceableResource =
155+
mock(NamespaceableResource.class);
156+
when(mockCreatedNamespaceableResource.get()).thenReturn(createdConfigMap);
157+
NamespaceableResource<Pod> mockCreatedPod = mock(NamespaceableResource.class);
158+
when(mockCreatedPod.get()).thenReturn(createdPod);
159+
160+
when(mockClient.resource(preResourceConfigMapSpec))
161+
.thenReturn(mockCreatedNamespaceableResource);
162+
when(mockClient.resource(driverPodSpec)).thenReturn(mockCreatedPod);
163+
164+
ServerSideApplicable mockServerSideApplicable = mock(ServerSideApplicable.class);
165+
NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable mockList =
166+
mock(NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable.class);
167+
when(mockClient.resourceList(anyList())).thenReturn(mockList);
168+
when(mockList.forceConflicts()).thenReturn(mockServerSideApplicable);
169+
170+
appInitStep.reconcile(mocksparkAppContext, recorder);
171+
172+
ArgumentCaptor<List<ConfigMap>> argument = ArgumentCaptor.forClass(List.class);
173+
verify(mockClient).resourceList(argument.capture());
174+
Assertions.assertEquals(1, argument.getValue().size());
175+
ConfigMap decoratedConfigMap = argument.getValue().get(0);
176+
Assertions.assertEquals(1, decoratedConfigMap.getMetadata().getOwnerReferences().size());
177+
Assertions.assertEquals(
178+
createdPod.getMetadata().getName(),
179+
decoratedConfigMap.getMetadata().getOwnerReferences().get(0).getName());
180+
Assertions.assertEquals(
181+
createdPod.getMetadata().getUid(),
182+
decoratedConfigMap.getMetadata().getOwnerReferences().get(0).getUid());
183+
Assertions.assertEquals(
184+
createdPod.getKind(),
185+
decoratedConfigMap.getMetadata().getOwnerReferences().get(0).getKind());
186+
Assertions.assertTrue(decoratedConfigMap.getMetadata().getManagedFields().isEmpty());
187+
}
188+
}

0 commit comments

Comments
 (0)