Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,8 +8,10 @@ All notable changes to this project will be documented in this file.

- The lifetime of auto generated TLS certificates is now configurable with the role and roleGroup
config property `requestedSecretLifetime`. This helps reducing frequent Pod restarts ([#501]).
- Run a `containerdebug` process in the background of each Spark container to collect debugging information ([#508]).

[#501]: https://github.com/stackabletech/spark-k8s-operator/pull/501
[#508]: https://github.com/stackabletech/spark-k8s-operator/pull/508

## [24.11.0] - 2024-11-18

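For reference, the new `requestedSecretLifetime` property mentioned in the changelog entry above is set per role or roleGroup. A minimal sketch of what this could look like on a history server manifest; the `nodes` role and the `7d`/`1d` durations are illustrative assumptions, not taken from this PR:

```yaml
# Hypothetical snippet: only the `requestedSecretLifetime` key itself comes
# from the changelog entry; the role layout and durations are assumptions.
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
spec:
  nodes:
    config:
      requestedSecretLifetime: 7d        # role-level default
    roleGroups:
      default:
        config:
          requestedSecretLifetime: 1d    # roleGroup value overrides the role value
```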
10 changes: 10 additions & 0 deletions rust/crd/src/history.rs
@@ -242,6 +242,16 @@ impl SparkHistoryServer {
let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();
let role_env_overrides = &self.role().config.env_overrides;

// Needed by the `containerdebug` process running in the background of the
// history container; it writes its tracing information to this directory.
vars.insert(
"CONTAINERDEBUG_LOG_DIRECTORY".to_string(),
EnvVar {
name: "CONTAINERDEBUG_LOG_DIRECTORY".to_string(),
value: Some(format!("{VOLUME_MOUNT_PATH_LOG}/containerdebug")),
value_from: None,
},
);
// This env var prevents the history server from detaching itself from the
// start script, which would cause the Pod to terminate immediately.
vars.insert(
74 changes: 52 additions & 22 deletions rust/crd/src/lib.rs
@@ -326,18 +326,18 @@ impl SparkApplication {
.with_config_map(log_config_map)
.build(),
);

result.push(
VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG)
.with_empty_dir(
None::<String>,
Some(product_logging::framework::calculate_log_volume_size_limit(
&[MAX_SPARK_LOG_FILES_SIZE, MAX_INIT_LOG_FILES_SIZE],
)),
)
.build(),
);
}
// This volume is also used by the `containerdebug` process, so it must always be present.
result.push(
VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG)
.with_empty_dir(
None::<String>,
Some(product_logging::framework::calculate_log_volume_size_limit(
&[MAX_SPARK_LOG_FILES_SIZE, MAX_INIT_LOG_FILES_SIZE],
)),
)
.build(),
);

if !self.packages().is_empty() {
result.push(
@@ -466,14 +466,16 @@ impl SparkApplication {
mount_path: VOLUME_MOUNT_PATH_LOG_CONFIG.into(),
..VolumeMount::default()
});

mounts.push(VolumeMount {
name: VOLUME_MOUNT_NAME_LOG.into(),
mount_path: VOLUME_MOUNT_PATH_LOG.into(),
..VolumeMount::default()
});
}

// This mount is used at least by the `containerdebug` process.
// The log volume is always present, so the mount can be added unconditionally.
mounts.push(VolumeMount {
name: VOLUME_MOUNT_NAME_LOG.into(),
mount_path: VOLUME_MOUNT_PATH_LOG.into(),
..VolumeMount::default()
});

if !self.packages().is_empty() {
mounts.push(VolumeMount {
name: VOLUME_MOUNT_NAME_IVY2.into(),
@@ -527,9 +529,7 @@ impl SparkApplication {
let mode = &self.spec.mode;
let name = self.metadata.name.clone().context(ObjectHasNoNameSnafu)?;

let mut submit_cmd: Vec<String> = vec![];

submit_cmd.extend(vec![
let mut submit_cmd = vec![
"/stackable/spark/bin/spark-submit".to_string(),
"--verbose".to_string(),
"--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}".to_string(),
@@ -547,7 +547,7 @@ impl SparkApplication {
format!("--conf spark.driver.extraClassPath=/stackable/spark/extra-jars/*"),
format!("--conf spark.executor.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
format!("--conf spark.executor.extraClassPath=/stackable/spark/extra-jars/*"),
]);
];

// See https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management
// for possible S3 related properties
@@ -676,7 +676,10 @@ impl SparkApplication {

submit_cmd.extend(self.spec.args.clone());

Ok(submit_cmd)
Ok(vec![
format!("containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &"),
submit_cmd.join(" "),
])
}

pub fn env(
@@ -685,6 +688,27 @@
logdir: &Option<ResolvedLogDir>,
) -> Vec<EnvVar> {
let mut e: Vec<EnvVar> = self.spec.env.clone();

// These env vars enable the `containerdebug` process in driver and executor pods.
// More precisely, this process runs in the background of every `spark` container.
// - `CONTAINERDEBUG_LOG_DIRECTORY`: the directory where the process writes its
//   tracing information. The directory is created by the process itself.
// - `_STACKABLE_PRE_HOOK`: evaluated by the entrypoint script (run-spark.sh) in
//   the Spark images before the actual JVM process is started. The result of this
//   evaluation is that the `containerdebug` process is executed in the background.
e.extend(vec![
EnvVar {
name: "CONTAINERDEBUG_LOG_DIRECTORY".into(),
value: Some(format!("{VOLUME_MOUNT_PATH_LOG}/containerdebug")),
value_from: None,
},
EnvVar {
name: "_STACKABLE_PRE_HOOK".into(),
value: Some(format!("containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &")),
value_from: None,
},
]);

if self.requirements().is_some() {
e.push(EnvVar {
name: "PYTHONPATH".to_string(),
@@ -1385,6 +1409,12 @@ mod tests {
name: "executor-pod-template".into(),
..VolumeMount::default()
},
VolumeMount {
mount_path: "/stackable/log".into(),
mount_propagation: None,
name: "log".into(),
..VolumeMount::default()
},
VolumeMount {
mount_path: "/kerberos".into(),
mount_propagation: None,
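The two env vars added in `env()` above work as a pair: `CONTAINERDEBUG_LOG_DIRECTORY` tells `containerdebug` where to write its traces, and `_STACKABLE_PRE_HOOK` is evaluated by the image entrypoint before the JVM starts. A minimal sketch of that mechanism, assuming an entrypoint shaped like `run-spark.sh` (an illustration of the contract, not the script shipped in the images):

```bash
#!/usr/bin/env bash
# Hypothetical entrypoint sketch. If the operator injected a pre-hook,
# evaluate it before handing over to the JVM. Here the hook expands to
# `containerdebug --output=/stackable/log/containerdebug-state.json --loop &`,
# which keeps collecting debugging information in the background.
if [[ -n "${_STACKABLE_PRE_HOOK:-}" ]]; then
    eval "${_STACKABLE_PRE_HOOK}"
fi
exec "$@"   # hand over to the actual Spark process
```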
12 changes: 9 additions & 3 deletions rust/operator-binary/src/history/history_controller.rs
@@ -527,7 +527,13 @@ fn build_stateful_set(
.context(InvalidContainerNameSnafu)?
.image_from_product_image(resolved_product_image)
.resources(merged_config.resources.clone().into())
.command(vec!["/bin/bash".to_string()])
.command(vec![
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
])
.args(command_args(log_dir))
.add_container_port("http", 18080)
.add_container_port("metrics", METRICS_PORT.into())
@@ -751,10 +757,10 @@ fn command_args(logdir: &ResolvedLogDir) -> Vec<String> {
}

command.extend(vec![
format!("containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &"),
format!("/stackable/spark/sbin/start-history-server.sh --properties-file {VOLUME_MOUNT_PATH_CONFIG}/{SPARK_DEFAULTS_FILE_NAME}"),
]);

vec![String::from("-c"), command.join(" && ")]
vec![command.join("\n")]
}

fn labels<'a, T>(
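Two related changes here: the shell now starts with `-x -euo pipefail`, and `command_args` joins its lines with `\n` instead of `" && "`. Under `set -e` a newline-separated script still aborts on the first failing line, so the fail-fast behaviour of the old `&&` chain is preserved, while the backgrounded `containerdebug … &` line no longer has to sit inside an `&&` chain; `-x` additionally traces each executed line into the container log. The rendered invocation ends up roughly like the sketch below (the properties-file path stands in for the real mount-path constants):

```bash
# Sketch of the history container entrypoint after this change.
/bin/bash -x -euo pipefail -c '
containerdebug --output=/stackable/log/containerdebug-state.json --loop &
/stackable/spark/sbin/start-history-server.sh --properties-file /stackable/config/spark-defaults.conf
'
```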
41 changes: 32 additions & 9 deletions rust/operator-binary/src/spark_k8s_controller.rs
@@ -466,8 +466,14 @@ fn init_containers(

Some(
jcb.image(job_image)
.command(vec!["/bin/bash".to_string(), "-c".to_string()])
.args(vec![args.join(" && ")])
.command(vec![
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
])
.args(vec![args.join("\n")])
.add_volume_mount(VOLUME_MOUNT_NAME_JOB, VOLUME_MOUNT_PATH_JOB)
.context(AddVolumeMountSnafu)?
.add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG)
@@ -509,8 +515,14 @@ fn init_containers(
));

rcb.image(&spark_image.image)
.command(vec!["/bin/bash".to_string(), "-c".to_string()])
.args(vec![args.join(" && ")])
.command(vec![
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
])
.args(vec![args.join("\n")])
.add_volume_mount(VOLUME_MOUNT_NAME_REQ, VOLUME_MOUNT_PATH_REQ)
.context(AddVolumeMountSnafu)?
.add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG)
@@ -549,8 +561,14 @@ fn init_containers(
}
Some(
tcb.image(&spark_image.image)
.command(vec!["/bin/bash".to_string(), "-c".to_string()])
.args(vec![args.join(" && ")])
.command(vec![
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
])
.args(vec![args.join("\n")])
.add_volume_mount(STACKABLE_TRUST_STORE_NAME, STACKABLE_TRUST_STORE)
.context(AddVolumeMountSnafu)?
.resources(
@@ -858,12 +876,17 @@ fn spark_job(
let mut cb = ContainerBuilder::new(&SparkContainer::SparkSubmit.to_string())
.context(IllegalContainerNameSnafu)?;

let args = [job_commands.join(" ")];
let merged_env = spark_application.merged_env(SparkApplicationRole::Submit, env);

cb.image_from_product_image(spark_image)
.command(vec!["/bin/bash".to_string(), "-c".to_string()])
.args(vec![args.join(" && ")])
.command(vec![
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
])
.args(vec![job_commands.join("\n")])
.resources(job_config.resources.clone().into())
.add_volume_mounts(spark_application.spark_job_volume_mounts(s3conn, logdir))
.context(AddVolumeMountSnafu)?
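The submit container follows the same pattern, but since its command is overridden with `/bin/bash … -c`, the image entrypoint (and with it the pre-hook evaluation) is bypassed; `containerdebug` therefore arrives as the first element of the job commands built in `lib.rs` above. Joined with newlines, the container runs approximately the following (the spark-submit flag list is trimmed to `...`):

```bash
# Sketch of the spark-submit container script; the remaining flags from the
# submit command builder in lib.rs are abbreviated.
/bin/bash -x -euo pipefail -c '
containerdebug --output=/stackable/log/containerdebug-state.json --loop &
/stackable/spark/bin/spark-submit --verbose --master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS} ...
'
```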
18 changes: 0 additions & 18 deletions tests/templates/kuttl/smoke/03-assert.yaml

This file was deleted.

17 changes: 11 additions & 6 deletions tests/templates/kuttl/smoke/20-assert.yaml
@@ -1,13 +1,18 @@
 ---
 apiVersion: kuttl.dev/v1beta1
 kind: TestAssert
-metadata:
-  name: history-api-check
-timeout: 900
+timeout: 180
 ---
-apiVersion: batch/v1
-kind: Job
+apiVersion: apps/v1
+kind: Deployment
 metadata:
-  name: history-api-check
+  name: test-minio
 status:
-  succeeded: 1
+  readyReplicas: 1
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: eventlog-minio
+status:
+  readyReplicas: 1
7 changes: 7 additions & 0 deletions tests/templates/kuttl/smoke/41-assert.yaml
@@ -0,0 +1,7 @@
---
# This test checks if the containerdebug-state.json file is present and valid
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 60
commands:
  - script: kubectl exec -n $NAMESPACE --container spark-history spark-history-node-default-0 -- cat /stackable/log/containerdebug-state.json | jq --exit-status '"valid JSON"'
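The `jq --exit-status` invocation doubles as the validity check: jq exits 0 only if it can parse its input and the last value it outputs is neither `false` nor `null`. The constant string `"valid JSON"` is always truthy, so the exit code reduces to "did the file parse". For example:

```bash
echo '{"status": "ok"}' | jq --exit-status '"valid JSON"'   # prints "valid JSON", exits 0
echo 'not json'         | jq --exit-status '"valid JSON"'   # parse error, exits non-zero
```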
15 changes: 15 additions & 0 deletions tests/templates/kuttl/smoke/50-assert.yaml
@@ -0,0 +1,15 @@
---
# This test checks if the containerdebug-state.json file is present and valid.
#
# It needs to run as soon as the Spark application has been submitted, because
# the pods are terminated once the application completes.
#
# Unfortunately, there is no reliable way to test the driver and executor pods.
#
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
commands:
  - script: |
      SPARK_SUBMIT_POD=$(kubectl get -n $NAMESPACE pods --field-selector=status.phase=Running --selector batch.kubernetes.io/job-name=spark-pi-s3-1 -o jsonpath='{.items[0].metadata.name}')
      kubectl exec -n $NAMESPACE --container spark-submit $SPARK_SUBMIT_POD -- cat /stackable/log/containerdebug-state.json | jq --exit-status '"valid JSON"'
13 changes: 13 additions & 0 deletions tests/templates/kuttl/smoke/60-assert.yaml
@@ -0,0 +1,13 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
metadata:
  name: history-api-check
timeout: 180
---
apiVersion: batch/v1
kind: Job
metadata:
  name: history-api-check
status:
  succeeded: 1