Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@
- `podOverrides`
- `affinity`

### Fixed

- Invalid `SupersetCluster`, `DruidConnection` or `AuthenticationClass` objects don't stop the operator from reconciling ([#551]).

[#528]: https://github.com/stackabletech/superset-operator/pull/528
[#530]: https://github.com/stackabletech/superset-operator/pull/530
[#549]: https://github.com/stackabletech/superset-operator/pull/549
[#551]: https://github.com/stackabletech/superset-operator/pull/551

## [24.7.0] - 2024-07-24

Expand Down
46 changes: 30 additions & 16 deletions rust/operator-binary/src/druid_connection_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use stackable_operator::{
},
kube::{
core::DynamicObject,
core::{error_boundary, DeserializeGuard},
runtime::{controller::Action, reflector::ObjectRef},
ResourceExt,
},
Expand Down Expand Up @@ -84,6 +85,11 @@ pub enum Error {
SupersetClusterRetrieval {
source: stackable_operator::client::Error,
},

#[snafu(display("DruidConnection object is invalid"))]
InvalidDruidConnection {
source: error_boundary::InvalidObject,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -110,20 +116,25 @@ impl ReconcilerError for Error {
Error::ApplyServiceAccount { .. } => None,
Error::ApplyRoleBinding { .. } => None,
Error::SupersetClusterRetrieval { .. } => None,
Error::InvalidDruidConnection { .. } => None,
}
}
}

pub async fn reconcile_druid_connection(
druid_connection: Arc<DruidConnection>,
druid_connection: Arc<DeserializeGuard<DruidConnection>>,
ctx: Arc<Ctx>,
) -> Result<Action> {
tracing::info!("Starting reconciling DruidConnections");

let druid_connection = druid_connection
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidDruidConnectionSnafu)?;
let client = &ctx.client;

let (rbac_sa, rbac_rolebinding) =
rbac::build_rbac_resources(druid_connection.as_ref(), APP_NAME);
let (rbac_sa, rbac_rolebinding) = rbac::build_rbac_resources(druid_connection, APP_NAME);
client
.apply_patch(DRUID_CONNECTION_CONTROLLER_NAME, &rbac_sa, &rbac_sa)
.await
Expand All @@ -146,7 +157,7 @@ pub async fn reconcile_druid_connection(
&druid_connection.druid_name(),
&druid_connection.druid_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
druid_connection: ObjectRef::from_obj(druid_connection),
},
)?,
)
Expand All @@ -159,7 +170,7 @@ pub async fn reconcile_druid_connection(
&druid_connection.superset_name(),
&druid_connection.superset_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
druid_connection: ObjectRef::from_obj(druid_connection),
},
)?,
)
Expand All @@ -183,7 +194,7 @@ pub async fn reconcile_druid_connection(
&druid_connection.druid_name(),
&druid_connection.druid_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
druid_connection: ObjectRef::from_obj(druid_connection),
},
)?,
client,
Expand All @@ -195,7 +206,7 @@ pub async fn reconcile_druid_connection(
.resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION);
let job = build_import_job(
&superset_cluster,
&druid_connection,
druid_connection,
&resolved_product_image,
&sqlalchemy_str,
&rbac_sa.name_any(),
Expand All @@ -209,7 +220,7 @@ pub async fn reconcile_druid_connection(
client
.apply_patch_status(
DRUID_CONNECTION_CONTROLLER_NAME,
&*druid_connection,
druid_connection,
&s.importing(),
)
.await
Expand All @@ -236,11 +247,7 @@ pub async fn reconcile_druid_connection(

if let Some(ns) = new_status {
client
.apply_patch_status(
DRUID_CONNECTION_CONTROLLER_NAME,
&*druid_connection,
&ns,
)
.apply_patch_status(DRUID_CONNECTION_CONTROLLER_NAME, druid_connection, &ns)
.await
.context(ApplyStatusSnafu)?;
}
Expand All @@ -253,7 +260,7 @@ pub async fn reconcile_druid_connection(
client
.apply_patch_status(
DRUID_CONNECTION_CONTROLLER_NAME,
&*druid_connection,
druid_connection,
&DruidConnectionStatus::new(),
)
.await
Expand Down Expand Up @@ -360,6 +367,13 @@ async fn build_import_job(
Ok(job)
}

pub fn error_policy(_obj: Arc<DruidConnection>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(*Duration::from_secs(5))
pub fn error_policy(
_obj: Arc<DeserializeGuard<DruidConnection>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
match error {
Error::InvalidDruidConnection { .. } => Action::await_change(),
_ => Action::requeue(*Duration::from_secs(5)),
}
}
74 changes: 52 additions & 22 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use stackable_operator::{
core::v1::{ConfigMap, Service},
},
kube::{
core::DeserializeGuard,
runtime::{reflector::ObjectRef, watcher, Controller},
ResourceExt,
},
Expand Down Expand Up @@ -86,28 +87,28 @@ async fn main() -> anyhow::Result<()> {
.await?;

let superset_controller_builder = Controller::new(
watch_namespace.get_api::<SupersetCluster>(&client),
watch_namespace.get_api::<DeserializeGuard<SupersetCluster>>(&client),
watcher::Config::default(),
);
let superset_store_1 = superset_controller_builder.store();
let superset_controller = superset_controller_builder
.owns(
watch_namespace.get_api::<Service>(&client),
watch_namespace.get_api::<DeserializeGuard<Service>>(&client),
watcher::Config::default(),
)
.owns(
watch_namespace.get_api::<StatefulSet>(&client),
watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
.watches(
client.get_api::<AuthenticationClass>(&()),
client.get_api::<DeserializeGuard<AuthenticationClass>>(&()),
watcher::Config::default(),
move |authentication_class| {
superset_store_1
.state()
.into_iter()
.filter(move |superset: &Arc<SupersetCluster>| {
.filter(move |superset| {
references_authentication_class(superset, &authentication_class)
})
.map(|superset| ObjectRef::from_obj(&*superset))
Expand All @@ -130,7 +131,7 @@ async fn main() -> anyhow::Result<()> {
});

let druid_connection_controller_builder = Controller::new(
watch_namespace.get_api::<DruidConnection>(&client),
watch_namespace.get_api::<DeserializeGuard<DruidConnection>>(&client),
watcher::Config::default(),
);
let druid_connection_store_1 = druid_connection_controller_builder.store();
Expand All @@ -139,46 +140,38 @@ async fn main() -> anyhow::Result<()> {
let druid_connection_controller = druid_connection_controller_builder
.shutdown_on_signal()
.watches(
watch_namespace.get_api::<SupersetCluster>(&client),
watch_namespace.get_api::<DeserializeGuard<SupersetCluster>>(&client),
watcher::Config::default(),
move |superset_cluster| {
druid_connection_store_1
.state()
.into_iter()
.filter(move |druid_connection| {
druid_connection.superset_name() == superset_cluster.name_any()
&& druid_connection.superset_namespace().ok()
== superset_cluster.namespace()
valid_druid_connection(&superset_cluster, druid_connection)
})
.map(|druid_connection| ObjectRef::from_obj(&*druid_connection))
},
)
.watches(
watch_namespace.get_api::<Job>(&client),
watch_namespace.get_api::<DeserializeGuard<Job>>(&client),
watcher::Config::default(),
move |job| {
druid_connection_store_2
.state()
.into_iter()
.filter(move |druid_connection| {
druid_connection.metadata.namespace == job.metadata.namespace
&& Some(druid_connection.job_name()) == job.metadata.name
})
.filter(move |druid_connection| valid_druid_job(druid_connection, &job))
.map(|druid_connection| ObjectRef::from_obj(&*druid_connection))
},
)
.watches(
watch_namespace.get_api::<ConfigMap>(&client),
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
watcher::Config::default(),
move |config_map| {
druid_connection_store_3
.state()
.into_iter()
.filter(move |druid_connection| {
druid_connection.druid_namespace().ok()
== config_map.metadata.namespace
&& Some(druid_connection.druid_name())
== config_map.metadata.name
valid_druid_connection_namespace(druid_connection, &config_map)
})
.map(|druid_connection| ObjectRef::from_obj(&*druid_connection))
},
Expand Down Expand Up @@ -208,9 +201,13 @@ async fn main() -> anyhow::Result<()> {
}

fn references_authentication_class(
superset: &SupersetCluster,
authentication_class: &AuthenticationClass,
superset: &DeserializeGuard<SupersetCluster>,
authentication_class: &DeserializeGuard<AuthenticationClass>,
) -> bool {
let Ok(superset) = &superset.0 else {
return false;
};

let authentication_class_name = authentication_class.name_any();
superset
.spec
Expand All @@ -219,3 +216,36 @@ fn references_authentication_class(
.iter()
.any(|c| c.common.authentication_class_name() == &authentication_class_name)
}

fn valid_druid_connection(
superset_cluster: &DeserializeGuard<SupersetCluster>,
druid_connection: &DeserializeGuard<DruidConnection>,
) -> bool {
let Ok(druid_connection) = &druid_connection.0 else {
return false;
};
druid_connection.superset_name() == superset_cluster.name_any()
&& druid_connection.superset_namespace().ok() == superset_cluster.namespace()
}

fn valid_druid_connection_namespace(
druid_connection: &DeserializeGuard<DruidConnection>,
config_map: &DeserializeGuard<ConfigMap>,
) -> bool {
let Ok(druid_connection) = &druid_connection.0 else {
return false;
};
druid_connection.druid_namespace().ok() == config_map.meta().namespace

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [clippy] reported by reviewdog 🐶

error[E0599]: no method named `meta` found for reference `&stackable_operator::kube::kube_core::DeserializeGuard` in the current scope
   --> rust/operator-binary/src/main.rs:238:59
    |
238 |     druid_connection.druid_namespace().ok() == config_map.meta().namespace
    |                                                           ^^^^
    |
    = help: items from traits can only be used if the trait is in scope
help: trait `Resource` which provides `meta` is implemented but not in scope; perhaps you want to import it
    |
1   + use stackable_operator::kube::Resource;
    |
help: there is a method `meta_mut` with a similar name
    |
238 |     druid_connection.druid_namespace().ok() == config_map.meta_mut().namespace
    |                                                           ~~~~~~~~

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [clippy] reported by reviewdog 🐶

error[E0599]: no method named `meta` found for reference `&stackable_operator::kube::kube_core::DeserializeGuard` in the current scope
   --> rust/operator-binary/src/main.rs:238:59
    |
238 |     druid_connection.druid_namespace().ok() == config_map.meta().namespace
    |                                                           ^^^^
    |
    = help: items from traits can only be used if the trait is in scope
help: trait `Resource` which provides `meta` is implemented but not in scope; perhaps you want to import it
    |
1   + use stackable_operator::kube::Resource;
    |
help: there is a method `meta_mut` with a similar name
    |
238 |     druid_connection.druid_namespace().ok() == config_map.meta_mut().namespace
    |                                                           ~~~~~~~~

&& Some(druid_connection.druid_name()) == config_map.meta().name

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [clippy] reported by reviewdog 🐶

error[E0599]: no method named `meta` found for reference `&stackable_operator::kube::kube_core::DeserializeGuard` in the current scope
   --> rust/operator-binary/src/main.rs:239:62
    |
239 |         && Some(druid_connection.druid_name()) == config_map.meta().name
    |                                                              ^^^^
    |
    = help: items from traits can only be used if the trait is in scope
help: trait `Resource` which provides `meta` is implemented but not in scope; perhaps you want to import it
    |
1   + use stackable_operator::kube::Resource;
    |
help: there is a method `meta_mut` with a similar name
    |
239 |         && Some(druid_connection.druid_name()) == config_map.meta_mut().name
    |                                                              ~~~~~~~~

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [clippy] reported by reviewdog 🐶

error[E0599]: no method named `meta` found for reference `&stackable_operator::kube::kube_core::DeserializeGuard` in the current scope
   --> rust/operator-binary/src/main.rs:239:62
    |
239 |         && Some(druid_connection.druid_name()) == config_map.meta().name
    |                                                              ^^^^
    |
    = help: items from traits can only be used if the trait is in scope
help: trait `Resource` which provides `meta` is implemented but not in scope; perhaps you want to import it
    |
1   + use stackable_operator::kube::Resource;
    |
help: there is a method `meta_mut` with a similar name
    |
239 |         && Some(druid_connection.druid_name()) == config_map.meta_mut().name
    |                                                              ~~~~~~~~

}

fn valid_druid_job(
druid_connection: &DeserializeGuard<DruidConnection>,
job: &DeserializeGuard<Job>,
) -> bool {
let Ok(druid_connection) = &druid_connection.0 else {
return false;
};
druid_connection.metadata.namespace == job.meta().namespace

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [clippy] reported by reviewdog 🐶

error[E0599]: no method named `meta` found for reference `&stackable_operator::kube::kube_core::DeserializeGuard` in the current scope
   --> rust/operator-binary/src/main.rs:249:48
    |
249 |     druid_connection.metadata.namespace == job.meta().namespace
    |                                                ^^^^
    |
    = help: items from traits can only be used if the trait is in scope
help: trait `Resource` which provides `meta` is implemented but not in scope; perhaps you want to import it
    |
1   + use stackable_operator::kube::Resource;
    |
help: there is a method `meta_mut` with a similar name
    |
249 |     druid_connection.metadata.namespace == job.meta_mut().namespace
    |                                                ~~~~~~~~

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [clippy] reported by reviewdog 🐶

error[E0599]: no method named `meta` found for reference `&stackable_operator::kube::kube_core::DeserializeGuard` in the current scope
   --> rust/operator-binary/src/main.rs:249:48
    |
249 |     druid_connection.metadata.namespace == job.meta().namespace
    |                                                ^^^^
    |
    = help: items from traits can only be used if the trait is in scope
help: trait `Resource` which provides `meta` is implemented but not in scope; perhaps you want to import it
    |
1   + use stackable_operator::kube::Resource;
    |
help: there is a method `meta_mut` with a similar name
    |
249 |     druid_connection.metadata.namespace == job.meta_mut().namespace
    |                                                ~~~~~~~~

&& Some(druid_connection.job_name()) == job.meta().name

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [clippy] reported by reviewdog 🐶

error[E0599]: no method named `meta` found for reference `&stackable_operator::kube::kube_core::DeserializeGuard` in the current scope
   --> rust/operator-binary/src/main.rs:250:53
    |
250 |         && Some(druid_connection.job_name()) == job.meta().name
    |                                                     ^^^^
    |
    = help: items from traits can only be used if the trait is in scope
help: trait `Resource` which provides `meta` is implemented but not in scope; perhaps you want to import it
    |
1   + use stackable_operator::kube::Resource;
    |
help: there is a method `meta_mut` with a similar name
    |
250 |         && Some(druid_connection.job_name()) == job.meta_mut().name
    |                                                     ~~~~~~~~

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [clippy] reported by reviewdog 🐶

error[E0599]: no method named `meta` found for reference `&stackable_operator::kube::kube_core::DeserializeGuard` in the current scope
   --> rust/operator-binary/src/main.rs:250:53
    |
250 |         && Some(druid_connection.job_name()) == job.meta().name
    |                                                     ^^^^
    |
    = help: items from traits can only be used if the trait is in scope
help: trait `Resource` which provides `meta` is implemented but not in scope; perhaps you want to import it
    |
1   + use stackable_operator::kube::Resource;
    |
help: there is a method `meta_mut` with a similar name
    |
250 |         && Some(druid_connection.job_name()) == job.meta_mut().name
    |                                                     ~~~~~~~~

}
Loading
Loading