Commit 7586a98

Implement jvmArgumentOverrides for SparkApplication
1 parent 5dd8f74 commit 7586a98

5 files changed: +178 -19 lines
rust/operator-binary/src/config/jvm.rs

Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
use snafu::{ResultExt, Snafu};
use stackable_operator::{
    commons::s3::S3ConnectionSpec,
    role_utils::{self, JvmArgumentOverrides},
};

use crate::crd::{
    constants::{
        JVM_SECURITY_PROPERTIES_FILE, STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE,
        VOLUME_MOUNT_PATH_LOG_CONFIG,
    },
    logdir::ResolvedLogDir,
    tlscerts::tls_secret_names,
    v1alpha1::SparkApplication,
};

#[derive(Snafu, Debug)]
pub enum Error {
    #[snafu(display("failed to merge jvm argument overrides"))]
    MergeJvmArgumentOverrides { source: role_utils::Error },
}

/// JVM arguments that go into
/// 1. `spark.driver.extraJavaOptions`
/// 2. `spark.executor.extraJavaOptions`
pub fn construct_extra_java_options(
    spark_application: &SparkApplication,
    s3_conn: &Option<S3ConnectionSpec>,
    log_dir: &Option<ResolvedLogDir>,
) -> Result<(String, String), Error> {
    // Note (@sbernauer): As of 2025-03-04, we did not set any heap related JVM arguments, so I
    // kept the implementation as is. We can always re-visit this as needed.

    let mut jvm_args = vec![format!(
        "-Djava.security.properties={VOLUME_MOUNT_PATH_LOG_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
    )];

    if tls_secret_names(s3_conn, log_dir).is_some() {
        jvm_args.extend([
            format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
            format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
            format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
        ]);
    }

    let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args);
    let from_driver = match &spark_application.spec.driver {
        Some(driver) => &driver.product_specific_common_config.jvm_argument_overrides,
        None => &JvmArgumentOverrides::default(),
    };
    let from_executor = match &spark_application.spec.executor {
        Some(executor) => {
            &executor
                .config
                .product_specific_common_config
                .jvm_argument_overrides
        }
        None => &JvmArgumentOverrides::default(),
    };

    // Please note that the merge order is different from what we normally use!
    // This is not trivial, as the merge operation is not purely additive (as it is with e.g.
    // `PodTemplateSpec`).
    let mut from_driver = from_driver.clone();
    let mut from_executor = from_executor.clone();
    from_driver
        .try_merge(&operator_generated)
        .context(MergeJvmArgumentOverridesSnafu)?;
    from_executor
        .try_merge(&operator_generated)
        .context(MergeJvmArgumentOverridesSnafu)?;

    Ok((
        from_driver.effective_jvm_config_after_merging().join("\n"),
        from_executor
            .effective_jvm_config_after_merging()
            .join("\n"),
    ))
}

#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::*;

    #[test]
    fn test_construct_jvm_arguments_defaults() {
        let input = r#"
            apiVersion: spark.stackable.tech/v1alpha1
            kind: SparkApplication
            metadata:
              name: spark-example
            spec:
              mode: cluster
              mainApplicationFile: test.py
              sparkImage:
                productVersion: 1.2.3
        "#;

        let deserializer = serde_yaml::Deserializer::from_str(input);
        let spark_app: SparkApplication =
            serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
        let (driver_extra_java_options, executor_extra_java_options) =
            construct_extra_java_options(&spark_app, &None, &None).unwrap();

        assert_eq!(
            driver_extra_java_options,
            "-Djava.security.properties=/stackable/log_config/security.properties"
        );
        assert_eq!(
            executor_extra_java_options,
            "-Djava.security.properties=/stackable/log_config/security.properties"
        );
    }

    #[test]
    fn test_construct_jvm_argument_overrides() {
        let input = r#"
            apiVersion: spark.stackable.tech/v1alpha1
            kind: SparkApplication
            metadata:
              name: spark-example
            spec:
              mode: cluster
              mainApplicationFile: test.py
              sparkImage:
                productVersion: 1.2.3
              driver:
                jvmArgumentOverrides:
                  add:
                    - -Dhttps.proxyHost=from-driver
              executor:
                jvmArgumentOverrides:
                  add:
                    - -Dhttps.proxyHost=from-executor
                  removeRegex:
                    - -Djava.security.properties=.*
        "#;

        let deserializer = serde_yaml::Deserializer::from_str(input);
        let spark_app: SparkApplication =
            serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
        let (driver_extra_java_options, executor_extra_java_options) =
            construct_extra_java_options(&spark_app, &None, &None).unwrap();

        assert_eq!(
            driver_extra_java_options,
            indoc! {"
                -Djava.security.properties=/stackable/log_config/security.properties
                -Dhttps.proxyHost=from-driver"}
        );
        assert_eq!(
            executor_extra_java_options,
            "-Dhttps.proxyHost=from-executor"
        );
    }
}
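
Editor's note: a minimal sketch of the merge semantics the comment above relies on, assuming the stackable_operator role_utils API behaves as this commit uses it (new_with_only_additions, try_merge, effective_jvm_config_after_merging). The -Dhttps.proxyHost=from-user argument is hypothetical, not part of the commit:

use stackable_operator::role_utils::{self, JvmArgumentOverrides};

fn merge_order_demo() -> Result<String, role_utils::Error> {
    // Operator-generated defaults, e.g. the java.security.properties flag.
    let operator_generated = JvmArgumentOverrides::new_with_only_additions(vec![
        "-Djava.security.properties=/stackable/log_config/security.properties".to_owned(),
    ]);

    // User overrides as they would arrive via jvmArgumentOverrides in the CRD
    // (hypothetical value, for illustration only).
    let mut from_user = JvmArgumentOverrides::new_with_only_additions(vec![
        "-Dhttps.proxyHost=from-user".to_owned(),
    ]);

    // The operator defaults are merged *into* the user config, the reverse of
    // the usual direction, so a user removeRegex can strip operator defaults,
    // as the executor case in test_construct_jvm_argument_overrides shows.
    from_user.try_merge(&operator_generated)?;

    // Expected result: the operator default first, then the user addition.
    Ok(from_user.effective_jvm_config_after_merging().join("\n"))
}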
rust/operator-binary/src/config/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
pub mod jvm;

rust/operator-binary/src/crd/mod.rs

Lines changed: 17 additions & 19 deletions
@@ -44,9 +44,12 @@ use stackable_operator::{
 };
 use stackable_versioned::versioned;

-use crate::crd::roles::{
-    RoleConfig, RoleConfigFragment, SparkApplicationRole, SparkContainer, SparkMode, SubmitConfig,
-    SubmitConfigFragment, VolumeMounts,
+use crate::{
+    config::jvm::construct_extra_java_options,
+    crd::roles::{
+        RoleConfig, RoleConfigFragment, SparkApplicationRole, SparkContainer, SparkMode,
+        SubmitConfig, SubmitConfigFragment, VolumeMounts,
+    },
 };

 pub mod affinity;

@@ -110,6 +113,9 @@ pub enum Error {

     #[snafu(display("failed to configure log directory"))]
     ConfigureLogDir { source: logdir::Error },
+
+    #[snafu(display("failed to construct JVM arguments"))]
+    ConstructJvmArguments { source: crate::config::jvm::Error },
 }

 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize, JsonSchema)]

@@ -171,6 +177,9 @@ pub struct SparkApplicationSpec {
     /// such as templates, and passes it on to Spark.
     /// The reason this property uses its own type (SubmitConfigFragment) is because logging is not
     /// supported for spark-submit processes.
+    //
+    // IMPORTANT: Please note that the jvmArgumentOverrides have no effect here!
+    // However, due to product-config constraints I wasn't able to remove them.
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub job: Option<CommonConfiguration<SubmitConfigFragment, JavaCommonConfig>>,

@@ -580,23 +589,12 @@ impl v1alpha1::SparkApplication {
             }
         }

-        // Extra JVM opts:
-        // - java security properties
-        // - s3 with TLS
-        let mut extra_java_opts = vec![format!(
-            "-Djava.security.properties={VOLUME_MOUNT_PATH_LOG_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
-        )];
-        if tlscerts::tls_secret_names(s3conn, log_dir).is_some() {
-            extra_java_opts.extend(vec![
-                format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
-                format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
-                format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
-            ]);
-        }
-        let str_extra_java_opts = extra_java_opts.join(" ");
+        let (driver_extra_java_options, executor_extra_java_options) =
+            construct_extra_java_options(&self, s3conn, log_dir)
+                .context(ConstructJvmArgumentsSnafu)?;
         submit_cmd.extend(vec![
-            format!("--conf spark.driver.extraJavaOptions=\"{str_extra_java_opts}\""),
-            format!("--conf spark.executor.extraJavaOptions=\"{str_extra_java_opts}\""),
+            format!("--conf spark.driver.extraJavaOptions=\"{driver_extra_java_options}\""),
+            format!("--conf spark.executor.extraJavaOptions=\"{executor_extra_java_options}\""),
         ]);

        // repositories and packages arguments
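
Editor's note: for illustration (hypothetical values in the spirit of the tests in the new config/jvm.rs, not part of the commit), each role now gets its own --conf value instead of the previously shared, space-joined string:

fn submit_conf_demo() {
    // Hypothetical per-role results from construct_extra_java_options.
    let driver_extra_java_options =
        "-Djava.security.properties=/stackable/log_config/security.properties";
    let executor_extra_java_options = "-Dhttps.proxyHost=from-executor";

    // Mirrors the submit_cmd.extend call in the hunk above: one --conf per role.
    let submit_cmd = [
        format!("--conf spark.driver.extraJavaOptions=\"{driver_extra_java_options}\""),
        format!("--conf spark.executor.extraJavaOptions=\"{executor_extra_java_options}\""),
    ];

    assert_eq!(
        submit_cmd[1],
        "--conf spark.executor.extraJavaOptions=\"-Dhttps.proxyHost=from-executor\""
    );
}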

rust/operator-binary/src/history/config/jvm.rs

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@ pub fn construct_history_jvm_args(
         .context(MergeJvmArgumentOverridesSnafu)?;
     Ok(merged.effective_jvm_config_after_merging().join("\n"))
 }
+
 #[cfg(test)]
 mod tests {
     use indoc::indoc;

rust/operator-binary/src/main.rs

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ use crate::crd::{
     SparkApplication,
 };

+mod config;
 mod crd;
 mod history;
 mod pod_driver_controller;
