
Commit a8055af

maltesander and razvan authored
Listener: move listener class to role config (#588)
* history: move listenerclass to role config
* remove unused SparkConnectServerClusterConfig
* spark connect: move listener to role config
* rename structs for consistency
* fix crd default, add comments
* improve crd comment
* adapted changelog
* adapt listener docs
* camelCase structs
* wip
* rename dummy* to default* and cleanup
* consolidate listener / service names
* fix docs
* consolidate service name
* fix test part 1
* fix changelog
* clippy
* add history listener service check
* fix linter
* fix docs
* Apply suggestions from code review

Co-authored-by: Razvan-Daniel Mihai <[email protected]>

* fmt

---------

Co-authored-by: Razvan-Daniel Mihai <[email protected]>
1 parent c16d0e5 commit a8055af

File tree

20 files changed: +216 -127 lines

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,7 @@ All notable changes to this project will be documented in this file.
 
 - Use `json` file extension for log files ([#553]).
 - The Spark connect controller now watches StatefulSets instead of Deployments (again) ([#573]).
+- BREAKING: Move `listenerClass` to `roleConfig` for Spark History Server and Spark Connect. Service names changed. ([#588]).
 
 ### Removed
 
@@ -62,6 +63,7 @@ All notable changes to this project will be documented in this file.
 [#575]: https://github.com/stackabletech/spark-k8s-operator/pull/575
 [#584]: https://github.com/stackabletech/spark-k8s-operator/pull/584
 [#585]: https://github.com/stackabletech/spark-k8s-operator/pull/585
+[#588]: https://github.com/stackabletech/spark-k8s-operator/pull/588
 
 ## [25.3.0] - 2025-03-21
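The breaking change above moves `listenerClass` from the role `config` (Spark History Server) and `spec.server.config` (Spark Connect) into `roleConfig`. A minimal before/after sketch for the History Server, with an illustrative resource name and listener class; the old and new placements follow the docs diff further below:

[source,yaml]
----
# Before #588: listenerClass sat in the role config
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
  name: spark-history
spec:
  nodes:
    config:
      listenerClass: external-unstable
----

[source,yaml]
----
# After #588: listenerClass is a roleConfig setting
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
  name: spark-history
spec:
  nodes:
    roleConfig:
      listenerClass: external-unstable
----

Note that the derived Service names change as well, e.g. `spark-history-node-default` becomes `spark-history-node` (see the docs diff).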

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 18 additions & 25 deletions
@@ -1117,13 +1117,6 @@ spec:
       spec:
         description: A Spark cluster history server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/history-server).
         properties:
-          clusterConfig:
-            default: {}
-            description: |-
-              Global Spark history server configuration that applies to all roles.
-
-              This was previously used to hold the listener configuration, which has since moved to the role configuration.
-            type: object
           image:
             anyOf:
               - required:
@@ -1379,9 +1372,6 @@ spec:
               cleaner:
                 nullable: true
                 type: boolean
-              listenerClass:
-                nullable: true
-                type: string
               logging:
                 default:
                   containers: {}
@@ -1556,11 +1546,16 @@ spec:
             x-kubernetes-preserve-unknown-fields: true
           roleConfig:
             default:
+              listenerClass: cluster-internal
              podDisruptionBudget:
                enabled: true
                maxUnavailable: null
            description: This is a product-agnostic RoleConfig, which is sufficient for most of the products.
            properties:
+              listenerClass:
+                default: cluster-internal
+                description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the history server.
+                type: string
              podDisruptionBudget:
                default:
                  enabled: true
@@ -1628,9 +1623,6 @@ spec:
               cleaner:
                 nullable: true
                 type: boolean
-              listenerClass:
-                nullable: true
-                type: string
               logging:
                 default:
                   containers: {}
@@ -1868,13 +1860,6 @@ spec:
             items:
               type: string
             type: array
-          clusterConfig:
-            default: {}
-            description: |-
-              Global Spark Connect server configuration that applies to all roles.
-
-              This was previously used to hold the listener configuration, which has since moved to the server configuration.
-            type: object
           clusterOperation:
             default:
               reconciliationPaused: false
@@ -2154,8 +2139,10 @@ spec:
                 type: string
             type: object
           server:
+            default:
+              roleConfig:
+                listenerClass: cluster-internal
             description: A Spark Connect server definition.
-            nullable: true
             properties:
               cliOverrides:
                 additionalProperties:
@@ -2165,10 +2152,6 @@ spec:
               config:
                 default: {}
                 properties:
-                  listenerClass:
-                    description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services.
-                    nullable: true
-                    type: string
                   logging:
                     default:
                       containers: {}
@@ -2341,6 +2324,16 @@ spec:
                 description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
                 type: object
                 x-kubernetes-preserve-unknown-fields: true
+              roleConfig:
+                default:
+                  listenerClass: cluster-internal
+                description: Global role config settings for the Spark Connect Server.
+                properties:
+                  listenerClass:
+                    default: cluster-internal
+                    description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark Connect services.
+                    type: string
+                type: object
             type: object
           vectorAggregatorConfigMapName:
             description: Name of the Vector aggregator discovery ConfigMap. It must contain the key `ADDRESS` with the address of the Vector aggregator.
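With the schema changes above, `spec.server` of a SparkConnectServer is no longer nullable and now defaults to `roleConfig.listenerClass: cluster-internal`, so a minimal resource can omit it entirely. A sketch relying on those defaults; the product version is an illustrative assumption, not taken from this diff:

[source,yaml]
----
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  image:
    productVersion: 3.5.5  # illustrative; use a version supported by the operator
  # spec.server is defaulted by the CRD to:
  #   roleConfig:
  #     listenerClass: cluster-internal
----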
Lines changed: 22 additions & 7 deletions
@@ -1,10 +1,10 @@
 = Service exposition with listener classes
 :description: Configure the Spark connect and history services exposure with listener classes: cluster-internal, external-unstable, or external-stable.
 
-== History services
+== Spark History services
 
-The operator deploys a xref:listener-operator:listener.adoc[Listener] for each spark history pod.
-The default is to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`:
+The operator deploys a xref:listener-operator:listener.adoc[Listener] for each Spark History Server pod.
+By default it is only accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.roleConfig.listenerClass`:
 
 [source,yaml]
 ----
@@ -14,13 +14,28 @@ metadata:
   name: spark-history
 spec:
   nodes:
-    config:
+    roleConfig:
       listenerClass: external-unstable # <1>
 ----
 <1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`).
 
-For the example above, the listener operator creates a service named `spark-history-node-default` where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers) and `default` is the role group.
+For the example above, the listener operator creates a service named `spark-history-node`, where `spark-history` is the name of the SparkHistoryServer and `node` is the service role (the only service role available for history servers).
 
-== Connect services
+== Spark Connect services
 
-Connect pods can be exposed using listener classes in exactly tha same fashion as history servers.
+Connect pods can be exposed using listener classes in exactly the same fashion as History Servers (except for the role name):
+
+[source,yaml]
+----
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkConnectServer
+metadata:
+  name: spark-connect
+spec:
+  server:
+    roleConfig:
+      listenerClass: external-unstable # <1>
+----
+<1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`).
+
+For the example above, the listener operator creates a service named `spark-connect-server`, where `spark-connect` is the name of the SparkConnectServer and `server` is the service role.

rust/operator-binary/src/connect/common.rs

Lines changed: 2 additions & 2 deletions
@@ -12,7 +12,7 @@ use super::crd::CONNECT_EXECUTOR_ROLE_NAME;
 use crate::{
     connect::crd::{
         CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, CONNECT_SERVER_ROLE_NAME,
-        DUMMY_SPARK_CONNECT_GROUP_NAME,
+        DEFAULT_SPARK_CONNECT_GROUP_NAME,
     },
     crd::constants::OPERATOR_NAME,
 };
@@ -53,7 +53,7 @@ pub(crate) fn labels<'a, T>(
         operator_name: OPERATOR_NAME,
         controller_name: CONNECT_CONTROLLER_NAME,
         role,
-        role_group: DUMMY_SPARK_CONNECT_GROUP_NAME,
+        role_group: DEFAULT_SPARK_CONNECT_GROUP_NAME,
     }
 }
 
rust/operator-binary/src/connect/controller.rs

Lines changed: 14 additions & 12 deletions
@@ -163,6 +163,7 @@ pub async fn reconcile(
         .context(InvalidSparkConnectServerSnafu)?;
 
     let server_config = scs.server_config().context(ServerConfigSnafu)?;
+    let server_role_config = &scs.spec.server.role_config;
     let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?;
 
     let client = &ctx.client;
@@ -204,7 +205,7 @@ pub async fn reconcile(
     let service = server::build_internal_service(scs, &resolved_product_image.app_version_label)
         .context(BuildServiceSnafu)?;
 
-    cluster_resources
+    let applied_internal_service = cluster_resources
         .add(client, service.clone())
         .await
         .context(ApplyServiceSnafu)?;
@@ -216,7 +217,7 @@ pub async fn reconcile(
     server::server_properties(
         scs,
         &server_config,
-        &service,
+        &applied_internal_service,
         &service_account,
         &resolved_product_image,
     )
@@ -271,6 +272,16 @@ pub async fn reconcile(
             name: scs.name_unchecked(),
         })?;
 
+    // ========================================
+    // Server listener
+    let listener = server::build_listener(scs, server_role_config, &resolved_product_image)
+        .context(BuildListenerSnafu)?;
+
+    let applied_listener = cluster_resources
+        .add(client, listener)
+        .await
+        .context(ApplyListenerSnafu)?;
+
     // ========================================
     // Server stateful set
     let args = server::command_args(&scs.spec.args);
@@ -280,20 +291,11 @@ pub async fn reconcile(
         &resolved_product_image,
         &service_account,
         &server_config_map,
+        &applied_listener.name_any(),
         args,
     )
     .context(BuildServerStatefulSetSnafu)?;
 
-    // ========================================
-    // Server listener
-    let listener = server::build_listener(scs, &server_config, &resolved_product_image)
-        .context(BuildListenerSnafu)?;
-
-    cluster_resources
-        .add(client, listener)
-        .await
-        .context(ApplyListenerSnafu)?;
-
     let mut ss_cond_builder = StatefulSetConditionBuilder::default();
 
     ss_cond_builder.add(
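The reordering above is the point of this hunk: the Listener is now applied before the StatefulSet is built, so its applied name (`applied_listener.name_any()`) can be passed into `build_server_stateful_set`, and `build_listener` now reads the role config rather than the merged server config. A rough sketch of the kind of Listener object this could produce, assuming the listener-operator's `Listener` CRD and the port constants from `crd.rs` (metadata name, port names, and the absence of further fields are assumptions, not taken from this diff):

[source,yaml]
----
apiVersion: listeners.stackable.tech/v1alpha1
kind: Listener
metadata:
  name: spark-connect-server  # assumed: <CR name>-<role>, per the consolidated service names
spec:
  className: cluster-internal  # from spec.server.roleConfig.listenerClass
  ports:
    - name: grpc  # assumed port name
      port: 15002  # CONNECT_GRPC_PORT
    - name: http  # assumed port name
      port: 4040  # CONNECT_UI_PORT
----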

rust/operator-binary/src/connect/crd.rs

Lines changed: 35 additions & 18 deletions
@@ -45,7 +45,7 @@ pub const CONNECT_EXECUTOR_ROLE_NAME: &str = "executor";
 pub const CONNECT_GRPC_PORT: i32 = 15002;
 pub const CONNECT_UI_PORT: i32 = 4040;
 
-pub const DUMMY_SPARK_CONNECT_GROUP_NAME: &str = "default";
+pub const DEFAULT_SPARK_CONNECT_GROUP_NAME: &str = "default";
 
 pub const CONNECT_APP_NAME: &str = "spark-connect";
 
@@ -79,13 +79,6 @@ pub mod versioned {
     pub struct SparkConnectServerSpec {
         pub image: ProductImage,
 
-        /// Global Spark Connect server configuration that applies to all roles.
-        ///
-        /// This was previously used to hold the listener configuration, which has since moved
-        /// to the server configuration.
-        #[serde(default)]
-        pub cluster_config: v1alpha1::SparkConnectServerClusterConfig,
-
         // no doc string - See ClusterOperation struct
         #[serde(default)]
         pub cluster_operation: ClusterOperation,
@@ -100,17 +93,34 @@ pub mod versioned {
         pub vector_aggregator_config_map_name: Option<String>,
 
         /// A Spark Connect server definition.
-        #[serde(default, skip_serializing_if = "Option::is_none")]
-        pub server: Option<CommonConfiguration<ServerConfigFragment, JavaCommonConfig>>,
+        #[serde(default)]
+        pub server: SparkConnectServerConfigWrapper,
 
         /// Spark Connect executor properties.
         #[serde(default, skip_serializing_if = "Option::is_none")]
         pub executor: Option<CommonConfiguration<ExecutorConfigFragment, JavaCommonConfig>>,
     }
 
-    #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
+    /// This struct is a wrapper for the `ServerConfig` in order to keep the `spec.server.roleConfig` setting consistent.
+    /// It is required since Spark Connect does not utilize the Stackable `Role` and therefore does not offer a `roleConfig`.
+    #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Serialize, Deserialize)]
+    #[serde(rename_all = "camelCase")]
+    pub struct SparkConnectServerConfigWrapper {
+        #[serde(flatten)]
+        pub config: Option<CommonConfiguration<ServerConfigFragment, JavaCommonConfig>>,
+        #[serde(default)]
+        pub role_config: SparkConnectServerRoleConfig,
+    }
+
+    /// Global role config settings for the Spark Connect Server.
+    #[derive(Clone, Debug, JsonSchema, PartialEq, Serialize, Deserialize)]
     #[serde(rename_all = "camelCase")]
-    pub struct SparkConnectServerClusterConfig {}
+    pub struct SparkConnectServerRoleConfig {
+        /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html)
+        /// is used to expose the Spark Connect services.
+        #[serde(default = "default_listener_class")]
+        pub listener_class: String,
+    }
 
     #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
     #[fragment_attrs(
@@ -137,10 +147,6 @@ pub mod versioned {
         /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
         #[fragment_attrs(serde(default))]
         pub requested_secret_lifetime: Option<Duration>,
-
-        /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services.
-        #[serde(default)]
-        pub listener_class: String,
     }
 
     #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
@@ -229,7 +235,6 @@ impl v1alpha1::ServerConfig {
             },
             logging: product_logging::spec::default_logging(),
             requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME),
-            listener_class: Some("cluster-internal".into()),
         }
     }
 
@@ -259,7 +264,7 @@ impl v1alpha1::SparkConnectServer {
     pub fn server_config(&self) -> Result<v1alpha1::ServerConfig, Error> {
         let defaults = v1alpha1::ServerConfig::default_config();
         fragment::validate(
-            match self.spec.server.as_ref().map(|cc| cc.config.clone()) {
+            match self.spec.server.config.as_ref().map(|cc| cc.config.clone()) {
                 Some(fragment) => {
                     let mut fc = fragment.clone();
                     fc.merge(&defaults);
@@ -287,6 +292,18 @@ impl v1alpha1::SparkConnectServer {
     }
 }
 
+impl Default for v1alpha1::SparkConnectServerRoleConfig {
+    fn default() -> Self {
+        v1alpha1::SparkConnectServerRoleConfig {
+            listener_class: default_listener_class(),
+        }
+    }
+}
+
+pub fn default_listener_class() -> String {
+    "cluster-internal".to_string()
+}
+
 #[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Eq, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct SparkConnectServerStatus {
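Because `SparkConnectServerConfigWrapper` flattens the optional `CommonConfiguration` into itself via `#[serde(flatten)]`, all existing `spec.server` keys keep their position and `roleConfig` simply appears as a new sibling. A sketch of the accepted shape; the override keys come from `CommonConfiguration` as shown in the CRD diff, and the values are illustrative:

[source,yaml]
----
spec:
  server:
    # flattened CommonConfiguration fields stay at this level:
    config:
      logging:
        containers: {}
    cliOverrides: {}
    podOverrides: {}
    # contributed by the wrapper itself:
    roleConfig:
      listenerClass: cluster-internal
----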

rust/operator-binary/src/connect/executor.rs

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ use stackable_operator::{
 
 use super::{
     common::{SparkConnectRole, object_name},
-    crd::{DUMMY_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer},
+    crd::{DEFAULT_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer},
 };
 use crate::{
     connect::{common, crd::v1alpha1},
@@ -354,7 +354,7 @@ pub(crate) fn executor_config_map(
     let role_group_ref = RoleGroupRef {
         cluster: ObjectRef::from_obj(scs),
         role: SparkConnectRole::Executor.to_string(),
-        role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(),
+        role_group: DEFAULT_SPARK_CONNECT_GROUP_NAME.to_string(),
     };
     product_logging::extend_config_map(
         &role_group_ref,
