Skip to content

Commit 298291e

Browse files
csviridongjoon-hyun
authored andcommitted
[SPARK-52754] Fix repeated status update retries if no error
### What changes were proposed in this pull request? PR fixes a small issues with status patch, that retries the status update even if it succeeded before. ### Why are the changes needed? - Breaking the retry look when status update succeeded - added unit test ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#280 from csviri/fix-retry. Authored-by: Attila Mészáros <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 47447e0 commit 298291e

File tree

2 files changed

+95
-0
lines changed

2 files changed

+95
-0
lines changed

spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ private void patchAndStatusWithVersionLocked(CR resource, KubernetesClient clien
9393
CR updated = client.resource(resource).lockResourceVersion().updateStatus();
9494
resource.getMetadata().setResourceVersion(updated.getMetadata().getResourceVersion());
9595
err = null;
96+
break;
9697
} catch (KubernetesClientException e) {
9798
log.warn("Error while patching status, retrying {}/{}...", i + 1, maxRetry, e);
9899
Thread.sleep(TimeUnit.SECONDS.toMillis(API_RETRY_ATTEMPT_AFTER_SECONDS.getValue()));
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.k8s.operator.utils;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
import static org.mockito.ArgumentMatchers.any;
24+
import static org.mockito.ArgumentMatchers.assertArg;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.times;
27+
import static org.mockito.Mockito.verify;
28+
import static org.mockito.Mockito.when;
29+
30+
import java.util.List;
31+
32+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
33+
import io.fabric8.kubernetes.client.KubernetesClient;
34+
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
35+
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
36+
import org.jetbrains.annotations.NotNull;
37+
import org.junit.jupiter.api.Test;
38+
39+
import org.apache.spark.k8s.operator.SparkApplication;
40+
import org.apache.spark.k8s.operator.context.BaseContext;
41+
import org.apache.spark.k8s.operator.listeners.SparkAppStatusListener;
42+
import org.apache.spark.k8s.operator.status.ApplicationStatus;
43+
44+
@EnableKubernetesMockClient
45+
@SuppressFBWarnings(
46+
value = {"UWF_UNWRITTEN_FIELD", "NP_UNWRITTEN_FIELD"},
47+
justification = "Unwritten fields are covered by Kubernetes mock client")
48+
class StatusRecorderTest {
49+
50+
public static final String DEFAULT_NS = "default";
51+
KubernetesMockServer server;
52+
KubernetesClient client;
53+
54+
SparkAppStatusListener mockStatusListener = mock(SparkAppStatusListener.class);
55+
56+
StatusRecorder<ApplicationStatus, SparkApplication, SparkAppStatusListener> statusRecorder =
57+
new StatusRecorder<>(
58+
List.of(mockStatusListener), ApplicationStatus.class, SparkApplication.class);
59+
60+
@Test
61+
void retriesFailedStatusPatches() {
62+
var testResource = getSparkApplication("1");
63+
var resourceV2 = getSparkApplication("2");
64+
var resourceV3 = getSparkApplication("3");
65+
66+
BaseContext<SparkApplication> context = mock(BaseContext.class);
67+
when(context.getResource()).thenReturn(testResource);
68+
when(context.getClient()).thenReturn(client);
69+
var path =
70+
"/apis/spark.apache.org/v1/namespaces/"
71+
+ DEFAULT_NS
72+
+ "/sparkapplications/"
73+
+ testResource.getMetadata().getName()
74+
+ "/status";
75+
server.expect().withPath(path).andReturn(500, null).once();
76+
server.expect().withPath(path).andReturn(200, resourceV2).once();
77+
// this should be not called, thus updated resource should have resourceVersion 2
78+
server.expect().withPath(path).andReturn(200, resourceV3).once();
79+
80+
statusRecorder.persistStatus(context, new ApplicationStatus());
81+
82+
verify(mockStatusListener, times(1))
83+
.listenStatus(
84+
assertArg(a -> assertThat(a.getMetadata().getResourceVersion()).isEqualTo("2")),
85+
any(),
86+
any());
87+
}
88+
89+
private static @NotNull SparkApplication getSparkApplication(String resourceVersion) {
90+
var updated = TestUtils.createMockApp(DEFAULT_NS);
91+
updated.getMetadata().setResourceVersion(resourceVersion);
92+
return updated;
93+
}
94+
}

0 commit comments

Comments
 (0)