2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -39,6 +39,7 @@ All notable changes to this project will be documented in this file.

- Use `json` file extension for log files ([#553]).
- The Spark connect controller now watches StatefulSets instead of Deployments (again) ([#573]).
- BREAKING: Move `listenerClass` to `roleConfig` for Spark History Server and Spark Connect. Service names changed. ([#588]).

### Removed

@@ -62,6 +63,7 @@ All notable changes to this project will be documented in this file.
[#575]: https://github.com/stackabletech/spark-k8s-operator/pull/575
[#584]: https://github.com/stackabletech/spark-k8s-operator/pull/584
[#585]: https://github.com/stackabletech/spark-k8s-operator/pull/585
[#588]: https://github.com/stackabletech/spark-k8s-operator/pull/588

## [25.3.0] - 2025-03-21

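For clarity, the breaking change in [#588] moves `listenerClass` from the role(-group) `config` up to the role-level `roleConfig`. A minimal before/after sketch for a SparkHistoryServer (the `external-unstable` value is only illustrative):

```yaml
# Before: listenerClass was set in the role-group config
spec:
  nodes:
    config:
      listenerClass: external-unstable

# After: listenerClass is set in the role-level roleConfig
spec:
  nodes:
    roleConfig:
      listenerClass: external-unstable
```

The same move applies to Spark Connect, where the new location is `spec.server.roleConfig.listenerClass`.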
43 changes: 18 additions & 25 deletions deploy/helm/spark-k8s-operator/crds/crds.yaml
@@ -1117,13 +1117,6 @@ spec:
spec:
description: A Spark cluster history server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/history-server).
properties:
clusterConfig:
default: {}
description: |-
Global Spark history server configuration that applies to all roles.

This was previously used to hold the listener configuration, which has since moved to the role configuration.
type: object
image:
anyOf:
- required:
@@ -1379,9 +1372,6 @@ spec:
cleaner:
nullable: true
type: boolean
listenerClass:
nullable: true
type: string
logging:
default:
containers: {}
@@ -1556,11 +1546,16 @@ spec:
x-kubernetes-preserve-unknown-fields: true
roleConfig:
default:
listenerClass: cluster-internal
podDisruptionBudget:
enabled: true
maxUnavailable: null
description: This is a product-agnostic RoleConfig, which is sufficient for most of the products.
properties:
listenerClass:
default: cluster-internal
description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the history server.
type: string
podDisruptionBudget:
default:
enabled: true
@@ -1628,9 +1623,6 @@ spec:
cleaner:
nullable: true
type: boolean
listenerClass:
nullable: true
type: string
logging:
default:
containers: {}
@@ -1868,13 +1860,6 @@ spec:
items:
type: string
type: array
clusterConfig:
default: {}
description: |-
Global Spark Connect server configuration that applies to all roles.

This was previously used to hold the listener configuration, which has since moved to the server configuration.
type: object
clusterOperation:
default:
reconciliationPaused: false
@@ -2154,8 +2139,10 @@ spec:
type: string
type: object
server:
default:
roleConfig:
listenerClass: cluster-internal
description: A Spark Connect server definition.
nullable: true
properties:
cliOverrides:
additionalProperties:
@@ -2165,10 +2152,6 @@ spec:
config:
default: {}
properties:
listenerClass:
description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services.
nullable: true
type: string
logging:
default:
containers: {}
@@ -2341,6 +2324,16 @@ spec:
description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
type: object
x-kubernetes-preserve-unknown-fields: true
roleConfig:
default:
listenerClass: cluster-internal
description: Global role config settings for the Spark Connect Server.
properties:
listenerClass:
default: cluster-internal
description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark Connect services.
type: string
type: object
type: object
vectorAggregatorConfigMapName:
description: Name of the Vector aggregator discovery ConfigMap. It must contain the key `ADDRESS` with the address of the Vector aggregator.
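With the updated CRD schema, a SparkHistoryServer that omits `roleConfig` entirely gets the defaults declared above. A sketch of the effective defaulted value:

```yaml
# Effective defaults per the CRD schema above; maxUnavailable is left unset
roleConfig:
  listenerClass: cluster-internal
  podDisruptionBudget:
    enabled: true
    maxUnavailable: null
```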
29 changes: 22 additions & 7 deletions docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc
@@ -1,10 +1,10 @@
= Service exposition with listener classes
:description: Configure the Spark connect and history services exposure with listener classes: cluster-internal, external-unstable, or external-stable.

== History services
== Spark History services

The operator deploys a xref:listener-operator:listener.adoc[Listener] for each spark history pod.
The default is to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`:
The operator deploys a xref:listener-operator:listener.adoc[Listener] for each Spark History Server pod.
By default it is only accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.roleConfig.listenerClass`:

[source,yaml]
----
@@ -14,13 +14,28 @@ metadata:
name: spark-history
spec:
nodes:
config:
roleConfig:
listenerClass: external-unstable # <1>
----
<1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`).

For the example above, the listener operator creates a service named `spark-history-node-default` where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers) and `default` is the role group.
For the example above, the listener operator creates a service named `spark-history-node` where `spark-history` is the name of the SparkHistoryServer and `node` is the service role (the only service role available for history servers).

== Connect services
== Spark Connect services

Connect pods can be exposed using listener classes in exactly tha same fashion as history servers.
Connect pods can be exposed using listener classes in exactly the same fashion as History Servers (with the exception of the role name):

[source,yaml]
----
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
name: spark-connect
spec:
server:
roleConfig:
listenerClass: external-unstable # <1>
----
<1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`).

For the example above, the listener operator creates a service named `spark-connect-server` where `spark-connect` is the name of the SparkConnectServer and `server` is the service role.
4 changes: 2 additions & 2 deletions rust/operator-binary/src/connect/common.rs
@@ -12,7 +12,7 @@ use super::crd::CONNECT_EXECUTOR_ROLE_NAME;
use crate::{
connect::crd::{
CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, CONNECT_SERVER_ROLE_NAME,
DUMMY_SPARK_CONNECT_GROUP_NAME,
DEFAULT_SPARK_CONNECT_GROUP_NAME,
},
crd::constants::OPERATOR_NAME,
};
@@ -53,7 +53,7 @@ pub(crate) fn labels<'a, T>(
operator_name: OPERATOR_NAME,
controller_name: CONNECT_CONTROLLER_NAME,
role,
role_group: DUMMY_SPARK_CONNECT_GROUP_NAME,
role_group: DEFAULT_SPARK_CONNECT_GROUP_NAME,
}
}

26 changes: 14 additions & 12 deletions rust/operator-binary/src/connect/controller.rs
@@ -163,6 +163,7 @@ pub async fn reconcile(
.context(InvalidSparkConnectServerSnafu)?;

let server_config = scs.server_config().context(ServerConfigSnafu)?;
let server_role_config = &scs.spec.server.role_config;
let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?;

let client = &ctx.client;
@@ -204,7 +205,7 @@
let service = server::build_internal_service(scs, &resolved_product_image.app_version_label)
.context(BuildServiceSnafu)?;

cluster_resources
let applied_internal_service = cluster_resources
.add(client, service.clone())
.await
.context(ApplyServiceSnafu)?;
@@ -216,7 +217,7 @@
server::server_properties(
scs,
&server_config,
&service,
&applied_internal_service,
&service_account,
&resolved_product_image,
)
@@ -271,6 +272,16 @@
name: scs.name_unchecked(),
})?;

// ========================================
// Server listener
let listener = server::build_listener(scs, server_role_config, &resolved_product_image)
.context(BuildListenerSnafu)?;

let applied_listener = cluster_resources
.add(client, listener)
.await
.context(ApplyListenerSnafu)?;

// ========================================
// Server stateful set
let args = server::command_args(&scs.spec.args);
@@ -280,20 +291,11 @@
&resolved_product_image,
&service_account,
&server_config_map,
&applied_listener.name_any(),
args,
)
.context(BuildServerStatefulSetSnafu)?;

// ========================================
// Server listener
let listener = server::build_listener(scs, &server_config, &resolved_product_image)
.context(BuildListenerSnafu)?;

cluster_resources
.add(client, listener)
.await
.context(ApplyListenerSnafu)?;

let mut ss_cond_builder = StatefulSetConditionBuilder::default();

ss_cond_builder.add(
53 changes: 35 additions & 18 deletions rust/operator-binary/src/connect/crd.rs
@@ -45,7 +45,7 @@ pub const CONNECT_EXECUTOR_ROLE_NAME: &str = "executor";
pub const CONNECT_GRPC_PORT: i32 = 15002;
pub const CONNECT_UI_PORT: i32 = 4040;

pub const DUMMY_SPARK_CONNECT_GROUP_NAME: &str = "default";
pub const DEFAULT_SPARK_CONNECT_GROUP_NAME: &str = "default";

pub const CONNECT_APP_NAME: &str = "spark-connect";

@@ -79,13 +79,6 @@ pub mod versioned {
pub struct SparkConnectServerSpec {
pub image: ProductImage,

/// Global Spark Connect server configuration that applies to all roles.
///
/// This was previously used to hold the listener configuration, which has since moved
/// to the server configuration.
#[serde(default)]
pub cluster_config: v1alpha1::SparkConnectServerClusterConfig,

// no doc string - See ClusterOperation struct
#[serde(default)]
pub cluster_operation: ClusterOperation,
@@ -100,17 +93,34 @@
pub vector_aggregator_config_map_name: Option<String>,

/// A Spark Connect server definition.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub server: Option<CommonConfiguration<ServerConfigFragment, JavaCommonConfig>>,
#[serde(default)]
pub server: SparkConnectServerConfigWrapper,

/// Spark Connect executor properties.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub executor: Option<CommonConfiguration<ExecutorConfigFragment, JavaCommonConfig>>,
}

#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
/// This struct is a wrapper for the `ServerConfig` in order to keep the `spec.server.roleConfig` setting consistent.
/// It is required since Spark Connect does not utilize the Stackable `Role` and therefore does not offer a `roleConfig`.
#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SparkConnectServerConfigWrapper {
#[serde(flatten)]
pub config: Option<CommonConfiguration<ServerConfigFragment, JavaCommonConfig>>,
#[serde(default)]
pub role_config: SparkConnectServerRoleConfig,
}

/// Global role config settings for the Spark Connect Server.
#[derive(Clone, Debug, JsonSchema, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SparkConnectServerClusterConfig {}
pub struct SparkConnectServerRoleConfig {
/// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html)
/// is used to expose the Spark Connect services.
#[serde(default = "default_listener_class")]
pub listener_class: String,
}

#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
#[fragment_attrs(
@@ -137,10 +147,6 @@
/// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
#[fragment_attrs(serde(default))]
pub requested_secret_lifetime: Option<Duration>,

/// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services.
#[serde(default)]
pub listener_class: String,
}

#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
@@ -229,7 +235,6 @@ impl v1alpha1::ServerConfig {
},
logging: product_logging::spec::default_logging(),
requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME),
listener_class: Some("cluster-internal".into()),
}
}

@@ -259,7 +264,7 @@ impl v1alpha1::SparkConnectServer {
pub fn server_config(&self) -> Result<v1alpha1::ServerConfig, Error> {
let defaults = v1alpha1::ServerConfig::default_config();
fragment::validate(
match self.spec.server.as_ref().map(|cc| cc.config.clone()) {
match self.spec.server.config.as_ref().map(|cc| cc.config.clone()) {
Some(fragment) => {
let mut fc = fragment.clone();
fc.merge(&defaults);
@@ -287,6 +292,18 @@
}
}

impl Default for v1alpha1::SparkConnectServerRoleConfig {
fn default() -> Self {
v1alpha1::SparkConnectServerRoleConfig {
listener_class: default_listener_class(),
}
}
}

pub fn default_listener_class() -> String {
"cluster-internal".to_string()
}

#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SparkConnectServerStatus {
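Since `SparkConnectServerConfigWrapper` flattens the optional `CommonConfiguration`, the usual configuration keys remain directly under `spec.server`, with `roleConfig` as a sibling key. A hedged sketch of how a manifest maps onto the new structs (values are illustrative):

```yaml
spec:
  server:
    roleConfig:               # SparkConnectServerRoleConfig; defaulted when omitted
      listenerClass: cluster-internal
    config: {}                # flattened CommonConfiguration fields stay at this level
    podOverrides: {}
    cliOverrides: {}
```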
4 changes: 2 additions & 2 deletions rust/operator-binary/src/connect/executor.rs
@@ -23,7 +23,7 @@ use stackable_operator::{

use super::{
common::{SparkConnectRole, object_name},
crd::{DUMMY_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer},
crd::{DEFAULT_SPARK_CONNECT_GROUP_NAME, SparkConnectContainer},
};
use crate::{
connect::{common, crd::v1alpha1},
@@ -354,7 +354,7 @@ pub(crate) fn executor_config_map(
let role_group_ref = RoleGroupRef {
cluster: ObjectRef::from_obj(scs),
role: SparkConnectRole::Executor.to_string(),
role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(),
role_group: DEFAULT_SPARK_CONNECT_GROUP_NAME.to_string(),
};
product_logging::extend_config_map(
&role_group_ref,