Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ All notable changes to this project will be documented in this file.
- `podOverrides`
- `affinity`

### Fixed

- An invalid `HdfsCluster` doesn't cause the operator to stop functioning ([#594]).

[#574]: https://github.com/stackabletech/hdfs-operator/pull/574
[#591]: https://github.com/stackabletech/hdfs-operator/pull/591

Expand Down
67 changes: 43 additions & 24 deletions rust/operator-binary/src/hdfs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use stackable_operator::{
},
kube::{
api::ObjectMeta,
core::{error_boundary, DeserializeGuard},
runtime::{controller::Action, reflector::ObjectRef},
Resource, ResourceExt,
},
Expand Down Expand Up @@ -241,6 +242,11 @@ pub enum Error {

#[snafu(display("invalid OPA configuration"))]
InvalidOpaConfig { source: security::opa::Error },

#[snafu(display("HdfsCluster object is invalid"))]
InvalidHdfsCluster {
source: error_boundary::InvalidObject,
},
}

impl ReconcilerError for Error {
Expand All @@ -256,23 +262,32 @@ pub struct Ctx {
pub product_config: ProductConfigManager,
}

pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperatorResult<Action> {
pub async fn reconcile_hdfs(
hdfs: Arc<DeserializeGuard<HdfsCluster>>,
ctx: Arc<Ctx>,
) -> HdfsOperatorResult<Action> {
tracing::info!("Starting reconcile");

let hdfs = hdfs
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidHdfsClusterSnafu)?;
let client = &ctx.client;

let resolved_product_image = hdfs
.spec
.image
.resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION);

let vector_aggregator_address = resolve_vector_aggregator_address(&hdfs, client)
let vector_aggregator_address = resolve_vector_aggregator_address(hdfs, client)
.await
.context(ResolveVectorAggregatorAddressSnafu)?;

let validated_config = validate_all_roles_and_groups_config(
&resolved_product_image.product_version,
&transform_all_roles_to_config(
hdfs.as_ref(),
hdfs,
hdfs.build_role_properties()
.context(BuildRolePropertiesSnafu)?,
)
Expand Down Expand Up @@ -302,7 +317,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat

// The service account and rolebinding will be created per cluster
let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
hdfs.as_ref(),
hdfs,
APP_NAME,
cluster_resources
.get_required_labels()
Expand All @@ -321,7 +336,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat

let hdfs_opa_config = match &hdfs.spec.cluster_config.authorization {
Some(opa_config) => Some(
HdfsOpaConfig::from_opa_config(client, &hdfs, opa_config)
HdfsOpaConfig::from_opa_config(client, hdfs, opa_config)
.await
.context(InvalidOpaConfigSnafu)?,
),
Expand Down Expand Up @@ -354,9 +369,9 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
continue;
};

if let Some(content) = build_invalid_replica_message(&hdfs, &role, dfs_replication) {
if let Some(content) = build_invalid_replica_message(hdfs, &role, dfs_replication) {
publish_event(
&hdfs,
hdfs,
client,
"Reconcile",
"Invalid replicas",
Expand All @@ -368,7 +383,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat

for (rolegroup_name, rolegroup_config) in group_config.iter() {
let merged_config = role
.merged_config(&hdfs, rolegroup_name)
.merged_config(hdfs, rolegroup_name)
.context(ConfigMergeSnafu)?;

let env_overrides = rolegroup_config.get(&PropertyNameKind::Env);
Expand All @@ -379,25 +394,25 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
// to avoid the compiler error "E0716 (temporary value dropped while borrowed)".
let mut metadata = ObjectMetaBuilder::new();
let metadata = metadata
.name_and_namespace(hdfs.as_ref())
.name_and_namespace(hdfs)
.name(rolegroup_ref.object_name())
.ownerreference_from_resource(hdfs.as_ref(), None, Some(true))
.ownerreference_from_resource(hdfs, None, Some(true))
.with_context(|_| ObjectMissingMetadataForOwnerRefSnafu {
obj_ref: ObjectRef::from_obj(hdfs.as_ref()),
obj_ref: ObjectRef::from_obj(hdfs),
})?
.with_recommended_labels(build_recommended_labels(
hdfs.as_ref(),
hdfs,
RESOURCE_MANAGER_HDFS_CONTROLLER,
&resolved_product_image.app_version_label,
&rolegroup_ref.role,
&rolegroup_ref.role_group,
))
.context(ObjectMetaSnafu)?;

let rg_service = rolegroup_service(&hdfs, metadata, &role, &rolegroup_ref)?;
let rg_service = rolegroup_service(hdfs, metadata, &role, &rolegroup_ref)?;

let rg_configmap = rolegroup_config_map(
&hdfs,
hdfs,
&client.kubernetes_cluster_info,
metadata,
&rolegroup_ref,
Expand All @@ -410,7 +425,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
)?;

let rg_statefulset = rolegroup_statefulset(
&hdfs,
hdfs,
&client.kubernetes_cluster_info,
metadata,
&role,
Expand Down Expand Up @@ -463,7 +478,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
pod_disruption_budget: pdb,
}) = role_config
{
add_pdbs(pdb, &hdfs, &role, client, &mut cluster_resources)
add_pdbs(pdb, hdfs, &role, client, &mut cluster_resources)
.await
.context(FailedToCreatePdbSnafu)?;
}
Expand All @@ -472,7 +487,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
// Discovery CM will fail to build until the rest of the cluster has been deployed, so do it last
// so that failure won't inhibit the rest of the cluster from booting up.
let discovery_cm = build_discovery_configmap(
&hdfs,
hdfs,
&client.kubernetes_cluster_info,
HDFS_CONTROLLER,
&hdfs
Expand All @@ -496,10 +511,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
ClusterOperationsConditionBuilder::new(&hdfs.spec.cluster_operation);

let status = HdfsClusterStatus {
conditions: compute_conditions(
hdfs.as_ref(),
&[&ss_cond_builder, &cluster_operation_cond_builder],
),
conditions: compute_conditions(hdfs, &[&ss_cond_builder, &cluster_operation_cond_builder]),
// FIXME: We can't currently leave upgrade mode automatically, since we don't know when an upgrade is finalized
deployed_product_version: Some(
hdfs.status
Expand Down Expand Up @@ -539,7 +551,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
.context(DeleteOrphanedResourcesSnafu)?;
}
client
.apply_patch_status(OPERATOR_NAME, &*hdfs, &status)
.apply_patch_status(OPERATOR_NAME, hdfs, &status)
.await
.context(ApplyStatusSnafu)?;

Expand Down Expand Up @@ -893,8 +905,15 @@ fn rolegroup_statefulset(
})
}

pub fn error_policy(_obj: Arc<HdfsCluster>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(*Duration::from_secs(5))
/// Chooses the follow-up [`Action`] after a reconcile attempt returned an [`Error`].
///
/// * `Error::InvalidHdfsCluster` yields `Action::await_change()`.
/// * Every other error yields a requeue with a 5 second delay.
///
/// The object and context arguments are accepted but not used; only `error`
/// influences the result.
pub fn error_policy(
_obj: Arc<DeserializeGuard<HdfsCluster>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
match error {
// NOTE(review): presumably an object that failed to deserialize cannot be fixed by
// retrying, so this waits for the object to change instead of requeueing — confirm
// against the kube `Action::await_change` documentation.
Error::InvalidHdfsCluster { .. } => Action::await_change(),
// All remaining error variants are retried after a fixed 5 second delay.
_ => Action::requeue(*Duration::from_secs(5)),
}
}

#[cfg(test)]
Expand Down
9 changes: 5 additions & 4 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use stackable_operator::{
apps::v1::StatefulSet,
core::v1::{ConfigMap, Service},
},
kube::core::DeserializeGuard,
kube::{
api::PartialObjectMeta,
runtime::{reflector, watcher, Controller},
Expand Down Expand Up @@ -109,19 +110,19 @@ pub async fn create_controller(
});

let hdfs_controller = Controller::new(
namespace.get_api::<HdfsCluster>(&client),
namespace.get_api::<DeserializeGuard<HdfsCluster>>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<StatefulSet>(&client),
namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<Service>(&client),
namespace.get_api::<DeserializeGuard<Service>>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<ConfigMap>(&client),
namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
Expand Down
Loading