Skip to content

Commit 29d2cd3

Browse files
committed
add listener.rs and replace usages of CurrentlySupportedListenerClasses
1 parent ab4cf70 commit 29d2cd3

File tree

8 files changed

+107
-221
lines changed

8 files changed

+107
-221
lines changed

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,16 +1124,6 @@ spec:
11241124
properties:
11251125
listenerClass:
11261126
default: cluster-internal
1127-
description: |-
1128-
This field controls which type of Service the Operator creates for this HistoryServer:
1129-
1130-
* cluster-internal: Use a ClusterIP service
1131-
1132-
* external-unstable: Use a NodePort service
1133-
1134-
* external-stable: Use a LoadBalancer service
1135-
1136-
This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
11371127
enum:
11381128
- cluster-internal
11391129
- external-unstable
@@ -1880,21 +1870,12 @@ spec:
18801870
type: array
18811871
clusterConfig:
18821872
default:
1883-
listenerClass: external-unstable
1873+
listenerClass: cluster-internal
18841874
description: Global Spark Connect server configuration that applies to all roles.
18851875
properties:
18861876
listenerClass:
1887-
default: external-unstable
1888-
description: |-
1889-
This field controls which type of Service the Operator creates for this ConnectServer:
1890-
1891-
* cluster-internal: Use a ClusterIP service
1892-
1893-
* external-unstable: Use a NodePort service
1894-
1895-
* external-stable: Use a LoadBalancer service
1896-
1897-
This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
1877+
default: cluster-internal
1878+
description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services.
18981879
enum:
18991880
- cluster-internal
19001881
- external-unstable

rust/operator-binary/src/connect/controller.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -192,21 +192,9 @@ pub async fn reconcile(
192192
.await
193193
.context(ApplyRoleBindingSnafu)?;
194194

195-
// Expose connect server to the outside world
196-
let service = server::build_service(scs, &resolved_product_image.app_version_label, None)
197-
.context(BuildServiceSnafu)?;
198-
cluster_resources
199-
.add(client, service.clone())
200-
.await
201-
.context(ApplyServiceSnafu)?;
202-
203195
// Headless service used by executors connect back to the driver
204-
let service = server::build_service(
205-
scs,
206-
&resolved_product_image.app_version_label,
207-
Some("None".to_string()),
208-
)
209-
.context(BuildServiceSnafu)?;
196+
let service = server::build_internal_service(scs, &resolved_product_image.app_version_label)
197+
.context(BuildServiceSnafu)?;
210198

211199
cluster_resources
212200
.add(client, service.clone())

rust/operator-binary/src/connect/crd.rs

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use stackable_operator::{
3333
use strum::{Display, EnumIter};
3434

3535
use super::common::SparkConnectRole;
36-
use crate::crd::constants::APP_NAME;
36+
use crate::crd::{constants::APP_NAME, listener::SupportedListenerClasses};
3737

3838
pub const CONNECT_CONTROLLER_NAME: &str = "connect";
3939
pub const CONNECT_FULL_CONTROLLER_NAME: &str = concatcp!(
@@ -107,19 +107,9 @@ pub mod versioned {
107107
#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
108108
#[serde(rename_all = "camelCase")]
109109
pub struct SparkConnectServerClusterConfig {
110-
/// This field controls which type of Service the Operator creates for this ConnectServer:
111-
///
112-
/// * cluster-internal: Use a ClusterIP service
113-
///
114-
/// * external-unstable: Use a NodePort service
115-
///
116-
/// * external-stable: Use a LoadBalancer service
117-
///
118-
/// This is a temporary solution with the goal to keep yaml manifests forward compatible.
119-
/// In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
120-
/// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
110+
/// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services.
121111
#[serde(default)]
122-
pub listener_class: CurrentlySupportedListenerClasses,
112+
pub listener_class: SupportedListenerClasses,
123113
}
124114

125115
#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
@@ -178,29 +168,6 @@ pub mod versioned {
178168
}
179169
}
180170

181-
// TODO: Temporary solution until listener-operator is finished
182-
#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
183-
#[serde(rename_all = "PascalCase")]
184-
pub(crate) enum CurrentlySupportedListenerClasses {
185-
#[serde(rename = "cluster-internal")]
186-
ClusterInternal,
187-
#[default]
188-
#[serde(rename = "external-unstable")]
189-
ExternalUnstable,
190-
#[serde(rename = "external-stable")]
191-
ExternalStable,
192-
}
193-
194-
impl CurrentlySupportedListenerClasses {
195-
pub fn k8s_service_type(&self) -> String {
196-
match self {
197-
CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(),
198-
CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(),
199-
CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(),
200-
}
201-
}
202-
}
203-
204171
#[allow(clippy::derive_partial_eq_without_eq)]
205172
#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
206173
#[fragment_attrs(

rust/operator-binary/src/connect/server.rs

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -349,35 +349,13 @@ pub(crate) fn build_deployment(
349349
})
350350
}
351351

352-
#[allow(clippy::result_large_err)]
353-
pub(crate) fn build_service(
352+
// This is the headless driver service used for the internal
353+
// communication with the executors as recommended by the Spark docs.
354+
pub(crate) fn build_internal_service(
354355
scs: &v1alpha1::SparkConnectServer,
355356
app_version_label: &str,
356-
service_cluster_ip: Option<String>,
357357
) -> Result<Service, Error> {
358-
let (service_name, service_type, publish_not_ready_addresses) = match service_cluster_ip.clone()
359-
{
360-
Some(_) => (
361-
// These are the properties of the headless driver service used for the internal
362-
// communication with the executors as recommended by the Spark docs.
363-
//
364-
// The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness
365-
// probes. Without it, the driver runs into a deadlock beacuse the Pod cannot become
366-
// "ready" until the Service is "ready" and vice versa.
367-
object_name(&scs.name_any(), SparkConnectRole::Server),
368-
"ClusterIP".to_string(),
369-
Some(true),
370-
),
371-
None => (
372-
format!(
373-
"{}-{}",
374-
object_name(&scs.name_any(), SparkConnectRole::Server),
375-
SparkConnectRole::Server
376-
),
377-
scs.spec.cluster_config.listener_class.k8s_service_type(),
378-
Some(false),
379-
),
380-
};
358+
let service_name = object_name(&scs.name_any(), SparkConnectRole::Server);
381359

382360
let selector = Labels::role_selector(scs, APP_NAME, &SparkConnectRole::Server.to_string())
383361
.context(LabelBuildSnafu)?
@@ -398,8 +376,8 @@ pub(crate) fn build_service(
398376
.with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
399377
.build(),
400378
spec: Some(ServiceSpec {
401-
type_: Some(service_type),
402-
cluster_ip: service_cluster_ip,
379+
type_: Some("ClusterIP".to_string()),
380+
cluster_ip: Some("None".to_string()),
403381
ports: Some(vec![
404382
ServicePort {
405383
name: Some(String::from(GRPC)),
@@ -413,7 +391,10 @@ pub(crate) fn build_service(
413391
},
414392
]),
415393
selector: Some(selector),
416-
publish_not_ready_addresses,
394+
// The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness
395+
// probes. Without it, the driver runs into a deadlock beacuse the Pod cannot become
396+
// "ready" until the Service is "ready" and vice versa.
397+
publish_not_ready_addresses: Some(true),
417398
..ServiceSpec::default()
418399
}),
419400
status: None,

rust/operator-binary/src/crd/history.rs

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ use stackable_operator::{
3232
use strum::{Display, EnumIter};
3333

3434
use crate::{
35-
crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir},
35+
crd::{
36+
affinity::history_affinity, constants::*, listener::SupportedListenerClasses,
37+
logdir::ResolvedLogDir,
38+
},
3639
history::config::jvm::construct_history_jvm_args,
3740
};
3841

@@ -62,6 +65,7 @@ pub enum Error {
6265

6366
#[versioned(version(name = "v1alpha1"))]
6467
pub mod versioned {
68+
6569
/// A Spark cluster history server component. This resource is managed by the Stackable operator
6670
/// for Apache Spark. Find more information on how to use it in the
6771
/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server).
@@ -103,42 +107,8 @@ pub mod versioned {
103107
#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
104108
#[serde(rename_all = "camelCase")]
105109
pub struct SparkHistoryServerClusterConfig {
106-
/// This field controls which type of Service the Operator creates for this HistoryServer:
107-
///
108-
/// * cluster-internal: Use a ClusterIP service
109-
///
110-
/// * external-unstable: Use a NodePort service
111-
///
112-
/// * external-stable: Use a LoadBalancer service
113-
///
114-
/// This is a temporary solution with the goal to keep yaml manifests forward compatible.
115-
/// In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
116-
/// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
117110
#[serde(default)]
118-
pub listener_class: CurrentlySupportedListenerClasses,
119-
}
120-
}
121-
122-
// TODO: Temporary solution until listener-operator is finished
123-
#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
124-
#[serde(rename_all = "PascalCase")]
125-
pub enum CurrentlySupportedListenerClasses {
126-
#[default]
127-
#[serde(rename = "cluster-internal")]
128-
ClusterInternal,
129-
#[serde(rename = "external-unstable")]
130-
ExternalUnstable,
131-
#[serde(rename = "external-stable")]
132-
ExternalStable,
133-
}
134-
135-
impl CurrentlySupportedListenerClasses {
136-
pub fn k8s_service_type(&self) -> String {
137-
match self {
138-
CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(),
139-
CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(),
140-
CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(),
141-
}
111+
pub listener_class: SupportedListenerClasses,
142112
}
143113
}
144114

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use serde::{Deserialize, Serialize};
2+
use stackable_operator::{
3+
config::merge::Atomic,
4+
schemars::{self, JsonSchema},
5+
};
6+
use strum::Display;
7+
8+
impl Atomic for SupportedListenerClasses {}
9+
10+
#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
11+
#[serde(rename_all = "PascalCase")]
12+
pub enum SupportedListenerClasses {
13+
#[default]
14+
#[serde(rename = "cluster-internal")]
15+
#[strum(serialize = "cluster-internal")]
16+
ClusterInternal,
17+
18+
#[serde(rename = "external-unstable")]
19+
#[strum(serialize = "external-unstable")]
20+
ExternalUnstable,
21+
22+
#[serde(rename = "external-stable")]
23+
#[strum(serialize = "external-stable")]
24+
ExternalStable,
25+
}

rust/operator-binary/src/crd/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use crate::{
5555
pub mod affinity;
5656
pub mod constants;
5757
pub mod history;
58+
pub mod listener;
5859
pub mod logdir;
5960
pub mod roles;
6061
pub mod tlscerts;

0 commit comments

Comments
 (0)