
Commit 696843a

adwk67 and sbernauer authored
fix: Pass-through envOverrides (#451)
* fixed envOverrides and extended test * renamed test as it now covers env- and pod-overrides * WIP history server tests * working test for history server * changelog * Update CHANGELOG.md Co-authored-by: Sebastian Bernauer <[email protected]> * Update rust/crd/src/lib.rs Co-authored-by: Sebastian Bernauer <[email protected]> * Update rust/operator-binary/src/history/history_controller.rs Co-authored-by: Sebastian Bernauer <[email protected]> * move merge-env logic to history.rs and improve unit test * Update rust/crd/src/history.rs Co-authored-by: Sebastian Bernauer <[email protected]> * renamed test * used named tuple for readability --------- Co-authored-by: Sebastian Bernauer <[email protected]>
1 parent 4e2006c commit 696843a

35 files changed (+262 -58 lines)

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -12,7 +12,12 @@ All notable changes to this project will be documented in this file.
 - `volumes`
 - `volumeMounts`
 
+### Fixed
+
+- Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]).
+
 [#450]: https://github.com/stackabletech/spark-k8s-operator/pull/450
+[#451]: https://github.com/stackabletech/spark-k8s-operator/pull/451
 
 ## [24.7.0] - 2024-07-24

rust/crd/src/constants.rs

Lines changed: 1 addition & 0 deletions
@@ -77,3 +77,4 @@ pub const SPARK_DEFAULTS_FILE_NAME: &str = "spark-defaults.conf";
 
 pub const SPARK_CLUSTER_ROLE: &str = "spark-k8s-clusterrole";
 pub const SPARK_UID: i64 = 1000;
+pub const METRICS_PORT: u16 = 18081;

rust/crd/src/history.rs

Lines changed: 164 additions & 0 deletions
@@ -1,8 +1,11 @@
+use crate::s3logdir::S3LogDir;
+use crate::tlscerts;
 use crate::{affinity::history_affinity, constants::*};
 
 use product_config::{types::PropertyNameKind, ProductConfigManager};
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
+use stackable_operator::k8s_openapi::api::core::v1::EnvVar;
 use stackable_operator::role_utils::RoleGroup;
 use stackable_operator::{
     commons::{
@@ -232,6 +235,96 @@ impl SparkHistoryServer {
         )
         .context(InvalidProductConfigSnafu)
     }
+
+    pub fn merged_env(
+        &self,
+        s3logdir: &S3LogDir,
+        role_group_env_overrides: HashMap<String, String>,
+    ) -> Vec<EnvVar> {
+        // Maps env var name to env var object. This allows env_overrides to work
+        // as expected (i.e. users can override the env var value).
+        let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();
+        let role_env_overrides = &self.role().config.env_overrides;
+
+        // This env var prevents the history server from detaching itself from the
+        // start script because this leads to the Pod terminating immediately.
+        vars.insert(
+            "SPARK_NO_DAEMONIZE".to_string(),
+            EnvVar {
+                name: "SPARK_NO_DAEMONIZE".to_string(),
+                value: Some("true".into()),
+                value_from: None,
+            },
+        );
+        vars.insert(
+            "SPARK_DAEMON_CLASSPATH".to_string(),
+            EnvVar {
+                name: "SPARK_DAEMON_CLASSPATH".to_string(),
+                value: Some("/stackable/spark/extra-jars/*".into()),
+                value_from: None,
+            },
+        );
+
+        let mut history_opts = vec![
+            format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
+            format!(
+                "-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
+            ),
+            format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
+        ];
+
+        // if TLS is enabled build truststore
+        if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
+            history_opts.extend(vec![
+                format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
+                format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
+                format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
+            ]);
+        }
+
+        vars.insert(
+            "SPARK_HISTORY_OPTS".to_string(),
+            EnvVar {
+                name: "SPARK_HISTORY_OPTS".to_string(),
+                value: Some(history_opts.join(" ")),
+                value_from: None,
+            },
+        );
+
+        // apply the role overrides
+        let mut role_envs = role_env_overrides.iter().map(|(env_name, env_value)| {
+            (
+                env_name.clone(),
+                EnvVar {
+                    name: env_name.clone(),
+                    value: Some(env_value.to_owned()),
+                    value_from: None,
+                },
+            )
+        });
+
+        vars.extend(&mut role_envs);
+
+        // apply the role-group overrides
+        let mut role_group_envs =
+            role_group_env_overrides
+                .into_iter()
+                .map(|(env_name, env_value)| {
+                    (
+                        env_name.clone(),
+                        EnvVar {
+                            name: env_name.clone(),
+                            value: Some(env_value),
+                            value_from: None,
+                        },
+                    )
+                });
+
+        vars.extend(&mut role_group_envs);
+
+        // convert to Vec
+        vars.into_values().collect()
+    }
 }
 
 #[derive(Clone, Debug, Deserialize, JsonSchema, Serialize, Display)]
@@ -363,3 +456,74 @@ impl Configuration for HistoryConfigFragment {
         Ok(BTreeMap::new())
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use indoc::indoc;
+    use stackable_operator::commons::s3::InlinedS3BucketSpec;
+
+    #[test]
+    pub fn test_env_overrides() {
+        let input = indoc! {r#"
+        ---
+        apiVersion: spark.stackable.tech/v1alpha1
+        kind: SparkHistoryServer
+        metadata:
+          name: spark-history
+        spec:
+          image:
+            productVersion: 3.5.1
+          logFileDirectory:
+            s3:
+              prefix: eventlogs/
+              bucket:
+                reference: spark-history-s3-bucket
+          nodes:
+            envOverrides:
+              TEST_SPARK_HIST_VAR: ROLE
+            roleGroups:
+              default:
+                replicas: 1
+                config:
+                  cleaner: true
+                envOverrides:
+                  TEST_SPARK_HIST_VAR: ROLEGROUP
+        "#};
+
+        let deserializer = serde_yaml::Deserializer::from_str(input);
+        let history: SparkHistoryServer =
+            serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
+
+        let s3_log_dir: S3LogDir = S3LogDir {
+            bucket: InlinedS3BucketSpec {
+                bucket_name: None,
+                connection: None,
+            },
+            prefix: "prefix".to_string(),
+        };
+
+        let merged_env = history.merged_env(
+            &s3_log_dir,
+            history
+                .spec
+                .nodes
+                .role_groups
+                .get("default")
+                .unwrap()
+                .config
+                .env_overrides
+                .clone(),
+        );
+
+        let env_map: BTreeMap<&str, Option<String>> = merged_env
+            .iter()
+            .map(|env_var| (env_var.name.as_str(), env_var.value.clone()))
+            .collect();
+
+        assert_eq!(
+            Some(&Some("ROLEGROUP".to_string())),
+            env_map.get("TEST_SPARK_HIST_VAR")
+        );
+    }
+}
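
The merge logic above relies on map-replacement semantics: inserting a key that already exists overwrites the previous value, so operator defaults lose to role-level `envOverrides`, which in turn lose to role-group-level `envOverrides`. A minimal stand-alone sketch of that precedence, using a plain `BTreeMap<String, String>` instead of the operator's `EnvVar` type (names and values here are illustrative only):

```rust
use std::collections::BTreeMap;

fn main() {
    // Operator-provided defaults come first.
    let mut vars: BTreeMap<String, String> =
        BTreeMap::from([("SPARK_NO_DAEMONIZE".to_string(), "true".to_string())]);

    // Role-level envOverrides are merged next ...
    let role_overrides = BTreeMap::from([("TEST_VAR".to_string(), "ROLE".to_string())]);
    vars.extend(role_overrides);

    // ... and role-group-level envOverrides are merged last, so they win
    // whenever the same variable name is set on both levels.
    let role_group_overrides =
        BTreeMap::from([("TEST_VAR".to_string(), "ROLEGROUP".to_string())]);
    vars.extend(role_group_overrides);

    assert_eq!(vars.get("TEST_VAR"), Some(&"ROLEGROUP".to_string()));
    assert_eq!(vars.get("SPARK_NO_DAEMONIZE"), Some(&"true".to_string()));
}
```

This mirrors what `test_env_overrides` asserts: the role-group value `ROLEGROUP` shadows the role value `ROLE`.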

rust/crd/src/lib.rs

Lines changed: 31 additions & 0 deletions
@@ -727,6 +727,37 @@ impl SparkApplication {
         }
     }
 
+    pub fn merged_env(&self, role: SparkApplicationRole, env: &[EnvVar]) -> Vec<EnvVar> {
+        // Use a BTreeMap internally to enable replacement of existing keys
+        let mut env: BTreeMap<&String, EnvVar> = env
+            .iter()
+            .map(|env_var| (&env_var.name, env_var.clone()))
+            .collect();
+
+        // Merge the role-specific envOverrides on top
+        let role_envs = match role {
+            SparkApplicationRole::Submit => self.spec.job.as_ref().map(|j| &j.env_overrides),
+            SparkApplicationRole::Driver => self.spec.driver.as_ref().map(|d| &d.env_overrides),
+            SparkApplicationRole::Executor => {
+                self.spec.executor.as_ref().map(|e| &e.config.env_overrides)
+            }
+        };
+        if let Some(role_envs) = role_envs {
+            env.extend(role_envs.iter().map(|(k, v)| {
+                (
+                    k,
+                    EnvVar {
+                        name: k.clone(),
+                        value: Some(v.clone()),
+                        ..Default::default()
+                    },
+                )
+            }))
+        }
+
+        env.into_values().collect()
+    }
+
     pub fn validated_role_config(
         &self,
         resolved_product_image: &ResolvedProductImage,
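
For `SparkApplication`, the controller is expected to call `merged_env` with the environment variables it has already assembled and the role whose `envOverrides` should be layered on top. A hedged sketch of such a call site follows; the helper function, the `SPARK_CONF_DIR` entry, and the exact import paths are assumptions made for illustration, and only the `merged_env` signature comes from this commit:

```rust
use stackable_operator::k8s_openapi::api::core::v1::EnvVar;
// Assumed re-exports; SparkApplication and SparkApplicationRole are defined in the CRD crate.
use stackable_spark_k8s_crd::{SparkApplication, SparkApplicationRole};

// Hypothetical helper: build the final driver environment for a SparkApplication.
fn driver_env(spark_application: &SparkApplication) -> Vec<EnvVar> {
    // Env vars assembled elsewhere by the controller (the value is illustrative).
    let env = vec![EnvVar {
        name: "SPARK_CONF_DIR".to_string(),
        value: Some("/stackable/spark/conf".to_string()),
        ..Default::default()
    }];

    // envOverrides declared under spec.driver replace entries in `env` with the
    // same name; all other entries are passed through unchanged.
    spark_application.merged_env(SparkApplicationRole::Driver, &env)
}
```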

rust/crd/src/s3logdir.rs

Lines changed: 2 additions & 1 deletion
@@ -117,7 +117,8 @@ impl S3LogDir {
     /// * spark.hadoop.fs.s3a.aws.credentials.provider
     /// * spark.hadoop.fs.s3a.access.key
     /// * spark.hadoop.fs.s3a.secret.key
-    /// instead, the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set
+    ///
+    /// Instead, the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set
     /// on the container start command.
     pub fn history_server_spark_config(&self) -> BTreeMap<String, String> {
         let mut result = BTreeMap::new();

rust/operator-binary/src/history/history_controller.rs

Lines changed: 14 additions & 54 deletions
@@ -15,8 +15,7 @@ use stackable_operator::{
     api::{
         apps::v1::{StatefulSet, StatefulSetSpec},
         core::v1::{
-            ConfigMap, EnvVar, PodSecurityContext, Service, ServiceAccount, ServicePort,
-            ServiceSpec,
+            ConfigMap, PodSecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec,
         },
         rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject},
     },
@@ -36,14 +35,15 @@ use stackable_operator::{
     role_utils::RoleGroupRef,
     time::Duration,
 };
+use stackable_spark_k8s_crd::constants::METRICS_PORT;
 use stackable_spark_k8s_crd::{
     constants::{
         ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME,
-        JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, OPERATOR_NAME,
-        SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_IMAGE_BASE_NAME,
-        SPARK_UID, STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG,
-        VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG,
-        VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG,
+        JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE, OPERATOR_NAME, SECRET_ACCESS_KEY,
+        SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID,
+        STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG,
+        VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG,
+        VOLUME_MOUNT_PATH_LOG_CONFIG,
     },
     history,
     history::{HistoryConfig, SparkHistoryServer, SparkHistoryServerContainer},
@@ -59,8 +59,6 @@ use stackable_operator::k8s_openapi::DeepMerge;
 use stackable_operator::logging::controller::ReconcilerError;
 use strum::{EnumDiscriminants, IntoStaticStr};
 
-const METRICS_PORT: u16 = 18081;
-
 #[derive(Snafu, Debug, EnumDiscriminants)]
 #[strum_discriminants(derive(IntoStaticStr))]
 #[allow(clippy::enum_variant_names)]
@@ -440,6 +438,12 @@ fn build_stateful_set(
         ..PodSecurityContext::default()
     });
 
+    let role_group = shs
+        .rolegroup(rolegroupref)
+        .with_context(|_| CannotRetrieveRoleGroupSnafu)?;
+
+    let merged_env = shs.merged_env(s3_log_dir, role_group.config.env_overrides);
+
     let container_name = "spark-history";
     let container = ContainerBuilder::new(container_name)
         .context(InvalidContainerNameSnafu)?
@@ -449,7 +453,7 @@
         .args(command_args(s3_log_dir))
         .add_container_port("http", 18080)
         .add_container_port("metrics", METRICS_PORT.into())
-        .add_env_vars(env_vars(s3_log_dir))
+        .add_env_vars(merged_env)
         .add_volume_mounts(s3_log_dir.volume_mounts())
         .add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG)
         .add_volume_mount(VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG)
@@ -477,10 +481,6 @@
 
     let mut pod_template = pb.build_template();
     pod_template.merge_from(shs.role().config.pod_overrides.clone());
-    let role_group = shs
-        .rolegroup(rolegroupref)
-        .with_context(|_| CannotRetrieveRoleGroupSnafu)?;
-
     pod_template.merge_from(role_group.config.pod_overrides);
 
     Ok(StatefulSet {
@@ -670,46 +670,6 @@ fn command_args(s3logdir: &S3LogDir) -> Vec<String> {
     vec![String::from("-c"), command.join(" && ")]
 }
 
-fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
-    let mut vars: Vec<EnvVar> = vec![];
-
-    // This env var prevents the history server from detaching itself from the
-    // start script because this leads to the Pod terminating immediately.
-    vars.push(EnvVar {
-        name: "SPARK_NO_DAEMONIZE".to_string(),
-        value: Some("true".into()),
-        value_from: None,
-    });
-    vars.push(EnvVar {
-        name: "SPARK_DAEMON_CLASSPATH".to_string(),
-        value: Some("/stackable/spark/extra-jars/*".into()),
-        value_from: None,
-    });
-
-    let mut history_opts = vec![
-        format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
-        format!(
-            "-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
-        ),
-        format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
-    ];
-    if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
-        history_opts.extend(vec![
-            format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
-            format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
-            format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
-        ]);
-    }
-
-    vars.push(EnvVar {
-        name: "SPARK_HISTORY_OPTS".to_string(),
-        value: Some(history_opts.join(" ")),
-        value_from: None,
-    });
-    // if TLS is enabled build truststore
-    vars
-}
-
 fn labels<'a, T>(
     shs: &'a T,
     app_version_label: &'a str,

rust/operator-binary/src/spark_k8s_controller.rs

Lines changed: 4 additions & 2 deletions
@@ -479,9 +479,10 @@
 ) -> Result<PodTemplateSpec> {
     let container_name = SparkContainer::Spark.to_string();
     let mut cb = ContainerBuilder::new(&container_name).context(IllegalContainerNameSnafu)?;
+    let merged_env = spark_application.merged_env(role.clone(), env);
 
     cb.add_volume_mounts(config.volume_mounts(spark_application, s3conn, s3logdir))
-        .add_env_vars(env.to_vec())
+        .add_env_vars(merged_env)
         .resources(config.resources.clone().into())
         .image_from_product_image(spark_image);
 
@@ -716,13 +717,14 @@ fn spark_job(
         .context(IllegalContainerNameSnafu)?;
 
     let args = [job_commands.join(" ")];
+    let merged_env = spark_application.merged_env(SparkApplicationRole::Submit, env);
 
     cb.image_from_product_image(spark_image)
         .command(vec!["/bin/bash".to_string(), "-c".to_string()])
         .args(vec![args.join(" && ")])
         .resources(job_config.resources.clone().into())
        .add_volume_mounts(spark_application.spark_job_volume_mounts(s3conn, s3logdir))
-        .add_env_vars(env.to_vec())
+        .add_env_vars(merged_env)
         .add_env_var(
             "SPARK_SUBMIT_OPTS",
             format!(
