5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -12,7 +12,12 @@ All notable changes to this project will be documented in this file.
- `volumes`
- `volumeMounts`

### Fixed

- Fix envOverrides for spark cluster and history server ([#451]).

[#450]: https://github.com/stackabletech/spark-k8s-operator/pull/450
[#451]: https://github.com/stackabletech/spark-k8s-operator/pull/451

## [24.7.0] - 2024-07-24

61 changes: 61 additions & 0 deletions rust/crd/src/history.rs
@@ -363,3 +363,64 @@ impl Configuration for HistoryConfigFragment {
Ok(BTreeMap::new())
}
}

#[cfg(test)]
mod test {
use super::*;
use indoc::indoc;

#[test]
pub fn test_env() {
let input = indoc! {r#"
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
name: spark-history
spec:
image:
productVersion: 3.5.1
logFileDirectory:
s3:
prefix: eventlogs/
bucket:
reference: spark-history-s3-bucket
nodes:
envOverrides:
TEST_SPARK_HIST_VAR: ROLE
roleGroups:
default:
replicas: 1
config:
cleaner: true
envOverrides:
TEST_SPARK_HIST_VAR: ROLEGROUP
"#};

let deserializer = serde_yaml::Deserializer::from_str(input);
let history: SparkHistoryServer =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();

assert_eq!(
Some(&"ROLE".to_string()),
history
.spec
.nodes
.config
.env_overrides
.get("TEST_SPARK_HIST_VAR")
);
assert_eq!(
Some(&"ROLEGROUP".to_string()),
history
.spec
.nodes
.role_groups
.get("default")
.unwrap()
.config
.env_overrides
.get("TEST_SPARK_HIST_VAR")
);
}
}
30 changes: 30 additions & 0 deletions rust/crd/src/lib.rs
@@ -727,6 +727,36 @@ impl SparkApplication {
}
}

pub fn merged_env(&self, role: SparkApplicationRole, env: &[EnvVar]) -> Vec<EnvVar> {
// use a BTree internally to enable replacement of existing keys
let mut env_vars: BTreeMap<String, EnvVar> = BTreeMap::new();

for e in env {
env_vars.insert(e.clone().name, e.to_owned());
}

if let Some(env_map) = match role {
SparkApplicationRole::Submit => self.spec.job.clone().map(|j| j.env_overrides),
SparkApplicationRole::Driver => self.spec.driver.clone().map(|d| d.env_overrides),
SparkApplicationRole::Executor => {
self.spec.executor.clone().map(|r| r.config.env_overrides)
}
} {
for (k, v) in env_map {
env_vars.insert(
k.clone(),
EnvVar {
name: k,
value: Some(v),
value_from: None,
},
);
}
}

env_vars.into_values().collect()
}

pub fn validated_role_config(
&self,
resolved_product_image: &ResolvedProductImage,
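The new `merged_env` keys the given pod-template `env` entries by name and then lets the role's `envOverrides` replace entries with the same name, so user overrides win over values already present in the template. A minimal standalone sketch of that merge semantics, with `EnvVar` reduced to plain name/value strings and the variable names borrowed from the kuttl test below (this is an illustration, not the operator's actual types):

```rust
use std::collections::BTreeMap;

// Simplified model of `merged_env`: base env entries are keyed by name,
// then entries from `envOverrides` replace any entry with the same name.
fn merge_env(
    base: &[(&str, &str)],
    overrides: &BTreeMap<String, String>,
) -> BTreeMap<String, String> {
    let mut merged: BTreeMap<String, String> = base
        .iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect();
    for (name, value) in overrides {
        // Last writer wins: the override replaces the base value.
        merged.insert(name.clone(), value.clone());
    }
    merged
}

fn main() {
    let base = [
        ("TEST_SPARK_VAR_0", "ORIGINAL"),
        ("TEST_SPARK_VAR_1", "DONOTREPLACE"),
    ];
    let overrides = BTreeMap::from([("TEST_SPARK_VAR_0".to_string(), "REPLACED".to_string())]);

    let merged = merge_env(&base, &overrides);
    assert_eq!(merged["TEST_SPARK_VAR_0"], "REPLACED");
    assert_eq!(merged["TEST_SPARK_VAR_1"], "DONOTREPLACE");
}
```

In the operator the same merge is done per role (`submit`, `driver`, `executor`), with the overrides read from the corresponding section of the `SparkApplication` spec.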
3 changes: 2 additions & 1 deletion rust/crd/src/s3logdir.rs
@@ -117,7 +117,8 @@ impl S3LogDir {
/// * spark.hadoop.fs.s3a.aws.credentials.provider
/// * spark.hadoop.fs.s3a.access.key
/// * spark.hadoop.fs.s3a.secret.key
/// instead, the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set
///
/// Instead, the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set
/// on the container start command.
pub fn history_server_spark_config(&self) -> BTreeMap<String, String> {
let mut result = BTreeMap::new();
102 changes: 82 additions & 20 deletions rust/operator-binary/src/history/history_controller.rs
@@ -440,6 +440,16 @@ fn build_stateful_set(
..PodSecurityContext::default()
});

let role_group = shs
.rolegroup(rolegroupref)
.with_context(|_| CannotRetrieveRoleGroupSnafu)?;

let merged_env_vars = env_vars(
s3_log_dir,
shs.role().config.clone().env_overrides,
role_group.config.env_overrides,
);

let container_name = "spark-history";
let container = ContainerBuilder::new(container_name)
.context(InvalidContainerNameSnafu)?
@@ -449,7 +459,7 @@
.args(command_args(s3_log_dir))
.add_container_port("http", 18080)
.add_container_port("metrics", METRICS_PORT.into())
.add_env_vars(env_vars(s3_log_dir))
.add_env_vars(merged_env_vars)
.add_volume_mounts(s3_log_dir.volume_mounts())
.add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG)
.add_volume_mount(VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG)
@@ -670,21 +680,33 @@ fn command_args(s3logdir: &S3LogDir) -> Vec<String> {
vec![String::from("-c"), command.join(" && ")]
}

fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
let mut vars: Vec<EnvVar> = vec![];
fn env_vars(
s3logdir: &S3LogDir,
role_env_overrides: HashMap<String, String>,
role_group_env_overrides: HashMap<String, String>,
) -> Vec<EnvVar> {
// Maps env var name to env var object. This allows env_overrides to work
// as expected (i.e. users can override the env var value).
let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();

// This env var prevents the history server from detaching itself from the
// start script because this leads to the Pod terminating immediately.
vars.push(EnvVar {
name: "SPARK_NO_DAEMONIZE".to_string(),
value: Some("true".into()),
value_from: None,
});
vars.push(EnvVar {
name: "SPARK_DAEMON_CLASSPATH".to_string(),
value: Some("/stackable/spark/extra-jars/*".into()),
value_from: None,
});
vars.insert(
"SPARK_NO_DAEMONIZE".to_string(),
EnvVar {
name: "SPARK_NO_DAEMONIZE".to_string(),
value: Some("true".into()),
value_from: None,
},
);
vars.insert(
"SPARK_DAEMON_CLASSPATH".to_string(),
EnvVar {
name: "SPARK_DAEMON_CLASSPATH".to_string(),
value: Some("/stackable/spark/extra-jars/*".into()),
value_from: None,
},
);

let mut history_opts = vec![
format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
@@ -693,6 +715,8 @@ fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
),
format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
];

// if TLS is enabled build truststore
if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
history_opts.extend(vec![
format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
Expand All @@ -701,13 +725,51 @@ fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
]);
}

vars.push(EnvVar {
name: "SPARK_HISTORY_OPTS".to_string(),
value: Some(history_opts.join(" ")),
value_from: None,
});
// if TLS is enabled build truststore
vars
vars.insert(
"SPARK_HISTORY_OPTS".to_string(),
EnvVar {
name: "SPARK_HISTORY_OPTS".to_string(),
value: Some(history_opts.join(" ")),
value_from: None,
},
);

// apply the role overrides
let mut role_envs = role_env_overrides
.into_iter()
.map(|env_var| {
(
env_var.0.clone(),
EnvVar {
name: env_var.0.clone(),
value: Some(env_var.1),
value_from: None,
},
)
})
.collect();

vars.append(&mut role_envs);

// apply the role-group overrides
let mut role_group_envs = role_group_env_overrides
.into_iter()
.map(|env_var| {
(
env_var.0.clone(),
EnvVar {
name: env_var.0.clone(),
value: Some(env_var.1),
value_from: None,
},
)
})
.collect();

vars.append(&mut role_group_envs);

// convert to Vec
vars.into_values().collect()
}

fn labels<'a, T>(
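For the history server, `env_vars` now builds the variable map in three layers: operator defaults first, then the role-level `envOverrides`, then the role-group-level `envOverrides`. It relies on `BTreeMap::append`, which overwrites keys that already exist, so the role group wins over the role and both win over the defaults. A small self-contained sketch of that precedence (an illustration only, reusing the variable names from the tests, not the controller code itself):

```rust
use std::collections::BTreeMap;

fn main() {
    // Layer 1: operator defaults (one example key shown).
    let mut vars = BTreeMap::from([("SPARK_NO_DAEMONIZE".to_string(), "true".to_string())]);

    // Layer 2: role-level envOverrides.
    let mut role_envs = BTreeMap::from([("TEST_SPARK_HIST_VAR".to_string(), "ROLE".to_string())]);
    vars.append(&mut role_envs);

    // Layer 3: role-group-level envOverrides. `append` overwrites existing keys,
    // so the role-group value takes precedence over the role value.
    let mut role_group_envs =
        BTreeMap::from([("TEST_SPARK_HIST_VAR".to_string(), "ROLEGROUP".to_string())]);
    vars.append(&mut role_group_envs);

    assert_eq!(vars["TEST_SPARK_HIST_VAR"], "ROLEGROUP");
    assert_eq!(vars["SPARK_NO_DAEMONIZE"], "true");
}
```

This matches the kuttl assertion below, where `TEST_SPARK_HIST_VAR_FROM_RG` is set to `ROLE` at role level and `ROLEGROUP` at role-group level and must resolve to `ROLEGROUP`.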
6 changes: 4 additions & 2 deletions rust/operator-binary/src/spark_k8s_controller.rs
@@ -479,9 +479,10 @@
) -> Result<PodTemplateSpec> {
let container_name = SparkContainer::Spark.to_string();
let mut cb = ContainerBuilder::new(&container_name).context(IllegalContainerNameSnafu)?;
let merged_env = spark_application.merged_env(role.clone(), env);

cb.add_volume_mounts(config.volume_mounts(spark_application, s3conn, s3logdir))
.add_env_vars(env.to_vec())
.add_env_vars(merged_env)
.resources(config.resources.clone().into())
.image_from_product_image(spark_image);

@@ -716,13 +717,14 @@ fn spark_job(
.context(IllegalContainerNameSnafu)?;

let args = [job_commands.join(" ")];
let merged_env = spark_application.merged_env(SparkApplicationRole::Submit, env);

cb.image_from_product_image(spark_image)
.command(vec!["/bin/bash".to_string(), "-c".to_string()])
.args(vec![args.join(" && ")])
.resources(job_config.resources.clone().into())
.add_volume_mounts(spark_application.spark_job_volume_mounts(s3conn, s3logdir))
.add_env_vars(env.to_vec())
.add_env_vars(merged_env)
.add_env_var(
"SPARK_SUBMIT_OPTS",
format!(
@@ -40,11 +40,17 @@ spec:
# For possible properties see: https://spark.apache.org/docs/latest/monitoring.html#spark-history-server-configuration-options
#sparkConf:
nodes:
envOverrides:
TEST_SPARK_HIST_VAR_ROLE: ROLE
TEST_SPARK_HIST_VAR_FROM_RG: ROLE
roleGroups:
default:
replicas: 1
config:
cleaner: true
envOverrides:
TEST_SPARK_HIST_VAR_FROM_RG: ROLEGROUP
TEST_SPARK_HIST_VAR_RG: ROLEGROUP
podOverrides:
spec:
containers:
10 changes: 10 additions & 0 deletions tests/templates/kuttl/overrides/07-assert.yaml
@@ -0,0 +1,10 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 30
commands:
- script: |
POD=$(kubectl -n $NAMESPACE get pod -l app.kubernetes.io/instance=spark-history -o name | head -n 1 | sed -e 's#pod/##')
kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_HIST_VAR_ROLE").value' | grep 'ROLE'
kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_HIST_VAR_RG").value' | grep 'ROLEGROUP'
kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_HIST_VAR_FROM_RG").value' | grep 'ROLEGROUP'
@@ -24,7 +24,14 @@ spec:
prefix: eventlogs/
bucket:
reference: spark-history-s3-bucket
env:
- name: TEST_SPARK_VAR_0
value: ORIGINAL
- name: TEST_SPARK_VAR_1
value: DONOTREPLACE
job:
envOverrides: &envOverrides
TEST_SPARK_VAR_0: REPLACED
podOverrides:
spec:
containers:
Expand All @@ -37,6 +44,7 @@ spec:
cpu: 1500m
memory: 1024Mi
driver:
envOverrides: *envOverrides
podOverrides:
spec:
containers:
Expand All @@ -50,6 +58,7 @@ spec:
memory: 1024Mi
executor:
replicas: 1
envOverrides: *envOverrides
podOverrides:
spec:
containers:
15 changes: 15 additions & 0 deletions tests/templates/kuttl/overrides/11-assert.yaml
@@ -0,0 +1,15 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 30
commands:
- script: kubectl -n $NAMESPACE get job spark-pi-s3-1 -o yaml | yq '.spec.template.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_0").value' | grep 'REPLACED'
- script: kubectl -n $NAMESPACE get job spark-pi-s3-1 -o yaml | yq '.spec.template.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_1").value' | grep 'DONOTREPLACE'
- script: kubectl -n $NAMESPACE get cm spark-pi-s3-1-driver-pod-template -o json | jq -r '.data."template.yaml"' | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_0").value' | grep 'REPLACED'
- script: kubectl -n $NAMESPACE get cm spark-pi-s3-1-driver-pod-template -o json | jq -r '.data."template.yaml"' | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_1").value' | grep 'DONOTREPLACE'
- script: kubectl -n $NAMESPACE get cm spark-pi-s3-1-executor-pod-template -o json | jq -r '.data."template.yaml"' | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_0").value' | grep 'REPLACED'
- script: kubectl -n $NAMESPACE get cm spark-pi-s3-1-executor-pod-template -o json | jq -r '.data."template.yaml"' | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_1").value' | grep 'DONOTREPLACE'
- script: |
POD=$(kubectl -n $NAMESPACE get pod -l app.kubernetes.io/instance=spark-pi-s3-1 -o name | head -n 1 | sed -e 's#pod/##')
kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_0").value' | grep 'REPLACED'
kubectl -n $NAMESPACE get pod $POD -o yaml | yq '.spec.containers[0].env[] | select (.name == "TEST_SPARK_VAR_1").value' | grep 'DONOTREPLACE'
2 changes: 1 addition & 1 deletion tests/test-definition.yaml
@@ -31,7 +31,7 @@ tests:
- spark
- s3-use-tls
- openshift
- name: pod_overrides
- name: overrides
dimensions:
- spark
- openshift