Skip to content

Commit 248479d

Browse files
committed
move merge-env logic to history.rs and improve unit test
1 parent 6f4b9da commit 248479d

File tree

3 files changed

+122
-123
lines changed

3 files changed

+122
-123
lines changed

rust/crd/src/constants.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,4 @@ pub const SPARK_DEFAULTS_FILE_NAME: &str = "spark-defaults.conf";
7777

7878
pub const SPARK_CLUSTER_ROLE: &str = "spark-k8s-clusterrole";
7979
pub const SPARK_UID: i64 = 1000;
80+
pub const METRICS_PORT: u16 = 18081;

rust/crd/src/history.rs

Lines changed: 112 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
use crate::s3logdir::S3LogDir;
2+
use crate::tlscerts;
13
use crate::{affinity::history_affinity, constants::*};
24

35
use product_config::{types::PropertyNameKind, ProductConfigManager};
46
use serde::{Deserialize, Serialize};
57
use snafu::{OptionExt, ResultExt, Snafu};
8+
use stackable_operator::k8s_openapi::api::core::v1::EnvVar;
69
use stackable_operator::role_utils::RoleGroup;
710
use stackable_operator::{
811
commons::{
@@ -232,6 +235,93 @@ impl SparkHistoryServer {
232235
)
233236
.context(InvalidProductConfigSnafu)
234237
}
238+
239+
pub fn merged_env(
240+
&self,
241+
s3logdir: &S3LogDir,
242+
role_group_env_overrides: HashMap<String, String>,
243+
) -> Vec<EnvVar> {
244+
// Maps env var name to env var object. This allows env_overrides to work
245+
// as expected (i.e. users can override the env var value).
246+
let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();
247+
let role_env_overrides = &self.role().config.env_overrides;
248+
249+
// This env var prevents the history server from detaching itself from the
250+
// start script because this leads to the Pod terminating immediately.
251+
vars.insert(
252+
"SPARK_NO_DAEMONIZE".to_string(),
253+
EnvVar {
254+
name: "SPARK_NO_DAEMONIZE".to_string(),
255+
value: Some("true".into()),
256+
value_from: None,
257+
},
258+
);
259+
vars.insert(
260+
"SPARK_DAEMON_CLASSPATH".to_string(),
261+
EnvVar {
262+
name: "SPARK_DAEMON_CLASSPATH".to_string(),
263+
value: Some("/stackable/spark/extra-jars/*".into()),
264+
value_from: None,
265+
},
266+
);
267+
268+
let mut history_opts = vec![
269+
format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
270+
format!(
271+
"-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
272+
),
273+
format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
274+
];
275+
276+
// if TLS is enabled build truststore
277+
if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
278+
history_opts.extend(vec![
279+
format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
280+
format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
281+
format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
282+
]);
283+
}
284+
285+
vars.insert(
286+
"SPARK_HISTORY_OPTS".to_string(),
287+
EnvVar {
288+
name: "SPARK_HISTORY_OPTS".to_string(),
289+
value: Some(history_opts.join(" ")),
290+
value_from: None,
291+
},
292+
);
293+
294+
// apply the role overrides
295+
let mut role_envs = role_env_overrides.iter().map(|env_var| {
296+
(
297+
env_var.0.clone(),
298+
EnvVar {
299+
name: env_var.0.clone(),
300+
value: Some(env_var.1.to_owned()),
301+
value_from: None,
302+
},
303+
)
304+
});
305+
306+
vars.extend(&mut role_envs);
307+
308+
// apply the role-group overrides
309+
let mut role_group_envs = role_group_env_overrides.into_iter().map(|env_var| {
310+
(
311+
env_var.0.clone(),
312+
EnvVar {
313+
name: env_var.0.clone(),
314+
value: Some(env_var.1),
315+
value_from: None,
316+
},
317+
)
318+
});
319+
320+
vars.extend(&mut role_group_envs);
321+
322+
// convert to Vec
323+
vars.into_values().collect()
324+
}
235325
}
236326

237327
#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize, Display)]
@@ -368,6 +458,7 @@ impl Configuration for HistoryConfigFragment {
368458
mod test {
369459
use super::*;
370460
use indoc::indoc;
461+
use stackable_operator::commons::s3::InlinedS3BucketSpec;
371462

372463
#[test]
373464
pub fn test_env() {
@@ -401,17 +492,16 @@ mod test {
401492
let history: SparkHistoryServer =
402493
serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
403494

404-
assert_eq!(
405-
Some(&"ROLE".to_string()),
406-
history
407-
.spec
408-
.nodes
409-
.config
410-
.env_overrides
411-
.get("TEST_SPARK_HIST_VAR")
412-
);
413-
assert_eq!(
414-
Some(&"ROLEGROUP".to_string()),
495+
let s3_log_dir: S3LogDir = S3LogDir {
496+
bucket: InlinedS3BucketSpec {
497+
bucket_name: None,
498+
connection: None,
499+
},
500+
prefix: "prefix".to_string(),
501+
};
502+
503+
let merged_env = history.merged_env(
504+
&s3_log_dir,
415505
history
416506
.spec
417507
.nodes
@@ -420,7 +510,17 @@ mod test {
420510
.unwrap()
421511
.config
422512
.env_overrides
423-
.get("TEST_SPARK_HIST_VAR")
513+
.clone(),
514+
);
515+
516+
let env_map: BTreeMap<&str, Option<String>> = merged_env
517+
.iter()
518+
.map(|env_var| (env_var.name.as_str(), env_var.value.clone()))
519+
.collect();
520+
521+
assert_eq!(
522+
Some(&Some("ROLEGROUP".to_string())),
523+
env_map.get("TEST_SPARK_HIST_VAR")
424524
);
425525
}
426526
}

rust/operator-binary/src/history/history_controller.rs

Lines changed: 9 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ use stackable_operator::{
1515
api::{
1616
apps::v1::{StatefulSet, StatefulSetSpec},
1717
core::v1::{
18-
ConfigMap, EnvVar, PodSecurityContext, Service, ServiceAccount, ServicePort,
19-
ServiceSpec,
18+
ConfigMap, PodSecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec,
2019
},
2120
rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject},
2221
},
@@ -36,14 +35,15 @@ use stackable_operator::{
3635
role_utils::RoleGroupRef,
3736
time::Duration,
3837
};
38+
use stackable_spark_k8s_crd::constants::METRICS_PORT;
3939
use stackable_spark_k8s_crd::{
4040
constants::{
4141
ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME,
42-
JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, OPERATOR_NAME,
43-
SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_IMAGE_BASE_NAME,
44-
SPARK_UID, STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG,
45-
VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG,
46-
VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG,
42+
JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE, OPERATOR_NAME, SECRET_ACCESS_KEY,
43+
SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID,
44+
STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG,
45+
VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG,
46+
VOLUME_MOUNT_PATH_LOG_CONFIG,
4747
},
4848
history,
4949
history::{HistoryConfig, SparkHistoryServer, SparkHistoryServerContainer},
@@ -59,8 +59,6 @@ use stackable_operator::k8s_openapi::DeepMerge;
5959
use stackable_operator::logging::controller::ReconcilerError;
6060
use strum::{EnumDiscriminants, IntoStaticStr};
6161

62-
const METRICS_PORT: u16 = 18081;
63-
6462
#[derive(Snafu, Debug, EnumDiscriminants)]
6563
#[strum_discriminants(derive(IntoStaticStr))]
6664
#[allow(clippy::enum_variant_names)]
@@ -444,11 +442,7 @@ fn build_stateful_set(
444442
.rolegroup(rolegroupref)
445443
.with_context(|_| CannotRetrieveRoleGroupSnafu)?;
446444

447-
let merged_env_vars = env_vars(
448-
s3_log_dir,
449-
shs.role().config.env_overrides.clone(),
450-
role_group.config.env_overrides,
451-
);
445+
let merged_env = shs.merged_env(s3_log_dir, role_group.config.env_overrides);
452446

453447
let container_name = "spark-history";
454448
let container = ContainerBuilder::new(container_name)
@@ -459,7 +453,7 @@ fn build_stateful_set(
459453
.args(command_args(s3_log_dir))
460454
.add_container_port("http", 18080)
461455
.add_container_port("metrics", METRICS_PORT.into())
462-
.add_env_vars(merged_env_vars)
456+
.add_env_vars(merged_env)
463457
.add_volume_mounts(s3_log_dir.volume_mounts())
464458
.add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG)
465459
.add_volume_mount(VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG)
@@ -487,10 +481,6 @@ fn build_stateful_set(
487481

488482
let mut pod_template = pb.build_template();
489483
pod_template.merge_from(shs.role().config.pod_overrides.clone());
490-
let role_group = shs
491-
.rolegroup(rolegroupref)
492-
.with_context(|_| CannotRetrieveRoleGroupSnafu)?;
493-
494484
pod_template.merge_from(role_group.config.pod_overrides);
495485

496486
Ok(StatefulSet {
@@ -680,98 +670,6 @@ fn command_args(s3logdir: &S3LogDir) -> Vec<String> {
680670
vec![String::from("-c"), command.join(" && ")]
681671
}
682672

683-
fn env_vars(
684-
s3logdir: &S3LogDir,
685-
role_env_overrides: HashMap<String, String>,
686-
role_group_env_overrides: HashMap<String, String>,
687-
) -> Vec<EnvVar> {
688-
// Maps env var name to env var object. This allows env_overrides to work
689-
// as expected (i.e. users can override the env var value).
690-
let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();
691-
692-
// This env var prevents the history server from detaching itself from the
693-
// start script because this leads to the Pod terminating immediately.
694-
vars.insert(
695-
"SPARK_NO_DAEMONIZE".to_string(),
696-
EnvVar {
697-
name: "SPARK_NO_DAEMONIZE".to_string(),
698-
value: Some("true".into()),
699-
value_from: None,
700-
},
701-
);
702-
vars.insert(
703-
"SPARK_DAEMON_CLASSPATH".to_string(),
704-
EnvVar {
705-
name: "SPARK_DAEMON_CLASSPATH".to_string(),
706-
value: Some("/stackable/spark/extra-jars/*".into()),
707-
value_from: None,
708-
},
709-
);
710-
711-
let mut history_opts = vec![
712-
format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
713-
format!(
714-
"-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
715-
),
716-
format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
717-
];
718-
719-
// if TLS is enabled build truststore
720-
if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
721-
history_opts.extend(vec![
722-
format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
723-
format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
724-
format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
725-
]);
726-
}
727-
728-
vars.insert(
729-
"SPARK_HISTORY_OPTS".to_string(),
730-
EnvVar {
731-
name: "SPARK_HISTORY_OPTS".to_string(),
732-
value: Some(history_opts.join(" ")),
733-
value_from: None,
734-
},
735-
);
736-
737-
// apply the role overrides
738-
let mut role_envs = role_env_overrides
739-
.into_iter()
740-
.map(|env_var| {
741-
(
742-
env_var.0.clone(),
743-
EnvVar {
744-
name: env_var.0.clone(),
745-
value: Some(env_var.1),
746-
value_from: None,
747-
},
748-
)
749-
})
750-
.collect();
751-
752-
vars.append(&mut role_envs);
753-
754-
// apply the role-group overrides
755-
let mut role_group_envs = role_group_env_overrides
756-
.into_iter()
757-
.map(|env_var| {
758-
(
759-
env_var.0.clone(),
760-
EnvVar {
761-
name: env_var.0.clone(),
762-
value: Some(env_var.1),
763-
value_from: None,
764-
},
765-
)
766-
})
767-
.collect();
768-
769-
vars.append(&mut role_group_envs);
770-
771-
// convert to Vec
772-
vars.into_values().collect()
773-
}
774-
775673
fn labels<'a, T>(
776674
shs: &'a T,
777675
app_version_label: &'a str,

0 commit comments

Comments
 (0)