
Commit 0006b60

Use a generic ResolvedLogDir instead of the concrete S3LogDir
1 parent 611261a · commit 0006b60
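
The gist of the refactor: helpers that previously took an Option<S3LogDir> now take an Option<ResolvedLogDir>, an enum that either wraps the existing S3 implementation or carries a plain custom log-directory string. A minimal sketch of how a caller might wire this up, assuming controller-side variables (spark_application, namespace, client, s3conn, log_config_map) that are not part of this diff:

    // Hypothetical controller wiring; only the names taken from the diff below are real.
    let logdir = match &spark_application.spec.log_file_directory {
        // ResolvedLogDir::resolve is introduced in s3logdir.rs (see below)
        Some(spec) => Some(ResolvedLogDir::resolve(spec, namespace.clone(), &client).await?),
        None => None,
    };

    // The helpers changed in this commit accept the generic type:
    let volumes = spark_application.volumes(&s3conn, &logdir, log_config_map)?;
    let env = spark_application.env(&s3conn, &logdir);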

File tree: 7 files changed, +186 −96 lines


rust/crd/src/history.rs

Lines changed: 9 additions & 7 deletions
@@ -1,5 +1,4 @@
-use crate::s3logdir::S3LogDir;
-use crate::tlscerts;
+use crate::s3logdir::ResolvedLogDir;
 use crate::{affinity::history_affinity, constants::*};
 
 use product_config::{types::PropertyNameKind, ProductConfigManager};
@@ -235,7 +234,7 @@ impl SparkHistoryServer {
 
     pub fn merged_env(
         &self,
-        s3logdir: &S3LogDir,
+        logdir: &ResolvedLogDir,
         role_group_env_overrides: HashMap<String, String>,
     ) -> Vec<EnvVar> {
         // Maps env var name to env var object. This allows env_overrides to work
@@ -271,7 +270,7 @@ impl SparkHistoryServer {
         ];
 
         // if TLS is enabled build truststore
-        if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
+        if logdir.tls_enabled() {
             history_opts.extend(vec![
                 format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
                 format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
@@ -329,6 +328,7 @@ impl SparkHistoryServer {
 pub enum LogFileDirectorySpec {
     #[strum(serialize = "s3")]
     S3(S3LogFileDirectorySpec),
+    CustomLogDirectory(String),
 }
 
 #[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)]
@@ -456,6 +456,8 @@ impl Configuration for HistoryConfigFragment {
 
 #[cfg(test)]
 mod test {
+    use crate::s3logdir::S3LogDir;
+
     use super::*;
     use indoc::indoc;
     use stackable_operator::commons::{
@@ -495,7 +497,7 @@ mod test {
         let history: SparkHistoryServer =
             serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
 
-        let s3_log_dir: S3LogDir = S3LogDir {
+        let log_dir = ResolvedLogDir::S3(S3LogDir {
             bucket: ResolvedS3Bucket {
                 bucket_name: "my-bucket".to_string(),
                 connection: ResolvedS3Connection {
@@ -507,10 +509,10 @@ mod test {
                 },
             },
             prefix: "prefix".to_string(),
-        };
+        });
 
         let merged_env = history.merged_env(
-            &s3_log_dir,
+            &log_dir,
             history
                 .spec
                 .nodes
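
The new CustomLogDirectory variant accepts a plain directory URI in the CRD. As an illustration of the expected YAML shape, here is a self-contained sketch with a simplified stand-in enum (the real spec nests an S3 bucket definition and derives more traits; the camelCase renaming of the variant is an assumption):

    use serde::Deserialize;

    // Simplified stand-in for the CRD enum above, reduced to string fields.
    #[derive(Debug, Deserialize)]
    #[serde(rename_all = "camelCase")]
    enum LogFileDirectorySpec {
        S3(S3LogFileDirectorySpec),
        CustomLogDirectory(String),
    }

    #[derive(Debug, Deserialize)]
    #[allow(dead_code)]
    struct S3LogFileDirectorySpec {
        prefix: String, // the real spec also references an S3 bucket definition
    }

    fn main() {
        // Same singleton-map convention the test module above uses:
        let yaml = "customLogDirectory: s3a://my-bucket/spark-logs";
        let deserializer = serde_yaml::Deserializer::from_str(yaml);
        let spec: LogFileDirectorySpec =
            serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
        assert!(matches!(spec, LogFileDirectorySpec::CustomLogDirectory(_)));
    }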

rust/crd/src/lib.rs

Lines changed: 14 additions & 15 deletions
@@ -11,7 +11,7 @@ pub use crate::roles::*;
 use constants::*;
 use history::LogFileDirectorySpec;
 use product_config::{types::PropertyNameKind, ProductConfigManager};
-use s3logdir::S3LogDir;
+use s3logdir::ResolvedLogDir;
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
 use stackable_operator::{
@@ -208,7 +208,6 @@ pub struct SparkApplicationSpec {
     pub env: Vec<EnvVar>,
 
     /// The log file directory definition used by the Spark history server.
-    /// Currently only S3 buckets are supported.
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub log_file_directory: Option<LogFileDirectorySpec>,
 }
@@ -280,7 +279,7 @@ impl SparkApplication {
     pub fn volumes(
         &self,
         s3conn: &Option<S3ConnectionSpec>,
-        s3logdir: &Option<S3LogDir>,
+        logdir: &Option<ResolvedLogDir>,
         log_config_map: Option<&str>,
     ) -> Result<Vec<Volume>, Error> {
         let mut result: Vec<Volume> = self.spec.volumes.clone();
@@ -313,7 +312,7 @@ impl SparkApplication {
             );
         }
 
-        if let Some(log_dir) = s3logdir.as_ref() {
+        if let Some(log_dir) = logdir.as_ref() {
             if let Some(volume) = log_dir
                 .credentials_volume()
                 .context(ConfigureS3LogDirSnafu)?
@@ -348,7 +347,7 @@ impl SparkApplication {
                 .build(),
             );
         }
-        if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) {
+        if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, logdir) {
             result.push(
                 VolumeBuilder::new(STACKABLE_TRUST_STORE_NAME)
                     .with_empty_dir(None::<String>, Some(Quantity("5Mi".to_string())))
@@ -385,7 +384,7 @@ impl SparkApplication {
     pub fn spark_job_volume_mounts(
         &self,
         s3conn: &Option<S3ConnectionSpec>,
-        s3logdir: &Option<S3LogDir>,
+        logdir: &Option<ResolvedLogDir>,
     ) -> Vec<VolumeMount> {
         let mut tmpl_mounts = vec![
             VolumeMount {
@@ -400,7 +399,7 @@ impl SparkApplication {
             },
         ];
 
-        tmpl_mounts = self.add_common_volume_mounts(tmpl_mounts, s3conn, s3logdir, false);
+        tmpl_mounts = self.add_common_volume_mounts(tmpl_mounts, s3conn, logdir, false);
 
         if let Some(CommonConfiguration {
             config:
@@ -424,7 +423,7 @@ impl SparkApplication {
         &self,
         mut mounts: Vec<VolumeMount>,
         s3conn: &Option<S3ConnectionSpec>,
-        s3logdir: &Option<S3LogDir>,
+        logdir: &Option<ResolvedLogDir>,
         logging_enabled: bool,
     ) -> Vec<VolumeMount> {
         if self.spec.image.is_some() {
@@ -457,7 +456,7 @@ impl SparkApplication {
             });
         }
 
-        if let Some(vm) = s3logdir.as_ref().and_then(|o| o.credentials_volume_mount()) {
+        if let Some(vm) = logdir.as_ref().and_then(|o| o.credentials_volume_mount()) {
             mounts.push(vm);
         }
 
@@ -482,7 +481,7 @@ impl SparkApplication {
                 ..VolumeMount::default()
             });
         }
-        if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) {
+        if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, logdir) {
             mounts.push(VolumeMount {
                 name: STACKABLE_TRUST_STORE_NAME.into(),
                 mount_path: STACKABLE_TRUST_STORE.into(),
@@ -521,7 +520,7 @@ impl SparkApplication {
         &self,
         serviceaccount_name: &str,
         s3conn: &Option<S3ConnectionSpec>,
-        s3_log_dir: &Option<S3LogDir>,
+        log_dir: &Option<ResolvedLogDir>,
         spark_image: &str,
     ) -> Result<Vec<String>, Error> {
         // mandatory properties
@@ -584,7 +583,7 @@ impl SparkApplication {
         let mut extra_java_opts = vec![format!(
             "-Djava.security.properties={VOLUME_MOUNT_PATH_LOG_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
         )];
-        if tlscerts::tls_secret_names(s3conn, s3_log_dir).is_some() {
+        if tlscerts::tls_secret_names(s3conn, log_dir).is_some() {
             extra_java_opts.extend(vec![
                 format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
                 format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
@@ -645,7 +644,7 @@ impl SparkApplication {
             submit_conf.insert("spark.executor.instances".to_string(), replicas.to_string());
         }
 
-        if let Some(log_dir) = s3_log_dir {
+        if let Some(log_dir) = log_dir {
             submit_conf.extend(
                 log_dir
                     .application_spark_config()
@@ -683,7 +682,7 @@ impl SparkApplication {
     pub fn env(
         &self,
         s3conn: &Option<S3ConnectionSpec>,
-        s3logdir: &Option<S3LogDir>,
+        logdir: &Option<ResolvedLogDir>,
     ) -> Vec<EnvVar> {
         let mut e: Vec<EnvVar> = self.spec.env.clone();
         if self.requirements().is_some() {
@@ -695,7 +694,7 @@ impl SparkApplication {
                 value_from: None,
             });
         }
-        if tlscerts::tls_secret_names(s3conn, s3logdir).is_some() {
+        if tlscerts::tls_secret_names(s3conn, logdir).is_some() {
             e.push(EnvVar {
                 name: "STACKABLE_TLS_STORE_PASSWORD".to_string(),
                 value: Some(STACKABLE_TLS_STORE_PASSWORD.to_string()),

rust/crd/src/roles.rs

Lines changed: 3 additions & 3 deletions
@@ -13,11 +13,11 @@
 //! each role is named "default". These roles are transparent to the user.
 //!
 //! The history server has its own role completely unrelated to this module.
+use crate::ResolvedLogDir;
 use std::{collections::BTreeMap, slice};
 
 use serde::{Deserialize, Serialize};
 
-use crate::s3logdir::S3LogDir;
 use crate::SparkApplication;
 use stackable_operator::{
     commons::{
@@ -148,10 +148,10 @@ impl RoleConfig {
         &self,
         spark_application: &SparkApplication,
         s3conn: &Option<S3ConnectionSpec>,
-        s3logdir: &Option<S3LogDir>,
+        logdir: &Option<ResolvedLogDir>,
     ) -> Vec<VolumeMount> {
         let volume_mounts = self.volume_mounts.clone().into();
-        spark_application.add_common_volume_mounts(volume_mounts, s3conn, s3logdir, true)
+        spark_application.add_common_volume_mounts(volume_mounts, s3conn, logdir, true)
     }
 }
 

rust/crd/src/s3logdir.rs

Lines changed: 109 additions & 26 deletions
@@ -49,46 +49,129 @@ pub enum Error {
     ConfigureS3 { source: S3Error },
 }
 
+pub enum ResolvedLogDir {
+    S3(S3LogDir),
+    Custom(String),
+}
+
+impl ResolvedLogDir {
+    pub async fn resolve(
+        log_file_dir: &LogFileDirectorySpec,
+        namespace: Option<String>,
+        client: &stackable_operator::client::Client,
+    ) -> Result<ResolvedLogDir, Error> {
+        match log_file_dir {
+            S3(s3_log_dir) => S3LogDir::resolve(s3_log_dir, namespace, client)
+                .await
+                .map(ResolvedLogDir::S3),
+            LogFileDirectorySpec::CustomLogDirectory(custom_log_dir) => {
+                Ok(ResolvedLogDir::Custom(custom_log_dir.to_owned()))
+            }
+        }
+    }
+
+    pub fn tls_enabled(&self) -> bool {
+        self.tls_secret_name().is_some()
+    }
+
+    pub fn tls_secret_name(&self) -> Option<&str> {
+        match self {
+            ResolvedLogDir::S3(s3_log_dir) => {
+                tlscerts::tls_secret_name(&s3_log_dir.bucket.connection)
+            }
+            ResolvedLogDir::Custom(_) => None,
+        }
+    }
+
+    pub fn history_server_spark_config(&self) -> Result<BTreeMap<String, String>, Error> {
+        match self {
+            ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.history_server_spark_config(),
+            ResolvedLogDir::Custom(custom_log_dir) => Ok(BTreeMap::from([(
+                "spark.history.fs.logDirectory".to_string(),
+                custom_log_dir.to_string(),
+            )])),
+        }
+    }
+
+    pub fn application_spark_config(&self) -> Result<BTreeMap<String, String>, Error> {
+        match self {
+            ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.application_spark_config(),
+            ResolvedLogDir::Custom(custom_log_dir) => Ok(BTreeMap::from([
+                ("spark.eventLog.enabled".to_string(), "true".to_string()),
+                ("spark.eventLog.dir".to_string(), custom_log_dir.to_string()),
+            ])),
+        }
+    }
+
+    pub fn volumes(&self) -> Result<Vec<Volume>, Error> {
+        match self {
+            ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.volumes(),
+            ResolvedLogDir::Custom(_) => Ok(vec![]),
+        }
+    }
+
+    pub fn volume_mounts(&self) -> Vec<VolumeMount> {
+        match self {
+            ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.volume_mounts(),
+            ResolvedLogDir::Custom(_) => vec![],
+        }
+    }
+
+    pub fn credentials_volume(&self) -> Result<Option<Volume>, Error> {
+        match self {
+            ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials_volume(),
+            ResolvedLogDir::Custom(_) => Ok(None),
+        }
+    }
+
+    pub fn credentials_volume_mount(&self) -> Option<VolumeMount> {
+        match self {
+            ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials_volume_mount(),
+            ResolvedLogDir::Custom(_) => None,
+        }
+    }
+
+    pub fn credentials(&self) -> Option<SecretClassVolume> {
+        match self {
+            ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials(),
+            ResolvedLogDir::Custom(_) => None,
+        }
+    }
+
+    pub fn credentials_mount_path(&self) -> Option<String> {
+        match self {
+            ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials_mount_path(),
+            ResolvedLogDir::Custom(_) => None,
+        }
+    }
+}
+
 pub struct S3LogDir {
     pub bucket: ResolvedS3Bucket,
     pub prefix: String,
 }
 
 impl S3LogDir {
     pub async fn resolve(
-        log_file_dir: Option<&LogFileDirectorySpec>,
+        log_file_dir: &S3LogFileDirectorySpec,
         namespace: Option<String>,
         client: &stackable_operator::client::Client,
-    ) -> Result<Option<S3LogDir>, Error> {
-        #[allow(irrefutable_let_patterns)]
-        let (bucket, prefix) = if let Some(S3(S3LogFileDirectorySpec {
-            bucket: bucket_def,
-            prefix,
-        })) = log_file_dir
-        {
-            (
-                bucket_def
-                    .clone()
-                    .resolve(client, namespace.unwrap().as_str())
-                    .await
-                    .context(ConfigureS3Snafu)?,
-                prefix.clone(),
-            )
-        } else {
-            // !!!!!
-            // Ugliness alert!
-            // No point in trying to resolve the connection anymore since there is no
-            // log_file_dir in the first place.
-            // This can casually happen for Spark applications that don't use a history server
-            // !!!!!
-            return Ok(None);
-        };
+    ) -> Result<S3LogDir, Error> {
+        let bucket = log_file_dir
+            .bucket
+            .clone()
+            .resolve(client, namespace.unwrap().as_str())
+            .await
+            .context(ConfigureS3Snafu)?;
 
         if bucket.connection.tls.uses_tls() && !bucket.connection.tls.uses_tls_verification() {
             return S3TlsNoVerificationNotSupportedSnafu.fail();
         }
 
-        Ok(Some(S3LogDir { bucket, prefix }))
+        Ok(S3LogDir {
+            bucket,
+            prefix: log_file_dir.prefix.to_owned(),
+        })
     }
 
     /// Constructs the properties needed for loading event logs from S3.
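
What the enum buys is visible in the match arms above: the Custom variant contributes only Spark properties, while every Kubernetes-facing method degrades to an empty default. A quick sketch, assuming the types from this file are in scope:

    // Exercises only methods defined in the diff above.
    fn demo() -> Result<(), Error> {
        let logdir = ResolvedLogDir::Custom("s3a://my-bucket/spark-logs".to_string());

        // The custom directory only yields Spark properties...
        let conf = logdir.application_spark_config()?;
        assert_eq!(conf.get("spark.eventLog.enabled").map(String::as_str), Some("true"));
        assert_eq!(conf.get("spark.eventLog.dir").map(String::as_str), Some("s3a://my-bucket/spark-logs"));

        // ...and nothing to mount or trust:
        assert!(logdir.volumes()?.is_empty());
        assert!(logdir.volume_mounts().is_empty());
        assert!(logdir.credentials_volume()?.is_none());
        assert!(!logdir.tls_enabled());
        Ok(())
    }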
