Skip to content

Commit 13d2553

Browse files
committed
Merge remote-tracking branch 'origin/main' into op-rs-0.79.0
2 parents d7b1f95 + 9479339 commit 13d2553

File tree

3 files changed

+45
-29
lines changed

3 files changed

+45
-29
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ All notable changes to this project will be documented in this file.
2222
### Fixed
2323

2424
- BREAKING: The fields `connection` and `host` on `S3Connection` as well as `bucketName` on `S3Bucket`are now mandatory ([#632]).
25+
- Failing to parse one `DruidCluster`/`AuthenticationClass` should no longer cause the whole operator to stop functioning ([#638]).
2526

2627
### Removed
2728

@@ -34,6 +35,7 @@ All notable changes to this project will be documented in this file.
3435
[#631]: https://github.com/stackabletech/druid-operator/pull/631
3536
[#632]: https://github.com/stackabletech/druid-operator/pull/632
3637
[#637]: https://github.com/stackabletech/druid-operator/pull/637
38+
[#638]: https://github.com/stackabletech/druid-operator/pull/638
3739

3840
## [24.7.0] - 2024-07-24
3941

rust/operator-binary/src/druid_controller.rs

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! Ensures that `Pod`s are configured and running for each [`DruidCluster`]
22
use std::{
33
collections::{BTreeMap, HashMap},
4-
ops::Deref,
54
str::FromStr,
65
sync::Arc,
76
};
@@ -49,6 +48,7 @@ use stackable_operator::{
4948
DeepMerge,
5049
},
5150
kube::{
51+
core::{error_boundary, DeserializeGuard},
5252
runtime::{controller::Action, reflector::ObjectRef},
5353
Resource,
5454
},
@@ -358,6 +358,11 @@ pub enum Error {
358358
AddVolumeMount {
359359
source: builder::pod::container::Error,
360360
},
361+
362+
#[snafu(display("DruidCluster object is invalid"))]
363+
InvalidDruidCluster {
364+
source: error_boundary::InvalidObject,
365+
},
361366
}
362367

363368
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -368,8 +373,17 @@ impl ReconcilerError for Error {
368373
}
369374
}
370375

371-
pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<Action> {
376+
pub async fn reconcile_druid(
377+
druid: Arc<DeserializeGuard<DruidCluster>>,
378+
ctx: Arc<Ctx>,
379+
) -> Result<Action> {
372380
tracing::info!("Starting reconcile");
381+
let druid = druid
382+
.0
383+
.as_ref()
384+
.map_err(error_boundary::InvalidObject::clone)
385+
.context(InvalidDruidClusterSnafu)?;
386+
373387
let client = &ctx.client;
374388
let namespace = &druid
375389
.metadata
@@ -394,7 +408,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
394408
cm_name: zk_confmap.clone(),
395409
})?;
396410

397-
let vector_aggregator_address = resolve_vector_aggregator_address(&druid, client)
411+
let vector_aggregator_address = resolve_vector_aggregator_address(druid, client)
398412
.await
399413
.context(ResolveVectorAggregatorAddressSnafu)?;
400414

@@ -404,12 +418,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
404418
{
405419
Some(
406420
opa_config
407-
.full_document_url_from_config_map(
408-
client,
409-
druid.deref(),
410-
Some("allow"),
411-
OpaApiVersion::V1,
412-
)
421+
.full_document_url_from_config_map(client, druid, Some("allow"), OpaApiVersion::V1)
413422
.await
414423
.context(GetOpaConnStringSnafu {
415424
cm_name: opa_config.config_map_name.clone(),
@@ -444,12 +453,12 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
444453
.context(AuthenticationClassRetrievalSnafu)?;
445454

446455
let druid_tls_security =
447-
DruidTlsSecurity::new_from_druid_cluster(&druid, &resolved_auth_classes);
456+
DruidTlsSecurity::new_from_druid_cluster(druid, &resolved_auth_classes);
448457

449458
let druid_auth_config = DruidAuthenticationConfig::try_from(resolved_auth_classes)
450459
.context(InvalidDruidAuthenticationConfigSnafu)?;
451460

452-
let role_config = transform_all_roles_to_config(druid.as_ref(), druid.build_role_properties());
461+
let role_config = transform_all_roles_to_config(druid, druid.build_role_properties());
453462
let validated_role_config = validate_all_roles_and_groups_config(
454463
&resolved_product_image.product_version,
455464
&role_config.context(ProductConfigTransformSnafu)?,
@@ -471,7 +480,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
471480
let merged_config = druid.merged_config().context(FailedToResolveConfigSnafu)?;
472481

473482
let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
474-
druid.as_ref(),
483+
druid,
475484
APP_NAME,
476485
cluster_resources
477486
.get_required_labels()
@@ -495,7 +504,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
495504
})?;
496505

497506
let role_service = build_role_service(
498-
&druid,
507+
druid,
499508
&resolved_product_image,
500509
&druid_role,
501510
&druid_tls_security,
@@ -505,13 +514,13 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
505514
.await
506515
.context(ApplyRoleServiceSnafu)?;
507516

508-
create_shared_internal_secret(&druid, client, DRUID_CONTROLLER_NAME)
517+
create_shared_internal_secret(druid, client, DRUID_CONTROLLER_NAME)
509518
.await
510519
.context(FailedInternalSecretCreationSnafu)?;
511520

512521
for (rolegroup_name, rolegroup_config) in role_config.iter() {
513522
let rolegroup = RoleGroupRef {
514-
cluster: ObjectRef::from_obj(&*druid),
523+
cluster: ObjectRef::from_obj(druid),
515524
role: role_name.into(),
516525
role_group: rolegroup_name.into(),
517526
};
@@ -521,13 +530,13 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
521530
.context(FailedToResolveConfigSnafu)?;
522531

523532
let rg_service = build_rolegroup_services(
524-
&druid,
533+
druid,
525534
&resolved_product_image,
526535
&rolegroup,
527536
&druid_tls_security,
528537
)?;
529538
let rg_configmap = build_rolegroup_config_map(
530-
&druid,
539+
druid,
531540
&resolved_product_image,
532541
&rolegroup,
533542
rolegroup_config,
@@ -541,7 +550,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
541550
&druid_auth_config,
542551
)?;
543552
let rg_statefulset = build_rolegroup_statefulset(
544-
&druid,
553+
druid,
545554
&resolved_product_image,
546555
&druid_role,
547556
&rolegroup,
@@ -577,7 +586,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
577586

578587
add_pdbs(
579588
&role_config.pod_disruption_budget,
580-
&druid,
589+
druid,
581590
&druid_role,
582591
client,
583592
&mut cluster_resources,
@@ -588,8 +597,8 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
588597

589598
// discovery
590599
for discovery_cm in build_discovery_configmaps(
591-
&druid,
592-
&*druid,
600+
druid,
601+
druid,
593602
&client.kubernetes_cluster_info,
594603
&resolved_product_image,
595604
&druid_tls_security,
@@ -607,18 +616,15 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
607616
ClusterOperationsConditionBuilder::new(&druid.spec.cluster_operation);
608617

609618
let status = DruidClusterStatus {
610-
conditions: compute_conditions(
611-
druid.as_ref(),
612-
&[&ss_cond_builder, &cluster_operation_cond_builder],
613-
),
619+
conditions: compute_conditions(druid, &[&ss_cond_builder, &cluster_operation_cond_builder]),
614620
};
615621

616622
cluster_resources
617623
.delete_orphaned_resources(client)
618624
.await
619625
.context(DeleteOrphanedResourcesSnafu)?;
620626
client
621-
.apply_patch_status(OPERATOR_NAME, &*druid, &status)
627+
.apply_patch_status(OPERATOR_NAME, druid, &status)
622628
.await
623629
.context(ApplyStatusSnafu)?;
624630

@@ -1298,8 +1304,15 @@ fn add_log_volume_and_volume_mounts(
12981304
Ok(())
12991305
}
13001306

1301-
pub fn error_policy(_obj: Arc<DruidCluster>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
1302-
Action::requeue(*Duration::from_secs(5))
1307+
pub fn error_policy(
1308+
_obj: Arc<DeserializeGuard<DruidCluster>>,
1309+
error: &Error,
1310+
_ctx: Arc<Ctx>,
1311+
) -> Action {
1312+
match error {
1313+
Error::InvalidDruidCluster { .. } => Action::await_change(),
1314+
_ => Action::requeue(*Duration::from_secs(5)),
1315+
}
13031316
}
13041317

13051318
#[cfg(test)]

rust/operator-binary/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use stackable_operator::{
2121
apps::v1::StatefulSet,
2222
core::v1::{ConfigMap, Service},
2323
},
24+
kube::core::DeserializeGuard,
2425
kube::runtime::{watcher, Controller},
2526
logging::controller::report_controller_reconciled,
2627
};
@@ -71,7 +72,7 @@ async fn main() -> anyhow::Result<()> {
7172
.await?;
7273

7374
Controller::new(
74-
watch_namespace.get_api::<DruidCluster>(&client),
75+
watch_namespace.get_api::<DeserializeGuard<DruidCluster>>(&client),
7576
watcher::Config::default(),
7677
)
7778
.owns(

0 commit comments

Comments
 (0)