diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0512060a..b1749441 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file.
   config property `requestedSecretLifetime`. This helps reducing frequent Pod restarts ([#501]).
 - Run a `containerdebug` process in the background of each Spark container to collect debugging information ([#508]).
 - Aggregate emitted Kubernetes events on the CustomResources ([#515]).
+- Support configuring JVM arguments ([#532]).

 ### Changed

@@ -19,6 +20,7 @@ All notable changes to this project will be documented in this file.
 [#508]: https://github.com/stackabletech/spark-k8s-operator/pull/508
 [#514]: https://github.com/stackabletech/spark-k8s-operator/pull/514
 [#515]: https://github.com/stackabletech/spark-k8s-operator/pull/515
+[#532]: https://github.com/stackabletech/spark-k8s-operator/pull/532

 ## [24.11.1] - 2025-01-10

diff --git a/Cargo.lock b/Cargo.lock
index 4e3afcbb..dc7bddf4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4

 [[package]]
 name = "addr2line"
@@ -2126,15 +2126,14 @@
 checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"

 [[package]]
 name = "ring"
-version = "0.17.8"
+version = "0.17.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
+checksum = "70ac5d832aa16abd7d1def883a8545280c20a60f523a370aa3a9617c2b8550ee"
 dependencies = [
  "cc",
  "cfg-if",
  "getrandom",
  "libc",
- "spin",
  "untrusted",
  "windows-sys 0.52.0",
 ]
@@ -2532,12 +2531,6 @@ dependencies = [
  "windows-sys 0.52.0",
 ]

-[[package]]
-name = "spin"
-version = "0.9.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
-
 [[package]]
 name = "stable_deref_trait"
 version = "1.2.0"

diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml
index eac4ad07..f6672b00 100644
--- a/deploy/helm/spark-k8s-operator/crds/crds.yaml
+++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml
@@ -256,6 +256,32 @@ spec:
               default: {}
               description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
               type: object
+              jvmArgumentOverrides:
+                default:
+                  add: []
+                  remove: []
+                  removeRegex: []
+                description: Allows overriding JVM arguments. Please read the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on its usage.
+                properties:
+                  add:
+                    default: []
+                    description: JVM arguments to be added
+                    items:
+                      type: string
+                    type: array
+                  remove:
+                    default: []
+                    description: JVM arguments to be removed by exact match
+                    items:
+                      type: string
+                    type: array
+                  removeRegex:
+                    default: []
+                    description: JVM arguments matching any of these regexes will be removed
+                    items:
+                      type: string
+                    type: array
+                type: object
               podOverrides:
                 default: {}
                 description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
@@ -530,6 +556,32 @@ spec:
               default: {}
               description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
               type: object
+              jvmArgumentOverrides:
+                default:
+                  add: []
+                  remove: []
+                  removeRegex: []
+                description: Allows overriding JVM arguments. Please read the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on its usage.
+                properties:
+                  add:
+                    default: []
+                    description: JVM arguments to be added
+                    items:
+                      type: string
+                    type: array
+                  remove:
+                    default: []
+                    description: JVM arguments to be removed by exact match
+                    items:
+                      type: string
+                    type: array
+                  removeRegex:
+                    default: []
+                    description: JVM arguments matching any of these regexes will be removed
+                    items:
+                      type: string
+                    type: array
+                type: object
               podOverrides:
                 default: {}
                 description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
@@ -620,6 +672,32 @@ spec:
               default: {}
               description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
               type: object
+              jvmArgumentOverrides:
+                default:
+                  add: []
+                  remove: []
+                  removeRegex: []
+                description: Allows overriding JVM arguments. Please read the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on its usage.
+                properties:
+                  add:
+                    default: []
+                    description: JVM arguments to be added
+                    items:
+                      type: string
+                    type: array
+                  remove:
+                    default: []
+                    description: JVM arguments to be removed by exact match
+                    items:
+                      type: string
+                    type: array
+                  removeRegex:
+                    default: []
+                    description: JVM arguments matching any of these regexes will be removed
+                    items:
+                      type: string
+                    type: array
+                type: object
               podOverrides:
                 default: {}
                 description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
@@ -1416,6 +1494,32 @@ spec:
               default: {}
               description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
               type: object
+              jvmArgumentOverrides:
+                default:
+                  add: []
+                  remove: []
+                  removeRegex: []
+                description: Allows overriding JVM arguments. Please read the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on its usage.
+                properties:
+                  add:
+                    default: []
+                    description: JVM arguments to be added
+                    items:
+                      type: string
+                    type: array
+                  remove:
+                    default: []
+                    description: JVM arguments to be removed by exact match
+                    items:
+                      type: string
+                    type: array
+                  removeRegex:
+                    default: []
+                    description: JVM arguments matching any of these regexes will be removed
+                    items:
+                      type: string
+                    type: array
+                type: object
               podOverrides:
                 default: {}
                 description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
@@ -1636,6 +1740,32 @@ spec:
               default: {}
               description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
               type: object
+              jvmArgumentOverrides:
+                default:
+                  add: []
+                  remove: []
+                  removeRegex: []
+                description: Allows overriding JVM arguments. Please read the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on its usage.
+                properties:
+                  add:
+                    default: []
+                    description: JVM arguments to be added
+                    items:
+                      type: string
+                    type: array
+                  remove:
+                    default: []
+                    description: JVM arguments to be removed by exact match
+                    items:
+                      type: string
+                    type: array
+                  removeRegex:
+                    default: []
+                    description: JVM arguments matching any of these regexes will be removed
+                    items:
+                      type: string
+                    type: array
+                type: object
               podOverrides:
                 default: {}
                 description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.

diff --git a/docs/modules/spark-k8s/pages/usage-guide/configuration-environment-overrides.adoc b/docs/modules/spark-k8s/pages/usage-guide/overrides.adoc
similarity index 88%
rename from docs/modules/spark-k8s/pages/usage-guide/configuration-environment-overrides.adoc
rename to docs/modules/spark-k8s/pages/usage-guide/overrides.adoc
index a0ee8eb6..e4f5077a 100644
--- a/docs/modules/spark-k8s/pages/usage-guide/configuration-environment-overrides.adoc
+++ b/docs/modules/spark-k8s/pages/usage-guide/overrides.adoc
@@ -148,3 +148,18 @@ spec:

 The Spark operator also supports Pod overrides, allowing you to override any property that you can set on a Kubernetes Pod.
 Read the xref:concepts:overrides.adoc#pod-overrides[Pod overrides documentation] to learn more about this feature.
+
+== JVM argument overrides
+
+Stackable operators automatically determine the set of needed JVM arguments, such as memory settings or trust- and keystores.
+Using JVM argument overrides, you can adjust these JVM arguments as described in the xref:concepts:overrides.adoc#jvm-argument-overrides[concepts documentation].
+
+=== Spark application
+
+WARNING: JVM arguments can currently only be configured on the drivers and executors, not on the spark-submit call.
+
+The JVM arguments are passed via `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions`.
+
+=== History server
+
+JVM arguments can be configured as usual and are passed via the environment variable `SPARK_HISTORY_OPTS`.
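For illustration, a SparkApplication using the new field could look as follows. This is a sketch mirroring the unit tests added in this PR; the proxy host value is a placeholder:

```yaml
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: spark-example
spec:
  mode: cluster
  mainApplicationFile: test.py
  sparkImage:
    productVersion: 1.2.3
  driver:
    jvmArgumentOverrides:
      add:
        - -Dhttps.proxyHost=proxy.my.corp # placeholder value
  executor:
    jvmArgumentOverrides:
      add:
        - -Dhttps.proxyHost=proxy.my.corp # placeholder value
      removeRegex:
        - -Djava.security.properties=.* # drops an operator-generated argument
```

For the history server, overrides can be set on the role and on individual role groups, with the role group taking precedence (again mirroring the tests in this PR):

```yaml
nodes:
  jvmArgumentOverrides:
    add:
      - -Dhttps.proxyHost=proxy.my.corp
      - -Dhttps.proxyPort=8080
  roleGroups:
    default:
      replicas: 1
      jvmArgumentOverrides:
        removeRegex:
          - -Dhttps.proxyPort=.* # remove the role-level value...
        add:
          - -Dhttps.proxyPort=1234 # ...and replace it
```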
diff --git a/docs/modules/spark-k8s/partials/nav.adoc b/docs/modules/spark-k8s/partials/nav.adoc
index 6dcceef5..db281b43 100644
--- a/docs/modules/spark-k8s/partials/nav.adoc
+++ b/docs/modules/spark-k8s/partials/nav.adoc
@@ -10,7 +10,7 @@
 ** xref:spark-k8s:usage-guide/logging.adoc[]
 ** xref:spark-k8s:usage-guide/history-server.adoc[]
 ** xref:spark-k8s:usage-guide/examples.adoc[]
-** xref:spark-k8s:usage-guide/configuration-environment-overrides.adoc[]
+** xref:spark-k8s:usage-guide/overrides.adoc[]
 ** xref:spark-k8s:usage-guide/operations/index.adoc[]
 *** xref:spark-k8s:usage-guide/operations/applications.adoc[]
 *** xref:spark-k8s:usage-guide/operations/pod-placement.adoc[]

diff --git a/rust/operator-binary/src/config/jvm.rs b/rust/operator-binary/src/config/jvm.rs
new file mode 100644
index 00000000..2885efe2
--- /dev/null
+++ b/rust/operator-binary/src/config/jvm.rs
@@ -0,0 +1,151 @@
+use snafu::{ResultExt, Snafu};
+use stackable_operator::{
+    commons::s3::S3ConnectionSpec,
+    role_utils::{self, JvmArgumentOverrides},
+};
+
+use crate::crd::{
+    constants::{
+        JVM_SECURITY_PROPERTIES_FILE, STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE,
+        VOLUME_MOUNT_PATH_LOG_CONFIG,
+    },
+    logdir::ResolvedLogDir,
+    tlscerts::tls_secret_names,
+    v1alpha1::SparkApplication,
+};
+
+#[derive(Snafu, Debug)]
+pub enum Error {
+    #[snafu(display("failed to merge jvm argument overrides"))]
+    MergeJvmArgumentOverrides { source: role_utils::Error },
+}
+
+/// JVM arguments that go into
+/// 1. `spark.driver.extraJavaOptions`
+/// 2. `spark.executor.extraJavaOptions`
+pub fn construct_extra_java_options(
+    spark_application: &SparkApplication,
+    s3_conn: &Option<S3ConnectionSpec>,
+    log_dir: &Option<ResolvedLogDir>,
+) -> Result<(String, String), Error> {
+    // Note (@sbernauer): As of 2025-03-04, we did not set any heap related JVM arguments, so I
+    // kept the implementation as is. We can always re-visit this as needed.
+
+    let mut jvm_args = vec![format!(
+        "-Djava.security.properties={VOLUME_MOUNT_PATH_LOG_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
+    )];
+
+    if tls_secret_names(s3_conn, log_dir).is_some() {
+        jvm_args.extend([
+            format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
+            format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
+            "-Djavax.net.ssl.trustStoreType=pkcs12".to_string(),
+        ]);
+    }
+
+    let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args);
+    let from_driver = match &spark_application.spec.driver {
+        Some(driver) => &driver.product_specific_common_config.jvm_argument_overrides,
+        None => &JvmArgumentOverrides::default(),
+    };
+    let from_executor = match &spark_application.spec.executor {
+        Some(executor) => {
+            &executor
+                .config
+                .product_specific_common_config
+                .jvm_argument_overrides
+        }
+        None => &JvmArgumentOverrides::default(),
+    };
+
+    // Please note that the merge order is different from what we normally do!
+    // This is not trivial, as the merge operation is not purely additive (as it is with e.g. `PodTemplateSpec`).
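+    // Illustration (see the unit tests below; the values are made up): the operator generates
+    // `-Djava.security.properties=...`. Because the operator-generated arguments are merged
+    // *into* the user-supplied overrides, a user `removeRegex` of `-Djava.security.properties=.*`
+    // can drop that generated argument, and user `add` entries are appended after the generated
+    // ones, so the user always has the last word.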
+ let mut from_driver = from_driver.clone(); + let mut from_executor = from_executor.clone(); + from_driver + .try_merge(&operator_generated) + .context(MergeJvmArgumentOverridesSnafu)?; + from_executor + .try_merge(&operator_generated) + .context(MergeJvmArgumentOverridesSnafu)?; + + Ok(( + from_driver.effective_jvm_config_after_merging().join(" "), + from_executor.effective_jvm_config_after_merging().join(" "), + )) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_construct_jvm_arguments_defaults() { + let input = r#" + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: spark-example + spec: + mode: cluster + mainApplicationFile: test.py + sparkImage: + productVersion: 1.2.3 + "#; + + let deserializer = serde_yaml::Deserializer::from_str(input); + let spark_app: SparkApplication = + serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap(); + let (driver_extra_java_options, executor_extra_java_options) = + construct_extra_java_options(&spark_app, &None, &None).unwrap(); + + assert_eq!( + driver_extra_java_options, + "-Djava.security.properties=/stackable/log_config/security.properties" + ); + assert_eq!( + executor_extra_java_options, + "-Djava.security.properties=/stackable/log_config/security.properties" + ); + } + + #[test] + fn test_construct_jvm_argument_overrides() { + let input = r#" + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: spark-example + spec: + mode: cluster + mainApplicationFile: test.py + sparkImage: + productVersion: 1.2.3 + driver: + jvmArgumentOverrides: + add: + - -Dhttps.proxyHost=from-driver + executor: + jvmArgumentOverrides: + add: + - -Dhttps.proxyHost=from-executor + removeRegex: + - -Djava.security.properties=.* + "#; + + let deserializer = serde_yaml::Deserializer::from_str(input); + let spark_app: SparkApplication = + serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap(); + let (driver_extra_java_options, executor_extra_java_options) = + construct_extra_java_options(&spark_app, &None, &None).unwrap(); + + assert_eq!( + driver_extra_java_options, + "-Djava.security.properties=/stackable/log_config/security.properties -Dhttps.proxyHost=from-driver" + ); + assert_eq!( + executor_extra_java_options, + "-Dhttps.proxyHost=from-executor" + ); + } +} diff --git a/rust/operator-binary/src/config/mod.rs b/rust/operator-binary/src/config/mod.rs new file mode 100644 index 00000000..271c6d99 --- /dev/null +++ b/rust/operator-binary/src/config/mod.rs @@ -0,0 +1 @@ +pub mod jvm; diff --git a/rust/operator-binary/src/crd/history.rs b/rust/operator-binary/src/crd/history.rs index 0bb1cb6a..a2fd0dde 100644 --- a/rust/operator-binary/src/crd/history.rs +++ b/rust/operator-binary/src/crd/history.rs @@ -24,14 +24,17 @@ use stackable_operator::{ ValidatedRoleConfigByPropertyKind, }, product_logging::{self, spec::Logging}, - role_utils::{GenericProductSpecificCommonConfig, Role, RoleGroup, RoleGroupRef}, + role_utils::{GenericRoleConfig, JavaCommonConfig, Role, RoleGroup, RoleGroupRef}, schemars::{self, JsonSchema}, time::Duration, }; use stackable_versioned::versioned; use strum::{Display, EnumIter}; -use crate::crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir}; +use crate::{ + crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir}, + history::config::jvm::construct_history_jvm_args, +}; #[derive(Snafu, Debug)] pub enum Error { @@ -39,14 +42,22 @@ pub enum Error { ProductConfigTransform { source: 
stackable_operator::product_config_utils::Error,
     },
+
+    #[snafu(display("invalid product config"))]
     InvalidProductConfig {
         source: stackable_operator::product_config_utils::Error,
     },
+
+    #[snafu(display("fragment validation failure"))]
     FragmentValidationFailure { source: ValidationError },
+
+    #[snafu(display("the role group {role_group} is not defined"))]
     CannotRetrieveRoleGroup { role_group: String },
+
+    #[snafu(display("failed to construct JVM arguments"))]
+    ConstructJvmArguments {
+        source: crate::history::config::jvm::Error,
+    },
 }

 #[versioned(version(name = "v1alpha1"))]
@@ -86,7 +97,7 @@ pub mod versioned {
         pub spark_conf: BTreeMap<String, String>,

         /// A history server node role definition.
-        pub nodes: Role<HistoryConfigFragment>,
+        pub nodes: Role<HistoryConfigFragment, GenericRoleConfig, JavaCommonConfig>,
     }

     #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
@@ -133,7 +144,7 @@ impl CurrentlySupportedListenerClasses {

 impl v1alpha1::SparkHistoryServer {
     /// Returns a reference to the role. Raises an error if the role is not defined.
-    pub fn role(&self) -> &Role<HistoryConfigFragment> {
+    pub fn role(&self) -> &Role<HistoryConfigFragment, GenericRoleConfig, JavaCommonConfig> {
         &self.spec.nodes
     }

@@ -141,7 +152,7 @@ impl v1alpha1::SparkHistoryServer {
     pub fn rolegroup(
         &self,
         rolegroup_ref: &RoleGroupRef<v1alpha1::SparkHistoryServer>,
-    ) -> Result<RoleGroup<HistoryConfigFragment>, Error> {
+    ) -> Result<RoleGroup<HistoryConfigFragment, JavaCommonConfig>, Error> {
         self.spec
             .nodes
             .role_groups
@@ -205,9 +216,13 @@ impl v1alpha1::SparkHistoryServer {
         resolved_product_image: &ResolvedProductImage,
         product_config: &ProductConfigManager,
     ) -> Result<ValidatedRoleConfigByPropertyKind, Error> {
+        #[allow(clippy::type_complexity)]
         let roles_to_validate: HashMap<
             String,
-            (Vec<PropertyNameKind>, Role<HistoryConfigFragment>),
+            (
+                Vec<PropertyNameKind>,
+                Role<HistoryConfigFragment, GenericRoleConfig, JavaCommonConfig>,
+            ),
         > = vec![(
             HISTORY_ROLE_NAME.to_string(),
             (
@@ -236,102 +251,42 @@ impl v1alpha1::SparkHistoryServer {

     pub fn merged_env(
         &self,
+        role_group: &str,
         logdir: &ResolvedLogDir,
         role_group_env_overrides: HashMap<String, String>,
-    ) -> Vec<EnvVar> {
-        // Maps env var name to env var object. This allows env_overrides to work
-        // as expected (i.e. users can override the env var value).
-        let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();
-        let role_env_overrides = &self.role().config.env_overrides;
-
-        // Needed by the `containerdebug` running in the background of the history container
-        // to log it's tracing information to.
-        vars.insert(
-            "CONTAINERDEBUG_LOG_DIRECTORY".to_string(),
-            EnvVar {
-                name: "CONTAINERDEBUG_LOG_DIRECTORY".to_string(),
-                value: Some(format!("{VOLUME_MOUNT_PATH_LOG}/containerdebug")),
-                value_from: None,
-            },
-        );
-        // This env var prevents the history server from detaching itself from the
-        // start script because this leads to the Pod terminating immediately.
-        vars.insert(
-            "SPARK_NO_DAEMONIZE".to_string(),
-            EnvVar {
-                name: "SPARK_NO_DAEMONIZE".to_string(),
-                value: Some("true".into()),
-                value_from: None,
-            },
-        );
-        vars.insert(
-            "SPARK_DAEMON_CLASSPATH".to_string(),
-            EnvVar {
-                name: "SPARK_DAEMON_CLASSPATH".to_string(),
-                value: Some("/stackable/spark/extra-jars/*".into()),
-                value_from: None,
-            },
-        );
-
-        let mut history_opts = vec![
-            format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
-            format!(
-                "-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
-            ),
-            format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
-        ];
-
-        // if TLS is enabled build truststore
-        if logdir.tls_enabled() {
-            history_opts.extend(vec![
-                format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
-                format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
-                format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
-            ]);
-        }
-
-        vars.insert(
-            "SPARK_HISTORY_OPTS".to_string(),
-            EnvVar {
-                name: "SPARK_HISTORY_OPTS".to_string(),
-                value: Some(history_opts.join(" ")),
-                value_from: None,
-            },
-        );
-
-        // apply the role overrides
-        let mut role_envs = role_env_overrides.iter().map(|(env_name, env_value)| {
+    ) -> Result<Vec<EnvVar>, Error> {
+        let role = self.role();
+        let history_jvm_args = construct_history_jvm_args(role, role_group, logdir)
+            .context(ConstructJvmArgumentsSnafu)?;
+        let mut envs = BTreeMap::from([
+            // Needed by the `containerdebug` running in the background of the history container
+            // to log its tracing information to.
             (
-                env_name.clone(),
-                EnvVar {
-                    name: env_name.clone(),
-                    value: Some(env_value.to_owned()),
-                    value_from: None,
-                },
-            )
-        });
-
-        vars.extend(&mut role_envs);
-
-        // apply the role-group overrides
-        let mut role_group_envs =
-            role_group_env_overrides
-                .into_iter()
-                .map(|(env_name, env_value)| {
-                    (
-                        env_name.clone(),
-                        EnvVar {
-                            name: env_name.clone(),
-                            value: Some(env_value),
-                            value_from: None,
-                        },
-                    )
-                });
-
-        vars.extend(&mut role_group_envs);
-
-        // convert to Vec
-        vars.into_values().collect()
+                "CONTAINERDEBUG_LOG_DIRECTORY".to_string(),
+                format!("{VOLUME_MOUNT_PATH_LOG}/containerdebug"),
+            ),
+            // This env var prevents the history server from detaching itself from the
+            // start script because this leads to the Pod terminating immediately.
+            ("SPARK_NO_DAEMONIZE".to_string(), "true".to_string()),
+            (
+                "SPARK_DAEMON_CLASSPATH".to_string(),
+                "/stackable/spark/extra-jars/*".to_string(),
+            ),
+            // JVM arguments for the history server
+            ("SPARK_HISTORY_OPTS".to_string(), history_jvm_args),
+        ]);
+
+        envs.extend(role.config.env_overrides.clone());
+        envs.extend(role_group_env_overrides);
+
+        Ok(envs
+            .into_iter()
+            .map(|(name, value)| EnvVar {
+                name: name.to_owned(),
+                value: Some(value.to_owned()),
+                value_from: None,
+            })
+            .collect())
     }
 }

@@ -534,18 +489,21 @@ mod test {
             prefix: "prefix".to_string(),
         });

-        let merged_env = history.merged_env(
-            &log_dir,
-            history
-                .spec
-                .nodes
-                .role_groups
-                .get("default")
-                .unwrap()
-                .config
-                .env_overrides
-                .clone(),
-        );
+        let merged_env = history
+            .merged_env(
+                "default",
+                &log_dir,
+                history
+                    .spec
+                    .nodes
+                    .role_groups
+                    .get("default")
+                    .unwrap()
+                    .config
+                    .env_overrides
+                    .clone(),
+            )
+            .unwrap();

         let env_map: BTreeMap<&str, Option<String>> = merged_env
             .iter()

diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs
index ec7a2eb7..1ba79aa5 100644
--- a/rust/operator-binary/src/crd/mod.rs
+++ b/rust/operator-binary/src/crd/mod.rs
@@ -37,18 +37,19 @@ use stackable_operator::{
         ValidatedRoleConfigByPropertyKind,
     },
     product_logging,
-    role_utils::{
-        CommonConfiguration, GenericProductSpecificCommonConfig, GenericRoleConfig, Role, RoleGroup,
-    },
+    role_utils::{CommonConfiguration, GenericRoleConfig, JavaCommonConfig, Role, RoleGroup},
     schemars::{self, JsonSchema},
     time::Duration,
     utils::crds::raw_object_list_schema,
 };
 use stackable_versioned::versioned;

-use crate::crd::roles::{
-    RoleConfig, RoleConfigFragment, SparkApplicationRole, SparkContainer, SparkMode, SubmitConfig,
-    SubmitConfigFragment, VolumeMounts,
+use crate::{
+    config::jvm::construct_extra_java_options,
+    crd::roles::{
+        RoleConfig, RoleConfigFragment, SparkApplicationRole, SparkContainer, SparkMode,
+        SubmitConfig, SubmitConfigFragment, VolumeMounts,
+    },
 };

 pub mod affinity;
@@ -112,6 +113,9 @@ pub enum Error {

     #[snafu(display("failed to configure log directory"))]
     ConfigureLogDir { source: logdir::Error },
+
+    #[snafu(display("failed to construct JVM arguments"))]
+    ConstructJvmArguments { source: crate::config::jvm::Error },
 }

 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize, JsonSchema)]
@@ -173,20 +177,23 @@ pub struct SparkApplicationSpec {
     /// such as templates, and passes it on to Spark.
     /// The reason this property uses its own type (SubmitConfigFragment) is because logging is not
     /// supported for spark-submit processes.
+    //
+    // IMPORTANT: Please note that the jvmArgumentOverrides have no effect here!
+    // However, due to product-config things I wasn't able to remove them.
     #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub job: Option<CommonConfiguration<SubmitConfigFragment>>,
+    pub job: Option<CommonConfiguration<SubmitConfigFragment, JavaCommonConfig>>,

     /// The driver role specifies the configuration that, together with the driver pod template, is used by
     /// Spark to create driver pods.
     #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub driver: Option<CommonConfiguration<RoleConfigFragment>>,
+    pub driver: Option<CommonConfiguration<RoleConfigFragment, JavaCommonConfig>>,

     /// The executor role specifies the configuration that, together with the driver pod template, is used by
     /// Spark to create the executor pods.
     /// This is RoleGroup instead of plain CommonConfiguration because it needs to allow for the number of replicas
     /// to be specified.
     #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub executor: Option<RoleGroup<RoleConfigFragment>>,
+    pub executor: Option<RoleGroup<RoleConfigFragment, JavaCommonConfig>>,

     /// A map of key/value strings that will be passed directly to spark-submit.
     #[serde(default)]
@@ -511,7 +518,7 @@ impl v1alpha1::SparkApplication {
         &'a self,
         app_version: &'a str,
         role: &'a str,
-    ) -> ObjectLabels<v1alpha1::SparkApplication> {
+    ) -> ObjectLabels<'a, v1alpha1::SparkApplication> {
         ObjectLabels {
             owner: self,
             app_name: APP_NAME,
@@ -582,23 +589,12 @@ impl v1alpha1::SparkApplication {
             }
         }

-        // Extra JVM opts:
-        // - java security properties
-        // - s3 with TLS
-        let mut extra_java_opts = vec![format!(
-            "-Djava.security.properties={VOLUME_MOUNT_PATH_LOG_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
-        )];
-        if tlscerts::tls_secret_names(s3conn, log_dir).is_some() {
-            extra_java_opts.extend(vec![
-                format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
-                format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
-                format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
-            ]);
-        }
-        let str_extra_java_opts = extra_java_opts.join(" ");
+        let (driver_extra_java_options, executor_extra_java_options) =
+            construct_extra_java_options(self, s3conn, log_dir)
+                .context(ConstructJvmArgumentsSnafu)?;
         submit_cmd.extend(vec![
-            format!("--conf spark.driver.extraJavaOptions=\"{str_extra_java_opts}\""),
-            format!("--conf spark.executor.extraJavaOptions=\"{str_extra_java_opts}\""),
+            format!("--conf spark.driver.extraJavaOptions=\"{driver_extra_java_options}\""),
+            format!("--conf spark.executor.extraJavaOptions=\"{executor_extra_java_options}\""),
         ]);

         // repositories and packages arguments
@@ -829,7 +825,7 @@ impl v1alpha1::SparkApplication {
             }
         };

-        let executor_conf: RoleGroup<RoleConfigFragment> =
+        let executor_conf: RoleGroup<RoleConfigFragment, JavaCommonConfig> =
             if self.spec.executor.is_some() {
                 self.spec.executor.as_ref().unwrap().clone()
             } else {

diff --git a/rust/operator-binary/src/history/config/jvm.rs b/rust/operator-binary/src/history/config/jvm.rs
new file mode 100644
index 00000000..eb7504ed
--- /dev/null
+++ b/rust/operator-binary/src/history/config/jvm.rs
@@ -0,0 +1,148 @@
+use snafu::{ResultExt, Snafu};
+use stackable_operator::role_utils::{
+    self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role,
+};
+
+use crate::crd::{
+    constants::{
+        JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, METRICS_PORT,
+        STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE, VOLUME_MOUNT_PATH_CONFIG,
+        VOLUME_MOUNT_PATH_LOG_CONFIG,
+    },
+    history::HistoryConfigFragment,
+    logdir::ResolvedLogDir,
+};
+
+#[derive(Snafu, Debug)]
+pub enum Error {
+    #[snafu(display("failed to merge jvm argument overrides"))]
+    MergeJvmArgumentOverrides { source: role_utils::Error },
+}
+
+/// JVM arguments that go into `SPARK_HISTORY_OPTS`
+pub fn construct_history_jvm_args(
+    role: &Role<HistoryConfigFragment, GenericRoleConfig, JavaCommonConfig>,
+    role_group: &str,
+    logdir: &ResolvedLogDir,
+) -> Result<String, Error> {
+    // Note (@sbernauer): As of 2025-03-04, we did not set any heap related JVM arguments, so I
+    // kept the implementation as is. We can always re-visit this as needed.
+ + let mut jvm_args = vec![ + format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"), + format!( + "-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}" + ), + format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml") + ]; + + if logdir.tls_enabled() { + jvm_args.extend([ + format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"), + format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"), + "-Djavax.net.ssl.trustStoreType=pkcs12".to_owned(), + ]); + } + + let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args); + let merged = role + .get_merged_jvm_argument_overrides(role_group, &operator_generated) + .context(MergeJvmArgumentOverridesSnafu)?; + Ok(merged.effective_jvm_config_after_merging().join(" ")) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::crd::history::v1alpha1::SparkHistoryServer; + + #[test] + fn test_construct_jvm_arguments_defaults() { + let input = r#" + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkHistoryServer + metadata: + name: spark-history + spec: + image: + productVersion: 3.5.2 + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + reference: spark-history-s3-bucket + nodes: + roleGroups: + default: + replicas: 1 + config: + cleaner: true + "#; + + let jvm_config = construct_jvm_config_for_test(input); + + assert_eq!( + jvm_config, + "-Dlog4j.configurationFile=/stackable/log_config/log4j2.properties \ + -Djava.security.properties=/stackable/spark/conf/security.properties \ + -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=18081:/stackable/jmx/config.yaml" + ); + } + + #[test] + fn test_construct_jvm_argument_overrides() { + let input = r#" + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkHistoryServer + metadata: + name: spark-history + spec: + image: + productVersion: 3.5.2 + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + reference: spark-history-s3-bucket + nodes: + jvmArgumentOverrides: + add: + - -Dhttps.proxyHost=proxy.my.corp + - -Dhttps.proxyPort=8080 + - -Djava.net.preferIPv4Stack=true + roleGroups: + default: + replicas: 1 + jvmArgumentOverrides: + removeRegex: + - -Dhttps.proxyPort=.* + add: + - -Dhttps.proxyPort=1234 + config: + cleaner: true + "#; + + let jvm_config = construct_jvm_config_for_test(input); + + assert_eq!( + jvm_config, + "-Dlog4j.configurationFile=/stackable/log_config/log4j2.properties \ + -Djava.security.properties=/stackable/spark/conf/security.properties \ + -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=18081:/stackable/jmx/config.yaml \ + -Dhttps.proxyHost=proxy.my.corp \ + -Djava.net.preferIPv4Stack=true \ + -Dhttps.proxyPort=1234" + ); + } + + fn construct_jvm_config_for_test(history_server: &str) -> String { + let deserializer = serde_yaml::Deserializer::from_str(history_server); + let history_server: SparkHistoryServer = + serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap(); + + let role = history_server.role(); + let resolved_log_dir = ResolvedLogDir::Custom("local:/tmp/foo".to_owned()); + + construct_history_jvm_args(role, "default", &resolved_log_dir).unwrap() + } +} diff --git a/rust/operator-binary/src/history/config/mod.rs b/rust/operator-binary/src/history/config/mod.rs new file mode 100644 index 00000000..271c6d99 --- /dev/null +++ b/rust/operator-binary/src/history/config/mod.rs @@ -0,0 +1 @@ +pub mod jvm; diff --git 
a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs
index 96535e2f..5a8acb7c 100644
--- a/rust/operator-binary/src/history/history_controller.rs
+++ b/rust/operator-binary/src/history/history_controller.rs
@@ -202,6 +202,9 @@ pub enum Error {
     InvalidSparkHistoryServer {
         source: error_boundary::InvalidObject,
     },
+
+    #[snafu(display("failed to merge environment config and/or overrides"))]
+    MergeEnv { source: crate::crd::history::Error },
 }

 type Result<T, E = Error> = std::result::Result<T, E>;
@@ -517,7 +520,13 @@ fn build_stateful_set(
         .rolegroup(rolegroupref)
         .with_context(|_| CannotRetrieveRoleGroupSnafu)?;

-    let merged_env = shs.merged_env(log_dir, role_group.config.env_overrides);
+    let merged_env = shs
+        .merged_env(
+            &rolegroupref.role_group,
+            log_dir,
+            role_group.config.env_overrides,
+        )
+        .context(MergeEnvSnafu)?;

     let container_name = "spark-history";
     let container = ContainerBuilder::new(container_name)

diff --git a/rust/operator-binary/src/history/mod.rs b/rust/operator-binary/src/history/mod.rs
index eca545c7..c374e5b3 100644
--- a/rust/operator-binary/src/history/mod.rs
+++ b/rust/operator-binary/src/history/mod.rs
@@ -1,2 +1,3 @@
+pub mod config;
 pub mod history_controller;
 pub mod operations;

diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs
index a3dc11a3..599ef84e 100644
--- a/rust/operator-binary/src/main.rs
+++ b/rust/operator-binary/src/main.rs
@@ -33,6 +33,7 @@ use crate::crd::{
     SparkApplication,
 };

+mod config;
 mod crd;
 mod history;
 mod pod_driver_controller;
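Taken together, for a history server without any overrides, the rendered container environment would look roughly like the sketch below. This is assembled from the default-case unit test and `merged_env` above (entries are sorted alphabetically because a `BTreeMap` is used; `CONTAINERDEBUG_LOG_DIRECTORY` is omitted here since its exact path constant is not shown in this diff):

```yaml
env:
  - name: SPARK_DAEMON_CLASSPATH
    value: /stackable/spark/extra-jars/*
  - name: SPARK_HISTORY_OPTS
    value: >-
      -Dlog4j.configurationFile=/stackable/log_config/log4j2.properties
      -Djava.security.properties=/stackable/spark/conf/security.properties
      -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=18081:/stackable/jmx/config.yaml
  - name: SPARK_NO_DAEMONIZE
    value: "true"
```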