From fae3c0bb57a3e46b077df0c8ef92a7b85700ac90 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 12 Sep 2024 14:29:30 +0200 Subject: [PATCH 01/10] fix: ensure a spark application can only be submitted once --- .../src/spark_k8s_controller.rs | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index e9a794b5..3abf8781 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -10,7 +10,7 @@ use product_config::writer::to_java_properties_string; use stackable_operator::time::Duration; use stackable_spark_k8s_crd::{ constants::*, s3logdir::S3LogDir, tlscerts, RoleConfig, SparkApplication, SparkApplicationRole, - SparkContainer, SubmitConfig, + SparkApplicationStatus, SparkContainer, SubmitConfig, }; use crate::product_logging::{self, resolve_vector_aggregator_address}; @@ -155,6 +155,11 @@ pub enum Error { CreateVolumes { source: stackable_spark_k8s_crd::Error, }, + #[snafu(display("Failed to update status for application [{name}]"))] + ApplySparkApplicationStatus { + source: stackable_operator::client::Error, + name: String, + }, } type Result = std::result::Result; @@ -170,6 +175,16 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) let client = &ctx.client; + // Fix for #457 + // Skip reconcyling the SparkApplication if it has a non empty status. + if spark_application.status.is_some() { + tracing::info!( + "Skip reconciling SparkApplication [{}] with non empty status", + spark_application.name_any() + ); + return Ok(Action::await_change()); + } + let opt_s3conn = match spark_application.spec.s3connection.as_ref() { Some(s3bd) => s3bd .resolve( @@ -346,6 +361,22 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) .await .context(ApplyApplicationSnafu)?; + // Fix for #457 + // Update the status of the SparkApplication immediately after creating the Job + // to ensure the Job is not created again after being recycled by Kubernetes. + client + .apply_patch_status( + CONTROLLER_NAME, + spark_application.as_ref(), + &SparkApplicationStatus { + phase: "Unknown".to_string(), + }, + ) + .await + .with_context(|_| ApplySparkApplicationStatusSnafu { + name: spark_application.name_any(), + })?; + Ok(Action::await_change()) } From 2629b757acf94bf97e788b9b9ec68a167f0f628c Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 12 Sep 2024 14:43:35 +0200 Subject: [PATCH 02/10] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b9f308c..a186fa33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file. ### Fixed - Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]). +- Ensure SparkApplications can only create a single single submit Job. Fix for #457 ([#460]). ### Removed @@ -24,6 +25,7 @@ All notable changes to this project will be documented in this file. [#450]: https://github.com/stackabletech/spark-k8s-operator/pull/450 [#451]: https://github.com/stackabletech/spark-k8s-operator/pull/451 [#459]: https://github.com/stackabletech/spark-k8s-operator/pull/459 +[#460]: https://github.com/stackabletech/spark-k8s-operator/pull/460 ## [24.7.0] - 2024-07-24 From 3a589def22412587a1439676c7e469dfe3488b41 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 12 Sep 2024 17:24:38 +0200 Subject: [PATCH 03/10] add doc page for app status --- .../pages/usage-guide/operations/applications.adoc | 7 +++++++ docs/modules/spark-k8s/partials/nav.adoc | 1 + 2 files changed, 8 insertions(+) create mode 100644 docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc diff --git a/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc new file mode 100644 index 00000000..a677e4df --- /dev/null +++ b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc @@ -0,0 +1,7 @@ += Spark Applications + +Spark applications are submitted to the Spark Operator as SparkApplication resources. These resources are used to define the configuration of the Spark job, including the image to use, the main application file, and the number of executors to start. + +Upon creation, the application's status set to `Unknown`. As the operator creates the necessary resources, the status of the application transisions through different phases that reflect the phase of the driver Pod. A successful application will eventually reach the `Succeeded` phase. + +The operator will never reconcile an application once it has been created. To resubmit an application, a new SparkApplication resource must be created. diff --git a/docs/modules/spark-k8s/partials/nav.adoc b/docs/modules/spark-k8s/partials/nav.adoc index 2fb175f1..a514d14d 100644 --- a/docs/modules/spark-k8s/partials/nav.adoc +++ b/docs/modules/spark-k8s/partials/nav.adoc @@ -11,6 +11,7 @@ ** xref:spark-k8s:usage-guide/history-server.adoc[] ** xref:spark-k8s:usage-guide/examples.adoc[] ** xref:spark-k8s:usage-guide/operations/index.adoc[] +*** xref:spark-k8s:usage-guide/operations/applications.adoc[] *** xref:spark-k8s:usage-guide/operations/pod-placement.adoc[] *** xref:spark-k8s:usage-guide/operations/pod-disruptions.adoc[] *** xref:spark-k8s:usage-guide/operations/graceful-shutdown.adoc[] From 94dae3a79be825c9248adfef6f134198942ffc04 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 12 Sep 2024 17:31:09 +0200 Subject: [PATCH 04/10] add callout --- .../spark-k8s/pages/usage-guide/operations/applications.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc index a677e4df..ea60859b 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc @@ -4,4 +4,4 @@ Spark applications are submitted to the Spark Operator as SparkApplication resou Upon creation, the application's status set to `Unknown`. As the operator creates the necessary resources, the status of the application transisions through different phases that reflect the phase of the driver Pod. A successful application will eventually reach the `Succeeded` phase. -The operator will never reconcile an application once it has been created. To resubmit an application, a new SparkApplication resource must be created. +NOTE: The operator will never reconcile an application once it has been created. To resubmit an application, a new SparkApplication resource must be created. From 149d9d0b5837b67f428213baa1e578145d0082a2 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 12 Sep 2024 17:34:57 +0200 Subject: [PATCH 05/10] fix typos --- CHANGELOG.md | 2 +- .../spark-k8s/pages/usage-guide/operations/applications.adoc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a186fa33..b1171d3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ All notable changes to this project will be documented in this file. ### Fixed - Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]). -- Ensure SparkApplications can only create a single single submit Job. Fix for #457 ([#460]). +- Ensure SparkApplications can only create a single submit Job. Fix for #457 ([#460]). ### Removed diff --git a/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc index ea60859b..ab38f728 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc @@ -2,6 +2,6 @@ Spark applications are submitted to the Spark Operator as SparkApplication resources. These resources are used to define the configuration of the Spark job, including the image to use, the main application file, and the number of executors to start. -Upon creation, the application's status set to `Unknown`. As the operator creates the necessary resources, the status of the application transisions through different phases that reflect the phase of the driver Pod. A successful application will eventually reach the `Succeeded` phase. +Upon creation, the application's status set to `Unknown`. As the operator creates the necessary resources, the status of the application transitions through different phases that reflect the phase of the driver Pod. A successful application will eventually reach the `Succeeded` phase. NOTE: The operator will never reconcile an application once it has been created. To resubmit an application, a new SparkApplication resource must be created. From 350b1864804617447c38b2240d0c1d78ed8eb278 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Mon, 16 Sep 2024 07:43:12 -0400 Subject: [PATCH 06/10] Update rust/operator-binary/src/spark_k8s_controller.rs Co-authored-by: Sebastian Bernauer --- rust/operator-binary/src/spark_k8s_controller.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 3abf8781..f17ba824 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -179,8 +179,8 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) // Skip reconcyling the SparkApplication if it has a non empty status. if spark_application.status.is_some() { tracing::info!( - "Skip reconciling SparkApplication [{}] with non empty status", - spark_application.name_any() + spark_application = spark_application.name_any(), + "Skipped reconciling SparkApplication with non empty status" ); return Ok(Action::await_change()); } From b5676f8c0d5ec59f636faecbce27a312aacb058f Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Mon, 16 Sep 2024 07:43:18 -0400 Subject: [PATCH 07/10] Update rust/operator-binary/src/spark_k8s_controller.rs Co-authored-by: Sebastian Bernauer --- rust/operator-binary/src/spark_k8s_controller.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index f17ba824..f8038bdc 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -175,8 +175,6 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) let client = &ctx.client; - // Fix for #457 - // Skip reconcyling the SparkApplication if it has a non empty status. if spark_application.status.is_some() { tracing::info!( spark_application = spark_application.name_any(), From 4264995cb6adf66018041b38756a96049b6f1f27 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Mon, 16 Sep 2024 13:46:59 +0200 Subject: [PATCH 08/10] implement review feedback --- rust/crd/src/lib.rs | 15 +++++++++++++++ rust/operator-binary/src/spark_k8s_controller.rs | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 0bdfc051..8d195f74 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -230,6 +230,21 @@ pub struct JobDependencies { } impl SparkApplication { + /// Returns if this [`SparkApplication`] has already created a Kubernetes Job doing the actual `spark-submit`. + /// + /// This is needed because Kubernetes will remove the succeeded Job after some time. When the spark-k8s-operator is + /// restarted it would re-create the Job, resulting in the Spark job running multiple times. This function assumes + /// that the [`SparkApplication`]'s status will always be set when the Kubernetes Job is created. It therefore + /// checks if the status is set to determine if the Job was already created in the past. + /// + /// See the bug report [#457](https://github.com/stackabletech/spark-k8s-operator/issues/457) for details. + pub fn k8s_job_has_been_created(&self) -> bool { + self.status + .as_ref() + .map(|s| !s.phase.is_empty()) + .unwrap_or_default() + } + pub fn submit_job_config_map_name(&self) -> String { format!("{app_name}-submit-job", app_name = self.name_any()) } diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index f8038bdc..77a7f60c 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -175,7 +175,7 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) let client = &ctx.client; - if spark_application.status.is_some() { + if spark_application.k8s_job_has_been_created() { tracing::info!( spark_application = spark_application.name_any(), "Skipped reconciling SparkApplication with non empty status" From 00ed783c7ebec1924222b63ac21de71b7c4671cf Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Mon, 16 Sep 2024 08:12:06 -0400 Subject: [PATCH 09/10] Update rust/operator-binary/src/spark_k8s_controller.rs Co-authored-by: Sebastian Bernauer --- rust/operator-binary/src/spark_k8s_controller.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 77a7f60c..cd42d3a6 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -155,6 +155,7 @@ pub enum Error { CreateVolumes { source: stackable_spark_k8s_crd::Error, }, + #[snafu(display("Failed to update status for application [{name}]"))] ApplySparkApplicationStatus { source: stackable_operator::client::Error, From 3a4ca93e083f552be6e28274ae40d9efd754f3f0 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Mon, 16 Sep 2024 08:36:06 -0400 Subject: [PATCH 10/10] Update rust/operator-binary/src/spark_k8s_controller.rs Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com> --- rust/operator-binary/src/spark_k8s_controller.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index cd42d3a6..a890a6c9 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -156,7 +156,7 @@ pub enum Error { source: stackable_spark_k8s_crd::Error, }, - #[snafu(display("Failed to update status for application [{name}]"))] + #[snafu(display("Failed to update status for application {name:?}"))] ApplySparkApplicationStatus { source: stackable_operator::client::Error, name: String,