Skip to content

Commit 563579f

Browse files
jiangzhodongjoon-hyun
authored andcommitted
[SPARK-51347] Enable Ingress and Service Support for Spark Driver
### What changes were proposed in this pull request? This PR adds support for launching Ingress and Services with Spark Applications. ### Why are the changes needed? There's common ask from user to support exposing driver via ingress and service to serve HTTP / HTTPS from outside the cluster - for example, to access Spark UI. As we support podTemplateSupport already, this feature can also be handy for those who would like to expose additional service ports in the same pod. Therefore, this PR aims to introduce this feature in a more general way instead of just exposing Spark UI. ### Does this PR introduce _any_ user-facing change? No - unreleased version ### How was this patch tested? CIs ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#159 from jiangzho/um. Authored-by: Zhou JIANG <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 4bb3d40 commit 563579f

File tree

8 files changed

+364
-5
lines changed

8 files changed

+364
-5
lines changed

docs/spark_custom_resources.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,46 @@ Please be advised that Spark still overrides necessary pod configuration in both
9999
more details,
100100
refer [Spark doc](https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template).
101101

102+
## Enable Additional Ingress for Driver
103+
104+
Operator may create [Ingress](https://kubernetes.io/docs/concepts/services-networking/ingress/) for
105+
Spark driver of running applications on demand. For example, to expose Spark UI - which is by
106+
default enabled on driver port 4040, you may configure
107+
108+
```yaml
109+
spec:
110+
driverServiceIngressList:
111+
- serviceMetadata:
112+
name: "spark-ui-service"
113+
serviceSpec:
114+
ports:
115+
- protocol: TCP
116+
port: 80
117+
targetPort: 4040
118+
ingressMetadata:
119+
name: "spark-ui-ingress"
120+
annotations:
121+
nginx.ingress.kubernetes.io/rewrite-target: /
122+
ingressSpec:
123+
ingressClassName: nginx-example
124+
rules:
125+
- http:
126+
paths:
127+
- path: "/"
128+
pathType: Prefix
129+
backend:
130+
service:
131+
name: spark-ui-service
132+
port:
133+
number: 80
134+
```
135+
136+
Spark Operator by default would populate the `.spec.selector` field of the created Service to match
137+
the driver labels. If `.ingressSpec.rules` is not provided, Spark Operator would also populate one
138+
default rule backed by the associated Service. It's recommended to always provide the ingress spec
139+
to make sure it's compatible with your
140+
[IngressController](https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/).
141+
102142
## Understanding Failure Types
103143

104144
In addition to the general `Failed` state (that driver pod fails or driver container exits

spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ApplicationSpec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,5 @@ public class ApplicationSpec extends BaseSpec {
5454

5555
protected BaseApplicationTemplateSpec driverSpec;
5656
protected BaseApplicationTemplateSpec executorSpec;
57+
protected List<DriverServiceIngressSpec> driverServiceIngressList;
5758
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.k8s.operator.spec;
21+
22+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
23+
import com.fasterxml.jackson.annotation.JsonInclude;
24+
import io.fabric8.generator.annotation.Required;
25+
import io.fabric8.kubernetes.api.model.ObjectMeta;
26+
import io.fabric8.kubernetes.api.model.ServiceSpec;
27+
import io.fabric8.kubernetes.api.model.networking.v1.IngressSpec;
28+
import lombok.Builder;
29+
import lombok.Data;
30+
31+
@Data
32+
@Builder
33+
@JsonInclude(JsonInclude.Include.NON_NULL)
34+
@JsonIgnoreProperties(ignoreUnknown = true)
35+
public class DriverServiceIngressSpec {
36+
@Required protected ObjectMeta serviceMetadata;
37+
@Required protected ServiceSpec serviceSpec;
38+
39+
@Required protected ObjectMeta ingressMetadata;
40+
protected IngressSpec ingressSpec;
41+
}

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
package org.apache.spark.k8s.operator;
2121

2222
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.Collections;
2325
import java.util.List;
26+
import java.util.stream.Collectors;
2427

2528
import scala.Tuple2;
2629
import scala.collection.immutable.HashMap;
@@ -40,6 +43,8 @@
4043
import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
4144
import org.apache.spark.deploy.k8s.SparkPod;
4245
import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils;
46+
import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec;
47+
import org.apache.spark.k8s.operator.utils.DriverServiceIngressUtils;
4348

4449
/**
4550
* Resembles resources that would be directly launched by operator. Based on resolved
@@ -59,7 +64,9 @@ public class SparkAppResourceSpec {
5964
private final SparkAppDriverConf kubernetesDriverConf;
6065

6166
public SparkAppResourceSpec(
62-
SparkAppDriverConf kubernetesDriverConf, KubernetesDriverSpec kubernetesDriverSpec) {
67+
SparkAppDriverConf kubernetesDriverConf,
68+
KubernetesDriverSpec kubernetesDriverSpec,
69+
List<DriverServiceIngressSpec> driverServiceIngressList) {
6370
this.kubernetesDriverConf = kubernetesDriverConf;
6471
String namespace = kubernetesDriverConf.sparkConf().get(Config.KUBERNETES_NAMESPACE().key());
6572
Map<String, String> confFilesMap =
@@ -86,6 +93,7 @@ public SparkAppResourceSpec(
8693
this.driverResources.add(
8794
KubernetesClientUtils.buildConfigMap(
8895
kubernetesDriverConf.configMapNameDriver(), confFilesMap, new HashMap<>()));
96+
this.driverResources.addAll(configureDriverServerIngress(sparkPod, driverServiceIngressList));
8997
this.driverPreResources.forEach(r -> setNamespaceIfMissing(r, namespace));
9098
this.driverResources.forEach(r -> setNamespaceIfMissing(r, namespace));
9199
}
@@ -126,4 +134,15 @@ private SparkPod addConfigMap(SparkPod pod, Map<String, String> confFilesMap) {
126134
.build();
127135
return new SparkPod(podWithConfigMapVolume, containerWithConfigMapVolume);
128136
}
137+
138+
private List<HasMetadata> configureDriverServerIngress(
139+
SparkPod pod, List<DriverServiceIngressSpec> driverServiceIngressList) {
140+
if (driverServiceIngressList == null || driverServiceIngressList.isEmpty()) {
141+
return Collections.emptyList();
142+
}
143+
return driverServiceIngressList.stream()
144+
.map(spec -> DriverServiceIngressUtils.buildIngressService(spec, pod.pod().getMetadata()))
145+
.flatMap(Collection::stream)
146+
.collect(Collectors.toList());
147+
}
129148
}

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.spark.k8s.operator;
2121

2222
import java.math.BigInteger;
23+
import java.util.List;
2324
import java.util.Map;
2425

2526
import scala.Option;
@@ -37,6 +38,7 @@
3738
import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
3839
import org.apache.spark.deploy.k8s.submit.RMainAppResource;
3940
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
41+
import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec;
4042
import org.apache.spark.k8s.operator.utils.ModelUtils;
4143

4244
/**
@@ -82,7 +84,7 @@ public class SparkAppSubmissionWorker {
8284
public SparkAppResourceSpec getResourceSpec(
8385
SparkApplication app, KubernetesClient client, Map<String, String> confOverrides) {
8486
SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides);
85-
return buildResourceSpec(appDriverConf, client);
87+
return buildResourceSpec(appDriverConf, app.getSpec().getDriverServiceIngressList(), client);
8688
}
8789

8890
protected SparkAppDriverConf buildDriverConf(
@@ -127,11 +129,14 @@ protected SparkAppDriverConf buildDriverConf(
127129
}
128130

129131
protected SparkAppResourceSpec buildResourceSpec(
130-
SparkAppDriverConf kubernetesDriverConf, KubernetesClient client) {
132+
SparkAppDriverConf kubernetesDriverConf,
133+
List<DriverServiceIngressSpec> driverServiceIngressList,
134+
KubernetesClient client) {
131135
KubernetesDriverBuilder builder = new KubernetesDriverBuilder();
132136
KubernetesDriverSpec kubernetesDriverSpec =
133137
builder.buildFromFeatures(kubernetesDriverConf, client);
134-
return new SparkAppResourceSpec(kubernetesDriverConf, kubernetesDriverSpec);
138+
return new SparkAppResourceSpec(
139+
kubernetesDriverConf, kubernetesDriverSpec, driverServiceIngressList);
135140
}
136141

137142
/**
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.k8s.operator.utils;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
import io.fabric8.kubernetes.api.model.HasMetadata;
27+
import io.fabric8.kubernetes.api.model.ObjectMeta;
28+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
29+
import io.fabric8.kubernetes.api.model.Service;
30+
import io.fabric8.kubernetes.api.model.ServiceBuilder;
31+
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
32+
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
33+
import io.fabric8.kubernetes.api.model.networking.v1.IngressSpec;
34+
import io.fabric8.kubernetes.api.model.networking.v1.IngressSpecBuilder;
35+
36+
import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec;
37+
38+
public final class DriverServiceIngressUtils {
39+
private DriverServiceIngressUtils() {}
40+
41+
/** Build the full spec for ingress and service. */
42+
public static List<HasMetadata> buildIngressService(
43+
DriverServiceIngressSpec spec, ObjectMeta driverPodMetaData) {
44+
List<HasMetadata> resources = new ArrayList<>(2);
45+
Service service = buildService(spec, driverPodMetaData);
46+
resources.add(service);
47+
resources.add(buildIngress(spec, service));
48+
return resources;
49+
}
50+
51+
private static Service buildService(DriverServiceIngressSpec spec, ObjectMeta driverPodMetaData) {
52+
ObjectMeta serviceMeta = new ObjectMetaBuilder(spec.getServiceMetadata()).build();
53+
serviceMeta.setNamespace(driverPodMetaData.getNamespace());
54+
Map<String, String> selectors = spec.getServiceSpec().getSelector();
55+
if (selectors == null || selectors.isEmpty()) {
56+
selectors = driverPodMetaData.getLabels();
57+
}
58+
return new ServiceBuilder()
59+
.withMetadata(serviceMeta)
60+
.withNewSpecLike(spec.getServiceSpec())
61+
.withSelector(selectors)
62+
.endSpec()
63+
.build();
64+
}
65+
66+
private static Ingress buildIngress(DriverServiceIngressSpec spec, Service service) {
67+
ObjectMeta metadata = new ObjectMetaBuilder(spec.getIngressMetadata()).build();
68+
IngressSpec ingressSpec = new IngressSpecBuilder(spec.getIngressSpec()).build();
69+
if ((ingressSpec.getRules() == null || ingressSpec.getRules().isEmpty())
70+
&& service.getSpec().getPorts() != null
71+
&& !service.getSpec().getPorts().isEmpty()) {
72+
// if no rule is provided, populate default path with backend to the associated service
73+
ingressSpec =
74+
new IngressSpecBuilder()
75+
.addNewRule()
76+
.withNewHttp()
77+
.addNewPath()
78+
.withPath("/")
79+
.withPathType("ImplementationSpecific")
80+
.withNewBackend()
81+
.withNewService()
82+
.withName(service.getMetadata().getName())
83+
.withNewPort()
84+
.withNumber(service.getSpec().getPorts().get(0).getPort())
85+
.endPort()
86+
.endService()
87+
.endBackend()
88+
.endPath()
89+
.endHttp()
90+
.endRule()
91+
.build();
92+
}
93+
return new IngressBuilder().withMetadata(metadata).withSpec(ingressSpec).build();
94+
}
95+
}

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ void testDriverResourceIncludesConfigMap() {
6969
when(mockSpec.pod()).thenReturn(sparkPod);
7070
when(mockSpec.systemProperties()).thenReturn(new HashMap<>());
7171

72-
SparkAppResourceSpec appResourceSpec = new SparkAppResourceSpec(mockConf, mockSpec);
72+
SparkAppResourceSpec appResourceSpec =
73+
new SparkAppResourceSpec(mockConf, mockSpec, Collections.emptyList());
7374

7475
Assertions.assertEquals(2, appResourceSpec.getDriverResources().size());
7576
Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size());

0 commit comments

Comments
 (0)