diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a2f6e30..2c11b169 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ All notable changes to this project will be documented in this file. - Use `json` file extension for log files ([#553]). - The Spark connect controller now watches StatefulSets instead of Deployments (again) ([#573]). +- BREAKING: Move `listenerClass` to `roleConfig` for Spark History Server and Spark Connect. Service names changed. ([#588]). ### Removed @@ -62,6 +63,7 @@ All notable changes to this project will be documented in this file. [#575]: https://github.com/stackabletech/spark-k8s-operator/pull/575 [#584]: https://github.com/stackabletech/spark-k8s-operator/pull/584 [#585]: https://github.com/stackabletech/spark-k8s-operator/pull/585 +[#588]: https://github.com/stackabletech/spark-k8s-operator/pull/588 ## [25.3.0] - 2025-03-21 diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index 52a61f09..4f8aeae9 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -1117,13 +1117,6 @@ spec: spec: description: A Spark cluster history server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/history-server). properties: - clusterConfig: - default: {} - description: |- - Global Spark history server configuration that applies to all roles. - - This was previously used to hold the listener configuration, which has since moved to the role configuration. - type: object image: anyOf: - required: @@ -1379,9 +1372,6 @@ spec: cleaner: nullable: true type: boolean - listenerClass: - nullable: true - type: string logging: default: containers: {} @@ -1556,11 +1546,16 @@ spec: x-kubernetes-preserve-unknown-fields: true roleConfig: default: + listenerClass: cluster-internal podDisruptionBudget: enabled: true maxUnavailable: null description: This is a product-agnostic RoleConfig, which is sufficient for most of the products. properties: + listenerClass: + default: cluster-internal + description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the history server. + type: string podDisruptionBudget: default: enabled: true @@ -1628,9 +1623,6 @@ spec: cleaner: nullable: true type: boolean - listenerClass: - nullable: true - type: string logging: default: containers: {} @@ -1868,13 +1860,6 @@ spec: items: type: string type: array - clusterConfig: - default: {} - description: |- - Global Spark Connect server configuration that applies to all roles. - - This was previously used to hold the listener configuration, which has since moved to the server configuration. - type: object clusterOperation: default: reconciliationPaused: false @@ -2154,8 +2139,10 @@ spec: type: string type: object server: + default: + roleConfig: + listenerClass: cluster-internal description: A Spark Connect server definition. - nullable: true properties: cliOverrides: additionalProperties: @@ -2165,10 +2152,6 @@ spec: config: default: {} properties: - listenerClass: - description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services. 
- nullable: true - type: string logging: default: containers: {} @@ -2341,6 +2324,16 @@ description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. type: object x-kubernetes-preserve-unknown-fields: true + roleConfig: + default: + listenerClass: cluster-internal + description: Global role config settings for the Spark Connect Server. + properties: + listenerClass: + default: cluster-internal + description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark Connect services. + type: string + type: object type: object vectorAggregatorConfigMapName: description: Name of the Vector aggregator discovery ConfigMap. It must contain the key `ADDRESS` with the address of the Vector aggregator. diff --git a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc index f2ab3adf..6facf7ff 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc @@ -1,10 +1,10 @@ = Service exposition with listener classes :description: Configure the Spark connect and history services exposure with listener classes: cluster-internal, external-unstable, or external-stable. -== History services +== Spark History services -The operator deploys a xref:listener-operator:listener.adoc[Listener] for each spark history pod. -The default is to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`: +The operator deploys a xref:listener-operator:listener.adoc[Listener] for each Spark History Server pod. +By default, the history server is only accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.roleConfig.listenerClass`: [source,yaml] ---- metadata: name: spark-history spec: nodes: - config: + roleConfig: listenerClass: external-unstable # <1> ---- <1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`). -For the example above, the listener operator creates a service named `spark-history-node-default` where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers) and `default` is the role group. +For the example above, the listener operator creates a service named `spark-history-node` where `spark-history` is the name of the SparkHistoryServer and `node` is the service role (the only service role available for history servers). -== Connect services +== Spark Connect services -Connect pods can be exposed using listener classes in exactly tha same fashion as history servers.
+Connect pods can be exposed using listener classes in exactly the same fashion as History Servers (the only difference being the role name): + +[source,yaml] +---- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkConnectServer +metadata: + name: spark-connect +spec: + server: + roleConfig: + listenerClass: external-unstable # <1> +---- +<1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`). + +For the example above, the listener operator creates a service named `spark-connect-server` where `spark-connect` is the name of the SparkConnectServer and `server` is the service role. diff --git a/rust/operator-binary/src/connect/common.rs b/rust/operator-binary/src/connect/common.rs index c38836c6..78ca6e87 100644 --- a/rust/operator-binary/src/connect/common.rs +++ b/rust/operator-binary/src/connect/common.rs @@ -12,7 +12,7 @@ use super::crd::CONNECT_EXECUTOR_ROLE_NAME; use crate::{ connect::crd::{ CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, CONNECT_SERVER_ROLE_NAME, - DUMMY_SPARK_CONNECT_GROUP_NAME, + DEFAULT_SPARK_CONNECT_GROUP_NAME, }, crd::constants::OPERATOR_NAME, }; @@ -53,7 +53,7 @@ pub(crate) fn labels<'a, T>( operator_name: OPERATOR_NAME, controller_name: CONNECT_CONTROLLER_NAME, role, - role_group: DUMMY_SPARK_CONNECT_GROUP_NAME, + role_group: DEFAULT_SPARK_CONNECT_GROUP_NAME, } } diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index e25735aa..345bac6d 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -163,6 +163,7 @@ pub async fn reconcile( .context(InvalidSparkConnectServerSnafu)?; let server_config = scs.server_config().context(ServerConfigSnafu)?; + let server_role_config = &scs.spec.server.role_config; let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?; let client = &ctx.client; @@ -204,7 +205,7 @@ pub async fn reconcile( let service = server::build_internal_service(scs, &resolved_product_image.app_version_label) .context(BuildServiceSnafu)?; - cluster_resources + let applied_internal_service = cluster_resources .add(client, service.clone()) .await .context(ApplyServiceSnafu)?; @@ -216,7 +217,7 @@ pub async fn reconcile( server::server_properties( scs, &server_config, - &service, + &applied_internal_service, &service_account, &resolved_product_image, ) @@ -271,6 +272,16 @@ pub async fn reconcile( name: scs.name_unchecked(), })?; + // ======================================== + // Server listener + let listener = server::build_listener(scs, server_role_config, &resolved_product_image) + .context(BuildListenerSnafu)?; + + let applied_listener = cluster_resources + .add(client, listener) + .await + .context(ApplyListenerSnafu)?; + // ======================================== // Server stateful set let args = server::command_args(&scs.spec.args); @@ -280,20 +291,11 @@ &resolved_product_image, &service_account, &server_config_map, + &applied_listener.name_any(), args, ) .context(BuildServerStatefulSetSnafu)?; - // ======================================== - // Server listener - let listener = server::build_listener(scs, &server_config, &resolved_product_image) - .context(BuildListenerSnafu)?; - - cluster_resources - .add(client, listener) - .await - .context(ApplyListenerSnafu)?; - let mut ss_cond_builder = StatefulSetConditionBuilder::default(); ss_cond_builder.add( diff --git a/rust/operator-binary/src/connect/crd.rs
b/rust/operator-binary/src/connect/crd.rs index 0009f332..a87eed68 100644 --- a/rust/operator-binary/src/connect/crd.rs +++ b/rust/operator-binary/src/connect/crd.rs @@ -45,7 +45,7 @@ pub const CONNECT_EXECUTOR_ROLE_NAME: &str = "executor"; pub const CONNECT_GRPC_PORT: i32 = 15002; pub const CONNECT_UI_PORT: i32 = 4040; -pub const DUMMY_SPARK_CONNECT_GROUP_NAME: &str = "default"; +pub const DEFAULT_SPARK_CONNECT_GROUP_NAME: &str = "default"; pub const CONNECT_APP_NAME: &str = "spark-connect"; @@ -79,13 +79,6 @@ pub mod versioned { pub struct SparkConnectServerSpec { pub image: ProductImage, - /// Global Spark Connect server configuration that applies to all roles. - /// - /// This was previously used to hold the listener configuration, which has since moved - /// to the server configuration. - #[serde(default)] - pub cluster_config: v1alpha1::SparkConnectServerClusterConfig, - // no doc string - See ClusterOperation struct #[serde(default)] pub cluster_operation: ClusterOperation, @@ -100,17 +93,34 @@ pub mod versioned { pub vector_aggregator_config_map_name: Option, /// A Spark Connect server definition. - #[serde(default, skip_serializing_if = "Option::is_none")] - pub server: Option>, + #[serde(default)] + pub server: SparkConnectServerConfigWrapper, /// Spark Connect executor properties. #[serde(default, skip_serializing_if = "Option::is_none")] pub executor: Option>, } - #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] + /// This struct is a wrapper for the `ServerConfig` in order to keep the `spec.server.roleConfig` setting consistent. + /// It is required since Spark Connect does not utilize the Stackable `Role` and therefore does not offer a `roleConfig`. + #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Serialize, Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct SparkConnectServerConfigWrapper { + #[serde(flatten)] + pub config: Option>, + #[serde(default)] + pub role_config: SparkConnectServerRoleConfig, + } + + /// Global role config settings for the Spark Connect Server. + #[derive(Clone, Debug, JsonSchema, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] - pub struct SparkConnectServerClusterConfig {} + pub struct SparkConnectServerRoleConfig { + /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) + /// is used to expose the Spark Connect services. + #[serde(default = "default_listener_class")] + pub listener_class: String, + } #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] #[fragment_attrs( @@ -137,10 +147,6 @@ /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. #[fragment_attrs(serde(default))] pub requested_secret_lifetime: Option, - - /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services.
- #[serde(default)] - pub listener_class: String, } #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)] @@ -229,7 +235,6 @@ impl v1alpha1::ServerConfig { }, logging: product_logging::spec::default_logging(), requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME), - listener_class: Some("cluster-internal".into()), } } @@ -259,7 +264,7 @@ impl v1alpha1::SparkConnectServer { pub fn server_config(&self) -> Result { let defaults = v1alpha1::ServerConfig::default_config(); fragment::validate( - match self.spec.server.as_ref().map(|cc| cc.config.clone()) { + match self.spec.server.config.as_ref().map(|cc| cc.config.clone()) { Some(fragment) => { let mut fc = fragment.clone(); fc.merge(&defaults); @@ -287,6 +292,18 @@ impl v1alpha1::SparkConnectServer { } } +impl Default for v1alpha1::SparkConnectServerRoleConfig { + fn default() -> Self { + v1alpha1::SparkConnectServerRoleConfig { + listener_class: default_listener_class(), + } + } +} + +pub fn default_listener_class() -> String { + "cluster-internal".to_string() +} + #[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Eq, Serialize)] #[serde(rename_all = "camelCase")] pub struct SparkConnectServerStatus { diff --git a/rust/operator-binary/src/connect/executor.rs b/rust/operator-binary/src/connect/executor.rs index 6b213c8d..7c3faf6f 100644 --- a/rust/operator-binary/src/connect/executor.rs +++ b/rust/operator-binary/src/connect/executor.rs @@ -23,7 +23,7 @@ use stackable_operator::{ use super::{ common::{SparkConnectRole, object_name}, - crd::{DUMMY_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer}, + crd::{DEFAULT_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer}, }; use crate::{ connect::{common, crd::v1alpha1}, @@ -354,7 +354,7 @@ pub(crate) fn executor_config_map( let role_group_ref = RoleGroupRef { cluster: ObjectRef::from_obj(scs), role: SparkConnectRole::Executor.to_string(), - role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(), + role_group: DEFAULT_SPARK_CONNECT_GROUP_NAME.to_string(), }; product_logging::extend_config_map( &role_group_ref, diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs index b4019b7a..18cb9d6c 100644 --- a/rust/operator-binary/src/connect/server.rs +++ b/rust/operator-binary/src/connect/server.rs @@ -40,7 +40,7 @@ use crate::{ connect::{ common::{self, SparkConnectRole, object_name}, crd::{ - CONNECT_GRPC_PORT, CONNECT_UI_PORT, DUMMY_SPARK_CONNECT_GROUP_NAME, + CONNECT_GRPC_PORT, CONNECT_UI_PORT, DEFAULT_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer, v1alpha1, }, }, @@ -148,6 +148,7 @@ pub(crate) fn server_config_map( let jvm_sec_props = common::security_properties( scs.spec .server + .config .as_ref() .and_then(|s| s.config_overrides.get(JVM_SECURITY_PROPERTIES_FILE)), ) @@ -158,6 +159,7 @@ pub(crate) fn server_config_map( let metrics_props = common::metrics_properties( scs.spec .server + .config .as_ref() .and_then(|s| s.config_overrides.get(METRICS_PROPERTIES_FILE)), ) @@ -187,7 +189,7 @@ pub(crate) fn server_config_map( .add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props) .add_data(METRICS_PROPERTIES_FILE, metrics_props); - let role_group_ref = dummy_role_group_ref(scs); + let role_group_ref = default_role_group_ref(scs); product_logging::extend_config_map( &role_group_ref, &config.logging, @@ -210,6 +212,7 @@ pub(crate) fn build_stateful_set( resolved_product_image: &ResolvedProductImage, service_account: &ServiceAccount, config_map: &ConfigMap, + listener_name: &str, args: Vec, ) -> Result { let server_role = 
SparkConnectRole::Server.to_string(); @@ -253,6 +256,7 @@ pub(crate) fn build_stateful_set( let container_env = env(scs .spec .server + .config .as_ref() .map(|s| s.env_overrides.clone()) .as_ref())?; @@ -334,7 +338,7 @@ pub(crate) fn build_stateful_set( // cluster-internal) as the address should still be consistent. let volume_claim_templates = Some(vec![ ListenerOperatorVolumeSourceBuilder::new( - &ListenerReference::ListenerName(dummy_role_group_ref(scs).object_name()), + &ListenerReference::ListenerName(listener_name.to_string()), &recommended_labels, ) .context(BuildListenerVolumeSnafu)? @@ -344,7 +348,13 @@ pub(crate) fn build_stateful_set( // Merge user defined pod template if available let mut pod_template = pb.build_template(); - if let Some(pod_overrides_spec) = scs.spec.server.as_ref().map(|s| s.pod_overrides.clone()) { + if let Some(pod_overrides_spec) = scs + .spec + .server + .config + .as_ref() + .map(|s| s.pod_overrides.clone()) + { pod_template.merge_from(pod_overrides_spec); } @@ -371,7 +381,7 @@ pub(crate) fn build_stateful_set( scs, CONNECT_APP_NAME, &SparkConnectRole::Server.to_string(), - DUMMY_SPARK_CONNECT_GROUP_NAME, + DEFAULT_SPARK_CONNECT_GROUP_NAME, ) .context(LabelBuildSnafu)? .into(), @@ -390,7 +400,11 @@ pub(crate) fn build_internal_service( scs: &v1alpha1::SparkConnectServer, app_version_label: &str, ) -> Result { - let service_name = object_name(&scs.name_any(), SparkConnectRole::Server); + let service_name = format!( + "{cluster}-{role}-headless", + cluster = scs.name_any(), + role = SparkConnectRole::Server + ); let selector = Labels::role_selector(scs, CONNECT_APP_NAME, &SparkConnectRole::Server.to_string()) @@ -507,6 +521,7 @@ pub(crate) fn server_properties( let config_overrides = scs .spec .server + .config .as_ref() .and_then(|s| s.config_overrides.get(SPARK_DEFAULTS_FILE_NAME)); @@ -581,6 +596,7 @@ fn server_jvm_args( &jvm_args, scs.spec .server + .config .as_ref() .map(|s| &s.product_specific_common_config), ) @@ -602,23 +618,28 @@ fn probe() -> Probe { } } -fn dummy_role_group_ref( +fn default_role_group_ref( scs: &v1alpha1::SparkConnectServer, ) -> RoleGroupRef { RoleGroupRef { cluster: ObjectRef::from_obj(scs), role: SparkConnectRole::Server.to_string(), - role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(), + role_group: DEFAULT_SPARK_CONNECT_GROUP_NAME.to_string(), } } pub(crate) fn build_listener( scs: &v1alpha1::SparkConnectServer, - config: &v1alpha1::ServerConfig, + role_config: &v1alpha1::SparkConnectServerRoleConfig, resolved_product_image: &ResolvedProductImage, ) -> Result { - let listener_name = dummy_role_group_ref(scs).object_name(); - let listener_class = config.listener_class.clone(); + let listener_name = format!( + "{cluster}-{role}", + cluster = scs.name_any(), + role = SparkConnectRole::Server + ); + + let listener_class = role_config.listener_class.clone(); let role = SparkConnectRole::Server.to_string(); let recommended_object_labels = common::labels(scs, &resolved_product_image.app_version_label, &role); diff --git a/rust/operator-binary/src/crd/history.rs b/rust/operator-binary/src/crd/history.rs index 87cdbcf1..881eeade 100644 --- a/rust/operator-binary/src/crd/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -32,7 +32,10 @@ use stackable_operator::{ use strum::{Display, EnumIter}; use crate::{ - crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir}, + crd::{ + affinity::history_affinity, constants::*, history::v1alpha1::SparkHistoryServerRoleConfig, + logdir::ResolvedLogDir, + }, 
history::config::jvm::construct_history_jvm_args, }; @@ -62,7 +65,6 @@ pub enum Error { #[versioned(version(name = "v1alpha1"))] pub mod versioned { - /// A Spark cluster history server component. This resource is managed by the Stackable operator /// for Apache Spark. Find more information on how to use it in the /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server). @@ -81,13 +83,6 @@ pub mod versioned { pub struct SparkHistoryServerSpec { pub image: ProductImage, - /// Global Spark history server configuration that applies to all roles. - /// - /// This was previously used to hold the listener configuration, which has since moved - /// to the role configuration. - #[serde(default)] - pub cluster_config: v1alpha1::SparkHistoryServerClusterConfig, - /// Name of the Vector aggregator discovery ConfigMap. /// It must contain the key `ADDRESS` with the address of the Vector aggregator. #[serde(skip_serializing_if = "Option::is_none")] @@ -101,17 +96,28 @@ pub mod versioned { pub spark_conf: BTreeMap, /// A history server node role definition. - pub nodes: Role, + pub nodes: Role, } - #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)] + // TODO: move generic version to op-rs? + #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] - pub struct SparkHistoryServerClusterConfig {} + pub struct SparkHistoryServerRoleConfig { + #[serde(flatten)] + pub common: GenericRoleConfig, + + /// This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) + /// is used to expose the history server. + #[serde(default = "default_listener_class")] + pub listener_class: String, + } } impl v1alpha1::SparkHistoryServer { /// Returns a reference to the role. Raises an error if the role is not defined. - pub fn role(&self) -> &Role { + pub fn role( + &self, + ) -> &Role { &self.spec.nodes } @@ -130,6 +136,11 @@ impl v1alpha1::SparkHistoryServer { .cloned() } + /// Return the listener class of the role config. + pub fn node_listener_class(&self) -> &str { + self.spec.nodes.role_config.listener_class.as_str() + } + pub fn merged_config( &self, rolegroup_ref: &RoleGroupRef, @@ -188,7 +199,7 @@ impl v1alpha1::SparkHistoryServer { String, ( Vec, - Role, + Role, ), > = vec![( HISTORY_ROLE_NAME.to_string(), @@ -340,9 +351,6 @@ pub struct HistoryConfig { /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. 
#[fragment_attrs(serde(default))] pub requested_secret_lifetime: Option, - - #[serde(default)] - pub listener_class: String, } impl HistoryConfig { @@ -366,7 +374,6 @@ impl HistoryConfig { logging: product_logging::spec::default_logging(), affinity: history_affinity(cluster_name), requested_secret_lifetime: Some(Self::DEFAULT_HISTORY_SECRET_LIFETIME), - listener_class: Some(default_listener_class()), } } } @@ -403,6 +410,15 @@ impl Configuration for HistoryConfigFragment { } } +impl Default for v1alpha1::SparkHistoryServerRoleConfig { + fn default() -> Self { + v1alpha1::SparkHistoryServerRoleConfig { + listener_class: default_listener_class(), + common: Default::default(), + } + } +} + fn default_listener_class() -> String { "cluster-internal".to_owned() } diff --git a/rust/operator-binary/src/history/config/jvm.rs b/rust/operator-binary/src/history/config/jvm.rs index 4022a3ad..8ffb0037 100644 --- a/rust/operator-binary/src/history/config/jvm.rs +++ b/rust/operator-binary/src/history/config/jvm.rs @@ -1,7 +1,5 @@ use snafu::{ResultExt, Snafu}; -use stackable_operator::role_utils::{ - self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role, -}; +use stackable_operator::role_utils::{self, JavaCommonConfig, JvmArgumentOverrides, Role}; use crate::crd::{ constants::{ @@ -9,7 +7,7 @@ use crate::crd::{ STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG, }, - history::HistoryConfigFragment, + history::{HistoryConfigFragment, v1alpha1::SparkHistoryServerRoleConfig}, logdir::ResolvedLogDir, }; @@ -21,7 +19,7 @@ pub enum Error { /// JVM arguments that go into `SPARK_HISTORY_OPTS` pub fn construct_history_jvm_args( - role: &Role, + role: &Role, role_group: &str, logdir: &ResolvedLogDir, ) -> Result { diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index 55b17c5d..ce2f1a98 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -328,22 +328,23 @@ pub async fn reconcile( .add(client, sts) .await .context(ApplyStatefulSetSnafu)?; - - let rg_group_listener = build_group_listener( - shs, - &resolved_product_image, - &rgr, - merged_config.listener_class.to_string(), - )?; - cluster_resources - .add(client, rg_group_listener) - .await - .context(ApplyGroupListenerSnafu)?; } + let rg_group_listener = build_group_listener( + shs, + &resolved_product_image, + role_name, + shs.node_listener_class().to_string(), + )?; + + cluster_resources + .add(client, rg_group_listener) + .await + .context(ApplyGroupListenerSnafu)?; + let role_config = &shs.spec.nodes.role_config; add_pdbs( - &role_config.pod_disruption_budget, + &role_config.common.pod_disruption_budget, shs, client, &mut cluster_resources, @@ -364,15 +365,12 @@ pub async fn reconcile( fn build_group_listener( shs: &v1alpha1::SparkHistoryServer, resolved_product_image: &ResolvedProductImage, - rolegroup: &RoleGroupRef, + role: &str, listener_class: String, ) -> Result { - let listener_name = rolegroup.object_name(); - let recommended_object_labels = labels( - shs, - &resolved_product_image.app_version_label, - &rolegroup.role_group, - ); + let listener_name = group_listener_name(shs, role); + + let recommended_object_labels = labels(shs, &resolved_product_image.app_version_label, "none"); let listener_ports = [listener::v1alpha1::ListenerPort { name: "http".to_string(), @@ -390,6 +388,10 @@ fn build_group_listener( 
.context(BuildListenerSnafu) } +fn group_listener_name(shs: &v1alpha1::SparkHistoryServer, role: &str) -> String { + format!("{cluster}-{role}", cluster = shs.name_any()) +} + pub fn error_policy( _obj: Arc>, error: &Error, @@ -595,7 +597,7 @@ fn build_stateful_set( // cluster-internal) as the address should still be consistent. let volume_claim_templates = Some(vec![ ListenerOperatorVolumeSourceBuilder::new( - &ListenerReference::ListenerName(rolegroupref.object_name()), + &ListenerReference::ListenerName(group_listener_name(shs, &rolegroupref.role)), &recommended_labels, ) .context(BuildListenerVolumeSnafu)? diff --git a/tests/templates/kuttl/custom-log-directory/20-test-logs.yaml b/tests/templates/kuttl/custom-log-directory/20-test-logs.yaml index f3432ccf..cb6003bd 100644 --- a/tests/templates/kuttl/custom-log-directory/20-test-logs.yaml +++ b/tests/templates/kuttl/custom-log-directory/20-test-logs.yaml @@ -16,5 +16,5 @@ spec: "bash", "-x", "-c", - "test 1 == $(curl http://spark-history-node-default:18080/api/v1/applications | jq length)", + "test 1 == $(curl http://spark-history-node:18080/api/v1/applications | jq length)", ] diff --git a/tests/templates/kuttl/overrides/20-test-logs.yaml b/tests/templates/kuttl/overrides/20-test-logs.yaml index 60087002..197dd5d6 100644 --- a/tests/templates/kuttl/overrides/20-test-logs.yaml +++ b/tests/templates/kuttl/overrides/20-test-logs.yaml @@ -16,7 +16,7 @@ spec: "bash", "-x", "-c", - "test 1 == $(curl http://spark-history-node-default:18080/api/v1/applications | jq length)", + "test 1 == $(curl http://spark-history-node:18080/api/v1/applications | jq length)", ] resources: limits: diff --git a/tests/templates/kuttl/smoke/60-test-logs.yaml b/tests/templates/kuttl/smoke/60-test-logs.yaml index 60087002..197dd5d6 100644 --- a/tests/templates/kuttl/smoke/60-test-logs.yaml +++ b/tests/templates/kuttl/smoke/60-test-logs.yaml @@ -16,7 +16,7 @@ spec: "bash", "-x", "-c", - "test 1 == $(curl http://spark-history-node-default:18080/api/v1/applications | jq length)", + "test 1 == $(curl http://spark-history-node:18080/api/v1/applications | jq length)", ] resources: limits: diff --git a/tests/templates/kuttl/spark-connect/10-assert.yaml b/tests/templates/kuttl/spark-connect/10-assert.yaml index 5387d982..460a17f9 100644 --- a/tests/templates/kuttl/spark-connect/10-assert.yaml +++ b/tests/templates/kuttl/spark-connect/10-assert.yaml @@ -10,6 +10,20 @@ metadata: status: readyReplicas: 1 --- +apiVersion: v1 +kind: Service +metadata: + name: spark-connect-server +spec: + type: NodePort +--- +apiVersion: v1 +kind: Service +metadata: + name: spark-connect-server-headless +spec: + type: ClusterIP +--- apiVersion: kuttl.dev/v1beta1 kind: TestAssert timeout: 300 diff --git a/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 index 4736d9be..da974901 100644 --- a/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 +++ b/tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2 @@ -62,8 +62,9 @@ spec: jvmArgumentOverrides: add: - -Dmy.custom.jvm.arg=customValue - config: + roleConfig: listenerClass: external-unstable + config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} containers: diff --git a/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 index 676b3a56..e6950b5a 100644 --- 
a/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 +++ b/tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2 @@ -67,7 +67,7 @@ spec: [ "/usr/bin/python", "/app/example.py", - "sc://spark-connect-server-default", + "sc://spark-connect-server", ] resources: limits: diff --git a/tests/templates/kuttl/spark-history-server/06-assert.yaml b/tests/templates/kuttl/spark-history-server/06-assert.yaml index a8bdbdc8..1381497c 100644 --- a/tests/templates/kuttl/spark-history-server/06-assert.yaml +++ b/tests/templates/kuttl/spark-history-server/06-assert.yaml @@ -10,6 +10,13 @@ metadata: status: readyReplicas: 1 --- +apiVersion: v1 +kind: Service +metadata: + name: spark-history-node +spec: + type: NodePort +--- apiVersion: policy/v1 kind: PodDisruptionBudget metadata: diff --git a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 index a440a164..0ff35de9 100644 --- a/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 +++ b/tests/templates/kuttl/spark-history-server/06-deploy-history-server.yaml.j2 @@ -44,8 +44,9 @@ spec: # For possible properties see: https://spark.apache.org/docs/latest/monitoring.html#spark-history-server-configuration-options #sparkConf: nodes: - config: + roleConfig: listenerClass: external-unstable + config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} containers: diff --git a/tests/templates/kuttl/spark-history-server/20-test-logs.yaml b/tests/templates/kuttl/spark-history-server/20-test-logs.yaml index 68c36402..074fefac 100644 --- a/tests/templates/kuttl/spark-history-server/20-test-logs.yaml +++ b/tests/templates/kuttl/spark-history-server/20-test-logs.yaml @@ -16,5 +16,5 @@ spec: "bash", "-x", "-c", - "test 2 == $(curl http://spark-history-node-default:18080/api/v1/applications | jq length)", + "test 2 == $(curl http://spark-history-node:18080/api/v1/applications | jq length)", ]
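For reference, a minimal sketch of where `listenerClass` lives after this change, based on the CRD defaults and the documentation updates above (cluster names and the `external-unstable` class are illustrative; unrelated required fields such as the image or log directory are omitted):

[source,yaml]
----
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
  name: spark-history
spec:
  nodes:
    roleConfig:
      # moved here from spec.nodes.config.listenerClass
      listenerClass: external-unstable
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  server:
    roleConfig:
      # moved here from spec.server.config.listenerClass
      listenerClass: external-unstable
----

With these settings the listener operator exposes the history server through a service named `spark-history-node` and the connect server through `spark-connect-server`, alongside the internal `spark-connect-server-headless` ClusterIP service asserted in the tests above.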