Skip to content

Commit e80dead

Browse files
jiangzhodongjoon-hyun
authored andcommitted
[SPARK-53706] App reconcile steps should properly handle exceptions in status update
### What changes were proposed in this pull request? This PR adds exception handling for status recorder when persisting status. If encountered exceptions in updating app status, reconciler would finish current reconcile loop and requeue a new one instead of blindly throw the exception. ### Why are the changes needed? SparkAppReconciler is not handling exceptions when updating app status - these failed reconcile loops may end up with endless retry if the status update is caused by conflicts. For example, we observe exceptions like these when app is starting ``` Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://kind-control-plane.vsl:6443/apis/spark.apache.org/v1/namespaces/default/sparkapplications/spark-example-retain-duration/status. Message: Operation cannot be fulfilled on sparkapplications.spark.apache.org "spark-example-retain-duration": the object has been modified; please apply your changes to the latest version and try again. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=spark.apache.org, kind=sparkapplications, name=spark-example-retain-duration, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Operation cannot be fulfilled on sparkapplications.spark.apache.org "spark-example-retain-duration": the object has been modified; please apply your changes to the latest version and try again, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Conflict, status=Failure, additionalProperties={}). at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:642) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?] at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:622) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?] at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:582) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?] at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:549) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?] at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?] at io.fabric8.kubernetes.client.http.StandardHttpClient.lambda$completeOrCancel$10(StandardHttpClient.java:141) ~[spark-kubernetes-operator-0.5.0-SNAPSHOT-all.jar:?] ``` Why is this happening ? Because reconcile can be triggered again by driver pod status update while another reconcile is in-progress. Without proper exception handling, this would keep recurring, taking unnecessary CPU time and confusing user in logs. Another corner case that would be fixed by this patch: current `AppInitStep` would mark app as `SchedulingFailure` even if the resources are requested as expected and only the status update fails. This patch handles exceptions for resource creation and status update separately. We'd better digest this better: if an exception is thrown while updating app status (which is typically at the last of each reconcile) - operator shall properly finish this reconcile loop and start a new one. App status is fetched from cache at the beginning of each reconcile - and our reconcile steps are ready designed to be idempotent. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Tested from two perspectives: 1. CIs - also added new unit test to validate update idempotency in init step 2. Check logs : with rthis patch, no more exceptions like above is thrown while running apps ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#341 from jiangzho/status_patch. Authored-by: Zhou JIANG <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent a295105 commit e80dead

File tree

9 files changed

+140
-39
lines changed

9 files changed

+140
-39
lines changed

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -103,30 +103,29 @@ public ReconcileProgress reconcile(
103103
}
104104
}
105105
}
106-
ApplicationStatus updatedStatus =
107-
context
108-
.getResource()
109-
.getStatus()
110-
.appendNewState(
111-
new ApplicationState(
112-
ApplicationStateSummary.DriverRequested, Constants.DRIVER_REQUESTED_MESSAGE));
113-
statusRecorder.persistStatus(context, updatedStatus);
114-
return completeAndDefaultRequeue();
115106
} catch (Exception e) {
116107
if (log.isErrorEnabled()) {
117108
log.error("Failed to request driver resource.", e);
118109
}
119110
String errorMessage =
120111
Constants.SCHEDULE_FAILURE_MESSAGE + " StackTrace: " + buildGeneralErrorMessage(e);
121-
statusRecorder.persistStatus(
122-
context,
112+
ApplicationStatus updatedStatus =
123113
context
124114
.getResource()
125115
.getStatus()
126116
.appendNewState(
127-
new ApplicationState(ApplicationStateSummary.SchedulingFailure, errorMessage)));
128-
return completeAndImmediateRequeue();
117+
new ApplicationState(ApplicationStateSummary.SchedulingFailure, errorMessage));
118+
return attemptStatusUpdate(
119+
context, statusRecorder, updatedStatus, completeAndImmediateRequeue());
129120
}
121+
ApplicationStatus updatedStatus =
122+
context
123+
.getResource()
124+
.getStatus()
125+
.appendNewState(
126+
new ApplicationState(
127+
ApplicationStateSummary.DriverRequested, Constants.DRIVER_REQUESTED_MESSAGE));
128+
return attemptStatusUpdate(context, statusRecorder, updatedStatus, completeAndDefaultRequeue());
130129
}
131130

132131
/**

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Optional;
2929

3030
import io.fabric8.kubernetes.api.model.Pod;
31+
import lombok.extern.log4j.Log4j2;
3132

3233
import org.apache.spark.k8s.operator.SparkApplication;
3334
import org.apache.spark.k8s.operator.context.SparkAppContext;
@@ -38,6 +39,7 @@
3839
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
3940

4041
/** Basic reconcile step for application. */
42+
@Log4j2
4143
public abstract class AppReconcileStep {
4244
/**
4345
* Reconciles a specific step for a Spark application.
@@ -77,18 +79,45 @@ protected ReconcileProgress observeDriver(
7779
for (ApplicationState state : stateUpdates) {
7880
currentStatus = currentStatus.appendNewState(state);
7981
}
80-
statusRecorder.persistStatus(context, currentStatus);
81-
return completeAndImmediateRequeue();
82+
return attemptStatusUpdate(
83+
context, statusRecorder, currentStatus, completeAndImmediateRequeue());
8284
}
8385
} else {
8486
ApplicationStatus updatedStatus = currentStatus.appendNewState(driverUnexpectedRemoved());
85-
statusRecorder.persistStatus(context, updatedStatus);
87+
return attemptStatusUpdate(
88+
context, statusRecorder, updatedStatus, completeAndImmediateRequeue());
89+
}
90+
}
91+
92+
/**
93+
* Updates the application status - if the status is successfully persisted, proceed with the
94+
* given progress. Otherwise, completes current reconcile loop immediately and requeue. Latest
95+
* application status would be fetched from cache in next reconcile attempt.
96+
*
97+
* @param context The SparkAppContext for the application.
98+
* @param statusRecorder The SparkAppStatusRecorder for recording status updates.
99+
* @param updatedStatus The updated ApplicationStatus.
100+
* @param progressUponSuccessStatusUpdate The ReconcileProgress if the status update has been
101+
* persisted successfully.
102+
* @return The ReconcileProgress for next steps.
103+
*/
104+
protected ReconcileProgress attemptStatusUpdate(
105+
final SparkAppContext context,
106+
final SparkAppStatusRecorder statusRecorder,
107+
final ApplicationStatus updatedStatus,
108+
final ReconcileProgress progressUponSuccessStatusUpdate) {
109+
110+
if (statusRecorder.persistStatus(context, updatedStatus)) {
111+
return progressUponSuccessStatusUpdate;
112+
} else {
113+
log.warn("Failed to persist status, will retry status update in next reconcile attempt");
86114
return completeAndImmediateRequeue();
87115
}
88116
}
89117

90118
/**
91-
* Updates the application status and re-queues the reconciliation after a specified duration.
119+
* Updates the application status and re-queues the reconciliation after a specified duration. If
120+
* the status update fails, trigger an immediate requeue.
92121
*
93122
* @param context The SparkAppContext for the application.
94123
* @param statusRecorder The SparkAppStatusRecorder for recording status updates.
@@ -101,13 +130,16 @@ protected ReconcileProgress updateStatusAndRequeueAfter(
101130
SparkAppStatusRecorder statusRecorder,
102131
ApplicationStatus updatedStatus,
103132
Duration requeueAfter) {
104-
statusRecorder.persistStatus(context, updatedStatus);
105-
return ReconcileProgress.completeAndRequeueAfter(requeueAfter);
133+
return attemptStatusUpdate(
134+
context,
135+
statusRecorder,
136+
updatedStatus,
137+
ReconcileProgress.completeAndRequeueAfter(requeueAfter));
106138
}
107139

108140
/**
109141
* Appends a new state to the application status, persists it, and re-queues the reconciliation
110-
* after a specified duration.
142+
* after a specified duration. If the status update fails, trigger an immediate requeue.
111143
*
112144
* @param context The SparkAppContext for the application.
113145
* @param statusRecorder The SparkAppStatusRecorder for recording status updates.
@@ -120,7 +152,10 @@ protected ReconcileProgress appendStateAndRequeueAfter(
120152
SparkAppStatusRecorder statusRecorder,
121153
ApplicationState newState,
122154
Duration requeueAfter) {
123-
statusRecorder.appendNewStateAndPersist(context, newState);
155+
if (!statusRecorder.appendNewStateAndPersist(context, newState)) {
156+
log.warn("Status is not persisted successfully, will retry in next reconcile attempt");
157+
return completeAndImmediateRequeue();
158+
}
124159
return ReconcileProgress.completeAndRequeueAfter(requeueAfter);
125160
}
126161

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.spark.k8s.operator.spec.ExecutorInstanceConfig;
3434
import org.apache.spark.k8s.operator.status.ApplicationState;
3535
import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
36+
import org.apache.spark.k8s.operator.status.ApplicationStatus;
3637
import org.apache.spark.k8s.operator.utils.PodUtils;
3738
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
3839

@@ -87,13 +88,13 @@ public ReconcileProgress reconcile(
8788
return observeDriver(
8889
context, statusRecorder, Collections.singletonList(new AppDriverRunningObserver()));
8990
} else {
90-
statusRecorder.persistStatus(
91-
context,
91+
ApplicationStatus updatedStatus =
9292
context
9393
.getResource()
9494
.getStatus()
95-
.appendNewState(new ApplicationState(proposedStateSummary, stateMessage)));
96-
return completeAndDefaultRequeue();
95+
.appendNewState(new ApplicationState(proposedStateSummary, stateMessage));
96+
return attemptStatusUpdate(
97+
context, statusRecorder, updatedStatus, completeAndDefaultRequeue());
9798
}
9899
}
99100
}

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppUnknownStateStep.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@ public ReconcileProgress reconcile(
4646
new ApplicationState(ApplicationStateSummary.Failed, Constants.UNKNOWN_STATE_MESSAGE);
4747
Optional<Pod> driver = context.getDriverPod();
4848
driver.ifPresent(pod -> state.setLastObservedDriverStatus(pod.getStatus()));
49-
statusRecorder.persistStatus(context, context.getResource().getStatus().appendNewState(state));
50-
return ReconcileProgress.completeAndImmediateRequeue();
49+
return attemptStatusUpdate(
50+
context,
51+
statusRecorder,
52+
context.getResource().getStatus().appendNewState(state),
53+
ReconcileProgress.completeAndImmediateRequeue());
5154
}
5255
}

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppValidateStep.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,16 @@ public ReconcileProgress reconcile(
4848
SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
4949
if (!isValidApplicationStatus(context.getResource())) {
5050
log.warn("Spark application found with empty status. Resetting to initial state.");
51-
statusRecorder.persistStatus(context, new ApplicationStatus());
51+
return attemptStatusUpdate(context, statusRecorder, new ApplicationStatus(), proceed());
5252
}
5353
if (ClientMode.equals(context.getResource().getSpec())) {
5454
ApplicationState failure =
5555
new ApplicationState(ApplicationStateSummary.Failed, "Client mode is not supported yet.");
56-
statusRecorder.persistStatus(
57-
context, context.getResource().getStatus().appendNewState(failure));
58-
return completeAndImmediateRequeue();
56+
return attemptStatusUpdate(
57+
context,
58+
statusRecorder,
59+
context.getResource().getStatus().appendNewState(failure),
60+
completeAndImmediateRequeue());
5961
}
6062
return proceed();
6163
}

spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,15 @@ public SparkAppStatusRecorder(
5151
*
5252
* @param context The SparkAppContext for the application.
5353
* @param newState The new ApplicationState to append.
54+
* @return true if the status is successfully patched
5455
*/
55-
public void appendNewStateAndPersist(SparkAppContext context, ApplicationState newState) {
56+
public boolean appendNewStateAndPersist(SparkAppContext context, ApplicationState newState) {
5657
ApplicationStatus appStatus = context.getResource().getStatus();
57-
recorderSource.recordStatusUpdateLatency(appStatus, newState);
5858
ApplicationStatus updatedStatus = appStatus.appendNewState(newState);
59-
persistStatus(context, updatedStatus);
59+
boolean statusPersisted = persistStatus(context, updatedStatus);
60+
if (statusPersisted) {
61+
recorderSource.recordStatusUpdateLatency(appStatus, newState);
62+
}
63+
return statusPersisted;
6064
}
6165
}

spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,17 @@ private void patchAndStatusWithVersionLocked(CR resource, KubernetesClient clien
127127
*
128128
* @param context The BaseContext containing the resource and client.
129129
* @param newStatus The new status to persist.
130+
* @return true if the status is successfully patched.
130131
*/
131-
public void persistStatus(BaseContext<CR> context, STATUS newStatus) {
132-
context.getResource().setStatus(newStatus);
133-
patchAndStatusWithVersionLocked(context.getResource(), context.getClient());
132+
public boolean persistStatus(BaseContext<CR> context, STATUS newStatus) {
133+
try {
134+
context.getResource().setStatus(newStatus);
135+
patchAndStatusWithVersionLocked(context.getResource(), context.getClient());
136+
return true;
137+
} catch (KubernetesClientException e) {
138+
log.error("Error while persisting status to {}", newStatus, e);
139+
return false;
140+
}
134141
}
135142

136143
/**

spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStepTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import static org.junit.jupiter.api.Assertions.assertFalse;
2323
import static org.junit.jupiter.api.Assertions.assertTrue;
24+
import static org.mockito.ArgumentMatchers.any;
2425
import static org.mockito.ArgumentMatchers.eq;
2526
import static org.mockito.Mockito.mock;
2627
import static org.mockito.Mockito.times;
@@ -171,6 +172,8 @@ void onDemandCleanupForRunningAppExpectDelete() {
171172
when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod));
172173
when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
173174
when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
175+
when(mockRecorder.persistStatus(eq(mockAppContext), any())).thenReturn(true);
176+
when(mockRecorder.appendNewStateAndPersist(eq(mockAppContext), any())).thenReturn(true);
174177

175178
try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
176179
ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext, mockRecorder);
@@ -259,6 +262,8 @@ void onDemandCleanupForTerminatedAppExpectDelete() {
259262
when(mockAppContext.getDriverPod()).thenReturn(Optional.of(driverPod));
260263
when(mockAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
261264
when(mockAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
265+
when(mockRecorder.persistStatus(eq(mockAppContext), any())).thenReturn(true);
266+
when(mockRecorder.appendNewStateAndPersist(eq(mockAppContext), any())).thenReturn(true);
262267

263268
try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
264269
ReconcileProgress progress = cleanUpWithReason.reconcile(mockAppContext, mockRecorder);
@@ -317,6 +322,8 @@ void cleanupForAppExpectDeleteWithRecompute() {
317322
when(mockAppContext2.getDriverPreResourcesSpec())
318323
.thenReturn(Collections.singletonList(resource1));
319324
when(mockAppContext2.getDriverResourcesSpec()).thenReturn(Collections.singletonList(resource2));
325+
when(mockRecorder.persistStatus(any(), any())).thenReturn(true);
326+
when(mockRecorder.appendNewStateAndPersist(any(), any())).thenReturn(true);
320327

321328
try (MockedStatic<ReconcilerUtils> utils = Mockito.mockStatic(ReconcilerUtils.class)) {
322329
ReconcileProgress progress1 = cleanUpWithReason.reconcile(mockAppContext1, mockRecorder);

spark-operator/src/test/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStepTest.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.spark.k8s.operator.reconciler.reconcilesteps;
2121

22+
import static org.mockito.ArgumentMatchers.any;
2223
import static org.mockito.ArgumentMatchers.anyList;
2324
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.verify;
@@ -49,6 +50,7 @@
4950

5051
import org.apache.spark.k8s.operator.SparkApplication;
5152
import org.apache.spark.k8s.operator.context.SparkAppContext;
53+
import org.apache.spark.k8s.operator.reconciler.ReconcileProgress;
5254
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
5355

5456
@EnableKubernetesMockClient(crud = true)
@@ -106,7 +108,10 @@ void driverResourcesHaveOwnerReferencesToDriver() {
106108
when(mocksparkAppContext.getDriverResourcesSpec())
107109
.thenReturn(Collections.singletonList(resourceConfigMapSpec));
108110
when(mocksparkAppContext.getClient()).thenReturn(kubernetesClient);
109-
appInitStep.reconcile(mocksparkAppContext, recorder);
111+
when(recorder.appendNewStateAndPersist(any(), any())).thenReturn(true);
112+
when(recorder.persistStatus(any(), any())).thenReturn(true);
113+
ReconcileProgress reconcileProgress = appInitStep.reconcile(mocksparkAppContext, recorder);
114+
Assertions.assertEquals(ReconcileProgress.completeAndDefaultRequeue(), reconcileProgress);
110115
Pod createdPod = kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
111116
ConfigMap createCM =
112117
kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
@@ -135,6 +140,8 @@ void createdPreResourcesPatchedWithOwnerReferencesToDriver() {
135140
.thenReturn(Collections.singletonList(preResourceConfigMapSpec));
136141
when(mocksparkAppContext.getDriverPodSpec()).thenReturn(driverPodSpec);
137142
when(mocksparkAppContext.getDriverResourcesSpec()).thenReturn(Collections.emptyList());
143+
when(recorder.appendNewStateAndPersist(any(), any())).thenReturn(true);
144+
when(recorder.persistStatus(any(), any())).thenReturn(true);
138145

139146
KubernetesClient mockClient = mock(KubernetesClient.class);
140147
when(mocksparkAppContext.getClient()).thenReturn(mockClient);
@@ -166,8 +173,9 @@ void createdPreResourcesPatchedWithOwnerReferencesToDriver() {
166173
when(mockClient.resourceList(anyList())).thenReturn(mockList);
167174
when(mockList.forceConflicts()).thenReturn(mockServerSideApplicable);
168175

169-
appInitStep.reconcile(mocksparkAppContext, recorder);
176+
ReconcileProgress reconcileProgress = appInitStep.reconcile(mocksparkAppContext, recorder);
170177

178+
Assertions.assertEquals(ReconcileProgress.completeAndDefaultRequeue(), reconcileProgress);
171179
ArgumentCaptor<List<ConfigMap>> argument = ArgumentCaptor.forClass(List.class);
172180
verify(mockClient).resourceList(argument.capture());
173181
Assertions.assertEquals(1, argument.getValue().size());
@@ -184,4 +192,39 @@ void createdPreResourcesPatchedWithOwnerReferencesToDriver() {
184192
decoratedConfigMap.getMetadata().getOwnerReferences().get(0).getKind());
185193
Assertions.assertTrue(decoratedConfigMap.getMetadata().getManagedFields().isEmpty());
186194
}
195+
196+
@Test
197+
void appInitStepShouldBeIdempotentWhenStatusUpdateFails() {
198+
AppInitStep appInitStep = new AppInitStep();
199+
SparkAppContext mocksparkAppContext = mock(SparkAppContext.class);
200+
SparkAppStatusRecorder recorder = mock(SparkAppStatusRecorder.class);
201+
SparkApplication application = new SparkApplication();
202+
application.setMetadata(applicationMetadata);
203+
when(mocksparkAppContext.getResource()).thenReturn(application);
204+
when(mocksparkAppContext.getDriverPreResourcesSpec()).thenReturn(Collections.emptyList());
205+
when(mocksparkAppContext.getDriverPodSpec()).thenReturn(driverPodSpec);
206+
when(mocksparkAppContext.getDriverResourcesSpec())
207+
.thenReturn(Collections.singletonList(resourceConfigMapSpec));
208+
when(mocksparkAppContext.getClient()).thenReturn(kubernetesClient);
209+
when(recorder.appendNewStateAndPersist(any(), any())).thenReturn(false, true);
210+
when(recorder.persistStatus(any(), any())).thenReturn(false, true);
211+
212+
// If the first reconcile manages to create everything but fails to update status
213+
ReconcileProgress reconcileProgress1 = appInitStep.reconcile(mocksparkAppContext, recorder);
214+
Assertions.assertEquals(ReconcileProgress.completeAndImmediateRequeue(), reconcileProgress1);
215+
Pod createdPod = kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
216+
ConfigMap createCM =
217+
kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
218+
Assertions.assertNotNull(createCM);
219+
Assertions.assertNotNull(createdPod);
220+
221+
// The second reconcile shall update the status without re-creating everything
222+
ReconcileProgress reconcileProgress2 = appInitStep.reconcile(mocksparkAppContext, recorder);
223+
Assertions.assertEquals(ReconcileProgress.completeAndDefaultRequeue(), reconcileProgress2);
224+
createdPod = kubernetesClient.pods().inNamespace("default").withName("driver-pod").get();
225+
createCM =
226+
kubernetesClient.configMaps().inNamespace("default").withName("resource-configmap").get();
227+
Assertions.assertNotNull(createCM);
228+
Assertions.assertNotNull(createdPod);
229+
}
187230
}

0 commit comments

Comments
 (0)