Skip to content

Commit 4c9c06e

Browse files
committed
implement connect server status tracking
1 parent e559188 commit 4c9c06e

File tree

8 files changed

+107
-31
lines changed

8 files changed

+107
-31
lines changed

Cargo.lock

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.nix

Lines changed: 3 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,5 @@ indoc = "2"
3535

3636
# TODO: revert before merging
3737
[patch."https://github.com/stackabletech/operator-rs.git"]
38-
stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
39-
# stackable-operator = { path = "../operator-rs/crates/stackable-operator" }
38+
# stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
39+
stackable-operator = { path = "../operator-rs/crates/stackable-operator" }

crate-hashes.json

Lines changed: 0 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2150,10 +2150,58 @@ spec:
21502150
required:
21512151
- image
21522152
type: object
2153+
status:
2154+
nullable: true
2155+
properties:
2156+
conditions:
2157+
default: []
2158+
items:
2159+
properties:
2160+
lastTransitionTime:
2161+
description: Last time the condition transitioned from one status to another.
2162+
format: date-time
2163+
nullable: true
2164+
type: string
2165+
lastUpdateTime:
2166+
description: The last time this condition was updated.
2167+
format: date-time
2168+
nullable: true
2169+
type: string
2170+
message:
2171+
description: A human readable message indicating details about the transition.
2172+
nullable: true
2173+
type: string
2174+
reason:
2175+
description: The reason for the condition's last transition.
2176+
nullable: true
2177+
type: string
2178+
status:
2179+
description: Status of the condition, one of True, False, Unknown.
2180+
enum:
2181+
- 'True'
2182+
- 'False'
2183+
- Unknown
2184+
type: string
2185+
type:
2186+
description: Type of deployment condition.
2187+
enum:
2188+
- Available
2189+
- Degraded
2190+
- Progressing
2191+
- ReconciliationPaused
2192+
- Stopped
2193+
type: string
2194+
required:
2195+
- status
2196+
- type
2197+
type: object
2198+
type: array
2199+
type: object
21532200
required:
21542201
- spec
21552202
title: SparkConnectServer
21562203
type: object
21572204
served: true
21582205
storage: true
2159-
subresources: {}
2206+
subresources:
2207+
status: {}

deploy/helm/spark-k8s-operator/templates/roles.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ rules:
113113
- spark.stackable.tech
114114
resources:
115115
- sparkapplications/status
116+
- sparkconnectservers/status
116117
verbs:
117118
- patch
118119
- apiGroups:

rust/operator-binary/src/connect/controller.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ use stackable_operator::{
4444
},
4545
},
4646
role_utils::{JavaCommonConfig, JvmArgumentOverrides, RoleGroupRef},
47+
status::condition::{
48+
compute_conditions, deployment::DeploymentConditionBuilder,
49+
operations::ClusterOperationsConditionBuilder,
50+
},
4751
time::Duration,
4852
};
4953
use strum::{Display, EnumDiscriminants, IntoStaticStr};
@@ -53,6 +57,7 @@ use super::crd::{
5357
CONNECT_CONTROLLER_NAME, CONNECT_GRPC_PORT, CONNECT_SERVER_ROLE_NAME, CONNECT_UI_PORT,
5458
};
5559
use crate::{
60+
connect::crd::SparkConnectServerStatus,
5661
crd::constants::{
5762
APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE,
5863
METRICS_PORT, OPERATOR_NAME, SPARK_DEFAULTS_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID,
@@ -69,6 +74,12 @@ const DUMMY_SPARK_CONNECT_GROUP_NAME: &str = "default";
6974
#[strum_discriminants(derive(IntoStaticStr))]
7075
#[allow(clippy::enum_variant_names)]
7176
pub enum Error {
77+
#[snafu(display("failed to update status of spark connect server {name}"))]
78+
ApplyStatus {
79+
source: stackable_operator::client::Error,
80+
name: String,
81+
},
82+
7283
#[snafu(display("failed to merge jvm argument overrides"))]
7384
MergeJvmArgumentOverrides {
7485
source: stackable_operator::role_utils::Error,
@@ -311,16 +322,35 @@ pub async fn reconcile(
311322
&config_map,
312323
args,
313324
)?;
314-
cluster_resources
315-
.add(client, deployment)
316-
.await
317-
.context(ApplyDeploymentSnafu)?;
325+
326+
let mut ss_cond_builder = DeploymentConditionBuilder::default();
327+
328+
ss_cond_builder.add(
329+
cluster_resources
330+
.add(client, deployment)
331+
.await
332+
.context(ApplyDeploymentSnafu)?,
333+
);
318334

319335
cluster_resources
320336
.delete_orphaned_resources(client)
321337
.await
322338
.context(DeleteOrphanedResourcesSnafu)?;
323339

340+
let cluster_operation_cond_builder =
341+
ClusterOperationsConditionBuilder::new(&scs.spec.cluster_operation);
342+
343+
let status = SparkConnectServerStatus {
344+
conditions: compute_conditions(scs, &[&ss_cond_builder, &cluster_operation_cond_builder]),
345+
};
346+
347+
client
348+
.apply_patch_status(OPERATOR_NAME, scs, &status)
349+
.await
350+
.context(ApplyStatusSnafu {
351+
name: scs.name_any(),
352+
})?;
353+
324354
Ok(Action::await_change())
325355
}
326356

rust/operator-binary/src/connect/crd.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use stackable_operator::{
1919
product_logging::{self, spec::Logging},
2020
role_utils::{CommonConfiguration, JavaCommonConfig},
2121
schemars::{self, JsonSchema},
22+
status::condition::{ClusterCondition, HasStatusCondition},
2223
time::Duration,
2324
};
2425
use stackable_versioned::versioned;
@@ -67,6 +68,7 @@ pub mod versioned {
6768
kind = "SparkConnectServer",
6869
plural = "sparkconnectservers",
6970
shortname = "sparkconnect",
71+
status = "SparkConnectServerStatus",
7072
namespaced,
7173
crates(
7274
kube_core = "stackable_operator::kube::core",
@@ -245,3 +247,19 @@ impl v1alpha1::SparkConnectServer {
245247
.context(FragmentValidationFailureSnafu)
246248
}
247249
}
250+
251+
#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Eq, Serialize)]
252+
#[serde(rename_all = "camelCase")]
253+
pub struct SparkConnectServerStatus {
254+
#[serde(default)]
255+
pub conditions: Vec<ClusterCondition>,
256+
}
257+
258+
impl HasStatusCondition for v1alpha1::SparkConnectServer {
259+
fn conditions(&self) -> Vec<ClusterCondition> {
260+
match &self.status {
261+
Some(status) => status.conditions.clone(),
262+
None => vec![],
263+
}
264+
}
265+
}

0 commit comments

Comments
 (0)