Skip to content

Commit a69cdf2

Browse files
committed
spark connect: move listener to role config
1 parent 8d4bdda commit a69cdf2

File tree

4 files changed

+59
-17
lines changed

4 files changed

+59
-17
lines changed

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,8 +2146,10 @@ spec:
21462146
type: string
21472147
type: object
21482148
server:
2149+
default:
2150+
role_config:
2151+
listener_class: cluster-internal
21492152
description: A Spark Connect server definition.
2150-
nullable: true
21512153
properties:
21522154
cliOverrides:
21532155
additionalProperties:
@@ -2157,10 +2159,6 @@ spec:
21572159
config:
21582160
default: {}
21592161
properties:
2160-
listenerClass:
2161-
description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services.
2162-
nullable: true
2163-
type: string
21642162
logging:
21652163
default:
21662164
containers: {}
@@ -2333,6 +2331,15 @@ spec:
23332331
description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
23342332
type: object
23352333
x-kubernetes-preserve-unknown-fields: true
2334+
role_config:
2335+
properties:
2336+
listener_class:
2337+
default: cluster-internal
2338+
description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services.
2339+
type: string
2340+
type: object
2341+
required:
2342+
- role_config
23362343
type: object
23372344
vectorAggregatorConfigMapName:
23382345
description: Name of the Vector aggregator discovery ConfigMap. It must contain the key `ADDRESS` with the address of the Vector aggregator.

rust/operator-binary/src/connect/controller.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ pub async fn reconcile(
163163
.context(InvalidSparkConnectServerSnafu)?;
164164

165165
let server_config = scs.server_config().context(ServerConfigSnafu)?;
166+
let server_role_config = &scs.spec.server.role_config;
166167
let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?;
167168

168169
let client = &ctx.client;
@@ -286,7 +287,7 @@ pub async fn reconcile(
286287

287288
// ========================================
288289
// Server listener
289-
let listener = server::build_listener(scs, &server_config, &resolved_product_image)
290+
let listener = server::build_listener(scs, server_role_config, &resolved_product_image)
290291
.context(BuildListenerSnafu)?;
291292

292293
cluster_resources

rust/operator-binary/src/connect/crd.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,30 @@ pub mod versioned {
9393
pub vector_aggregator_config_map_name: Option<String>,
9494

9595
/// A Spark Connect server definition.
96-
#[serde(default, skip_serializing_if = "Option::is_none")]
97-
pub server: Option<CommonConfiguration<ServerConfigFragment, JavaCommonConfig>>,
96+
#[serde(default)]
97+
pub server: ServerConfigWrapper,
9898

9999
/// Spark Connect executor properties.
100100
#[serde(default, skip_serializing_if = "Option::is_none")]
101101
pub executor: Option<CommonConfiguration<ExecutorConfigFragment, JavaCommonConfig>>,
102102
}
103103

104+
/// This struct i
105+
#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Serialize, Deserialize)]
106+
pub struct ServerConfigWrapper {
107+
#[serde(flatten)]
108+
pub config: Option<CommonConfiguration<ServerConfigFragment, JavaCommonConfig>>,
109+
110+
pub role_config: ServerRoleConfig,
111+
}
112+
113+
#[derive(Clone, Debug, JsonSchema, PartialEq, Serialize, Deserialize)]
114+
pub struct ServerRoleConfig {
115+
/// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services.
116+
#[serde(default = "default_listener_class")]
117+
pub listener_class: String,
118+
}
119+
104120
#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
105121
#[fragment_attrs(
106122
derive(
@@ -126,10 +142,6 @@ pub mod versioned {
126142
/// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
127143
#[fragment_attrs(serde(default))]
128144
pub requested_secret_lifetime: Option<Duration>,
129-
130-
/// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services.
131-
#[serde(default)]
132-
pub listener_class: String,
133145
}
134146

135147
#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
@@ -218,7 +230,6 @@ impl v1alpha1::ServerConfig {
218230
},
219231
logging: product_logging::spec::default_logging(),
220232
requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME),
221-
listener_class: Some("cluster-internal".into()),
222233
}
223234
}
224235

@@ -248,7 +259,7 @@ impl v1alpha1::SparkConnectServer {
248259
pub fn server_config(&self) -> Result<v1alpha1::ServerConfig, Error> {
249260
let defaults = v1alpha1::ServerConfig::default_config();
250261
fragment::validate(
251-
match self.spec.server.as_ref().map(|cc| cc.config.clone()) {
262+
match self.spec.server.config.as_ref().map(|cc| cc.config.clone()) {
252263
Some(fragment) => {
253264
let mut fc = fragment.clone();
254265
fc.merge(&defaults);
@@ -276,6 +287,18 @@ impl v1alpha1::SparkConnectServer {
276287
}
277288
}
278289

290+
impl Default for v1alpha1::ServerRoleConfig {
291+
fn default() -> Self {
292+
v1alpha1::ServerRoleConfig {
293+
listener_class: default_listener_class(),
294+
}
295+
}
296+
}
297+
298+
pub fn default_listener_class() -> String {
299+
"cluster-internal".to_string()
300+
}
301+
279302
#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Eq, Serialize)]
280303
#[serde(rename_all = "camelCase")]
281304
pub struct SparkConnectServerStatus {

rust/operator-binary/src/connect/server.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ pub(crate) fn server_config_map(
148148
let jvm_sec_props = common::security_properties(
149149
scs.spec
150150
.server
151+
.config
151152
.as_ref()
152153
.and_then(|s| s.config_overrides.get(JVM_SECURITY_PROPERTIES_FILE)),
153154
)
@@ -158,6 +159,7 @@ pub(crate) fn server_config_map(
158159
let metrics_props = common::metrics_properties(
159160
scs.spec
160161
.server
162+
.config
161163
.as_ref()
162164
.and_then(|s| s.config_overrides.get(METRICS_PROPERTIES_FILE)),
163165
)
@@ -253,6 +255,7 @@ pub(crate) fn build_stateful_set(
253255
let container_env = env(scs
254256
.spec
255257
.server
258+
.config
256259
.as_ref()
257260
.map(|s| s.env_overrides.clone())
258261
.as_ref())?;
@@ -344,7 +347,13 @@ pub(crate) fn build_stateful_set(
344347

345348
// Merge user defined pod template if available
346349
let mut pod_template = pb.build_template();
347-
if let Some(pod_overrides_spec) = scs.spec.server.as_ref().map(|s| s.pod_overrides.clone()) {
350+
if let Some(pod_overrides_spec) = scs
351+
.spec
352+
.server
353+
.config
354+
.as_ref()
355+
.map(|s| s.pod_overrides.clone())
356+
{
348357
pod_template.merge_from(pod_overrides_spec);
349358
}
350359

@@ -507,6 +516,7 @@ pub(crate) fn server_properties(
507516
let config_overrides = scs
508517
.spec
509518
.server
519+
.config
510520
.as_ref()
511521
.and_then(|s| s.config_overrides.get(SPARK_DEFAULTS_FILE_NAME));
512522

@@ -581,6 +591,7 @@ fn server_jvm_args(
581591
&jvm_args,
582592
scs.spec
583593
.server
594+
.config
584595
.as_ref()
585596
.map(|s| &s.product_specific_common_config),
586597
)
@@ -614,11 +625,11 @@ fn dummy_role_group_ref(
614625

615626
pub(crate) fn build_listener(
616627
scs: &v1alpha1::SparkConnectServer,
617-
config: &v1alpha1::ServerConfig,
628+
role_config: &v1alpha1::ServerRoleConfig,
618629
resolved_product_image: &ResolvedProductImage,
619630
) -> Result<listener::v1alpha1::Listener, Error> {
620631
let listener_name = dummy_role_group_ref(scs).object_name();
621-
let listener_class = config.listener_class.clone();
632+
let listener_class = role_config.listener_class.clone();
622633
let role = SparkConnectRole::Server.to_string();
623634
let recommended_object_labels =
624635
common::labels(scs, &resolved_product_image.app_version_label, &role);

0 commit comments

Comments
 (0)