Commit a82017c

Merge branch 'main' into retentionDuration

2 parents e898a4e + 4c6b30c

7 files changed: +142 -7 lines

build-tools/docker/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-FROM gradle:8.14.2-jdk17-noble AS builder
+FROM gradle:8.14.3-jdk17-noble AS builder
 
 WORKDIR /app

docs/spark_custom_resources.md

Lines changed: 2 additions & 2 deletions
@@ -290,13 +290,13 @@ On the other hand, when developing an application, it's possible to configure
 applicationTolerations:
   # Acceptable values are 'Always', 'OnFailure', 'Never'
   # Setting this to 'OnFailure' would retain secondary resources if and only if the app fails
-  resourceRetentionPolicy: OnFailure
+  resourceRetainPolicy: OnFailure
   # Secondary resources would be garbage collected 10 minutes after app termination
   resourceRetainDurationMillis: 600000
 ```
 
 to avoid operator attempt to delete driver pod and driver resources if app fails. Similarly,
-if resourceRetentionPolicy is set to `Always`, operator would not delete driver resources
+if resourceRetainPolicy is set to `Always`, operator would not delete driver resources
 when app ends. They would be by default kept with the same lifecycle as the App. It's also
 possible to configure `resourceRetainDurationMillis` to define the maximal retain duration for
 these resources. Note that this applies only to operator-created resources (driver pod, SparkConf
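The doc text above also covers the `Always` case, which pairs naturally with a retain duration. A minimal sketch of that combination, using the same schema as the snippet in this diff (the ten-minute value is illustrative, not a default):

```yaml
applicationTolerations:
  # Keep secondary resources no matter how the app terminates...
  resourceRetainPolicy: Always
  # ...but let the operator garbage-collect them 10 minutes after termination
  resourceRetainDurationMillis: 600000
```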

gradle/wrapper/gradle-wrapper.properties

Lines changed: 2 additions & 2 deletions
@@ -17,8 +17,8 @@
 
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionSha256Sum=7197a12f450794931532469d4ff21a59ea2c1cd59a3ec3f89c035c3c420a6999
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.2-bin.zip
+distributionSha256Sum=bd71102213493060956ec229d946beee57158dbd89d0e62b91bca0fa2c5f3531
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
 networkTimeout=10000
 validateDistributionUrl=true
 zipStoreBase=GRADLE_USER_HOME
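The wrapper bump changes both the distribution URL and its pinned checksum; Gradle rejects the downloaded archive if it does not match distributionSha256Sum. To verify the pinned value by hand (assuming curl and coreutils' sha256sum are available):

```sh
# Download the 8.14.3 distribution and print its checksum; the output should
# match the distributionSha256Sum pinned above (bd711022...).
curl -sLO https://services.gradle.org/distributions/gradle-8.14.3-bin.zip
sha256sum gradle-8.14.3-bin.zip
```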

gradlew

Lines changed: 2 additions & 2 deletions
@@ -89,11 +89,11 @@ APP_BASE_NAME=${0##*/}
 APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s
 ' "$PWD" ) || exit
 if [ ! -e $APP_HOME/gradle/wrapper/gradle-wrapper.jar -a "$(command -v curl)" ]; then
-    curl -o $APP_HOME/gradle/wrapper/gradle-wrapper.jar https://raw.githubusercontent.com/gradle/gradle/v8.14.2/gradle/wrapper/gradle-wrapper.jar
+    curl -o $APP_HOME/gradle/wrapper/gradle-wrapper.jar https://raw.githubusercontent.com/gradle/gradle/v8.14.3/gradle/wrapper/gradle-wrapper.jar
 fi
 # If the file still doesn't exist, let's try `wget` and cross our fingers
 if [ ! -e $APP_HOME/gradle/wrapper/gradle-wrapper.jar -a "$(command -v wget)" ]; then
-    wget -O $APP_HOME/gradle/wrapper/gradle-wrapper.jar https://raw.githubusercontent.com/gradle/gradle/v8.14.2/gradle/wrapper/gradle-wrapper.jar
+    wget -O $APP_HOME/gradle/wrapper/gradle-wrapper.jar https://raw.githubusercontent.com/gradle/gradle/v8.14.3/gradle/wrapper/gradle-wrapper.jar
 fi
 
 # Use the maximum available, or set MAX_FD != -1 to use that value.

spark-operator-api/build.gradle

Lines changed: 40 additions & 0 deletions
@@ -57,3 +57,43 @@ tasks.register('relocateGeneratedCRD', Copy) {
   into "../build-tools/helm/spark-kubernetes-operator/crds"
   rename '(.+).yml', '$1.yaml'
 }
+
+tasks.register("assertGeneratedCRDMatchesHelmChart") {
+  dependsOn 'finalizeGeneratedCRD'
+  description = 'Asserts that the generated CRD yaml matches the staged version in Helm Chart'
+  doLast {
+    def currentPath = projectDir.absolutePath
+    def generatedCRDFileBase = "$currentPath/build/classes/java/main/META-INF/fabric8/"
+    def stagedCRDFileBase = "$currentPath/../build-tools/helm/spark-kubernetes-operator/crds/"
+    def generatedAppCRD = [
+      "yq",
+      "e",
+      ".spec.versions[0]",
+      "${generatedCRDFileBase}sparkapplications.spark.apache.org-v1.yml"
+    ].execute().text.trim()
+    def generatedClusterCRD = [
+      "yq",
+      "e",
+      ".spec.versions[0]",
+      "${generatedCRDFileBase}sparkclusters.spark.apache.org-v1.yml"
+    ].execute().text.trim()
+    def stagedAppCRD = [
+      "yq",
+      "e",
+      ".spec.versions[-1]",
+      "${stagedCRDFileBase}sparkapplications.spark.apache.org-v1.yaml"
+    ].execute().text.trim()
+    def stagedClusterCRD = [
+      "yq",
+      "e",
+      ".spec.versions[-1]",
+      "${stagedCRDFileBase}sparkclusters.spark.apache.org-v1.yaml"
+    ].execute().text.trim()
+    if (generatedAppCRD != stagedAppCRD || generatedClusterCRD != stagedClusterCRD) {
+      throw new GradleException("Generated CRD yaml does not match the staged version in " +
+        "Helm Chart, please keep the chart updated.")
+    }
+  }
+}
+
+test.finalizedBy('assertGeneratedCRDMatchesHelmChart')
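This check shells out to yq, so it assumes yq is on the PATH of whichever machine runs the build. Note the asymmetry in the comparison: the generated file is queried with `.spec.versions[0]` while the staged chart file uses `.spec.versions[-1]`, suggesting the staged CRD may accumulate multiple versions and only its last entry must match the freshly generated one. Because of the `finalizedBy` wiring, the assertion runs automatically after `test`; to invoke it directly (assuming the subproject path is `:spark-operator-api`, as its directory name suggests):

```sh
./gradlew :spark-operator-api:assertGeneratedCRDMatchesHelmChart
```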

spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/StatusRecorder.java

Lines changed: 1 addition & 0 deletions
@@ -93,6 +93,7 @@ private void patchAndStatusWithVersionLocked(CR resource, KubernetesClient client
       CR updated = client.resource(resource).lockResourceVersion().updateStatus();
       resource.getMetadata().setResourceVersion(updated.getMetadata().getResourceVersion());
       err = null;
+      break;
     } catch (KubernetesClientException e) {
       log.warn("Error while patching status, retrying {}/{}...", i + 1, maxRetry, e);
       Thread.sleep(TimeUnit.SECONDS.toMillis(API_RETRY_ATTEMPT_AFTER_SECONDS.getValue()));
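The fix is a single line, so for context: the status patch runs inside a bounded retry loop, and before this commit a successful updateStatus() call would fall through and be retried anyway. Below is a minimal, self-contained sketch of the pattern; the Kubernetes call is abstracted as a Supplier, and names like maxRetry echo the diff context while the surrounding method body is paraphrased, not the operator's actual code:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class StatusPatchRetrySketch {

  // Runs the status patch up to maxRetry times, backing off between failures.
  // The `break` mirrors the line added in this commit: once the patch
  // succeeds, stop looping instead of issuing the same patch again.
  static String patchWithRetry(Supplier<String> updateStatus, int maxRetry)
      throws InterruptedException {
    RuntimeException err = null;
    String newResourceVersion = null;
    for (int i = 0; i < maxRetry; i++) {
      try {
        newResourceVersion = updateStatus.get();
        err = null;
        break; // success: without this, the loop would re-patch until maxRetry
      } catch (RuntimeException e) {
        err = e;
        TimeUnit.SECONDS.sleep(1L); // stands in for API_RETRY_ATTEMPT_AFTER_SECONDS
      }
    }
    if (err != null) {
      throw err; // all attempts failed: surface the last error
    }
    return newResourceVersion;
  }
}
```

This is also what the new test below exercises: the mock server fails the first attempt, succeeds on the second, and the third expectation must never be consumed.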
spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/StatusRecorderTest.java

Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.assertArg;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.context.BaseContext;
+import org.apache.spark.k8s.operator.listeners.SparkAppStatusListener;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+
+@EnableKubernetesMockClient
+@SuppressFBWarnings(
+    value = {"UWF_UNWRITTEN_FIELD", "NP_UNWRITTEN_FIELD"},
+    justification = "Unwritten fields are covered by Kubernetes mock client")
+class StatusRecorderTest {
+
+  public static final String DEFAULT_NS = "default";
+  KubernetesMockServer server;
+  KubernetesClient client;
+
+  SparkAppStatusListener mockStatusListener = mock(SparkAppStatusListener.class);
+
+  StatusRecorder<ApplicationStatus, SparkApplication, SparkAppStatusListener> statusRecorder =
+      new StatusRecorder<>(
+          List.of(mockStatusListener), ApplicationStatus.class, SparkApplication.class);
+
+  @Test
+  void retriesFailedStatusPatches() {
+    var testResource = getSparkApplication("1");
+    var resourceV2 = getSparkApplication("2");
+    var resourceV3 = getSparkApplication("3");
+
+    BaseContext<SparkApplication> context = mock(BaseContext.class);
+    when(context.getResource()).thenReturn(testResource);
+    when(context.getClient()).thenReturn(client);
+    var path =
+        "/apis/spark.apache.org/v1/namespaces/"
+            + DEFAULT_NS
+            + "/sparkapplications/"
+            + testResource.getMetadata().getName()
+            + "/status";
+    server.expect().withPath(path).andReturn(500, null).once();
+    server.expect().withPath(path).andReturn(200, resourceV2).once();
+    // This third expectation should never be consumed, so the updated resource keeps resourceVersion 2
+    server.expect().withPath(path).andReturn(200, resourceV3).once();
+
+    statusRecorder.persistStatus(context, new ApplicationStatus());
+
+    verify(mockStatusListener, times(1))
+        .listenStatus(
+            assertArg(a -> assertThat(a.getMetadata().getResourceVersion()).isEqualTo("2")),
+            any(),
+            any());
+  }
+
+  private static @NotNull SparkApplication getSparkApplication(String resourceVersion) {
+    var updated = TestUtils.createMockApp(DEFAULT_NS);
+    updated.getMetadata().setResourceVersion(resourceVersion);
+    return updated;
+  }
+}
