Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ All notable changes to this project will be documented in this file.
### Fixed

- BREAKING: The fields `connection` and `host` on `S3Connection` as well as `bucketName` on `S3Bucket`are now mandatory. Previously operators errored out in case these fields where missing ([#283]).
- Failing to parse one `ZookeeperCluster`/`ZookeeperZnode` should no longer cause the whole operator to stop functioning ([#293]).
- The StatefulSet restarter service now only retrieves metadata for ConfigMaps and Secrets, rather than full objects ([#293]).

[#283]: https://github.com/stackabletech/commons-operator/pull/283
[#285]: https://github.com/stackabletech/commons-operator/pull/285
[#290]: https://github.com/stackabletech/commons-operator/pull/290
[#293]: https://github.com/stackabletech/commons-operator/pull/293

## [24.7.0] - 2024-07-24

Expand Down
44 changes: 37 additions & 7 deletions rust/operator-binary/src/pod_enrichment_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use snafu::{ResultExt, Snafu};
use stackable_operator::{
k8s_openapi::api::core::v1::{Node, Pod},
kube::{
core::ObjectMeta,
core::{error_boundary, DeserializeGuard, ObjectMeta},
runtime::{controller, reflector::ObjectRef, watcher, Controller},
Resource,
},
logging::controller::{report_controller_reconciled, ReconcilerError},
namespace::WatchNamespace,
Expand All @@ -23,11 +24,17 @@ struct Ctx {
#[derive(Snafu, Debug, EnumDiscriminants)]
#[strum_discriminants(derive(IntoStaticStr))]
pub enum Error {
#[snafu(display("Pod object is invalid"))]
InvalidPod {
source: error_boundary::InvalidObject,
},

#[snafu(display("failed to get {node} for Pod"))]
GetNode {
source: stackable_operator::client::Error,
node: ObjectRef<Node>,
},

#[snafu(display("failed to update Pod"))]
UpdatePod {
source: stackable_operator::client::Error,
Expand All @@ -41,6 +48,7 @@ impl ReconcilerError for Error {

fn secondary_object(&self) -> Option<ObjectRef<stackable_operator::kube::core::DynamicObject>> {
match self {
Error::InvalidPod { source: _ } => None,
Error::GetNode { node, .. } => Some(node.clone().erase()),
Error::UpdatePod { source: _ } => None,
}
Expand All @@ -49,20 +57,23 @@ impl ReconcilerError for Error {

pub async fn start(client: &stackable_operator::client::Client, watch_namespace: &WatchNamespace) {
let controller = Controller::new(
watch_namespace.get_api::<Pod>(client),
watch_namespace.get_api::<DeserializeGuard<Pod>>(client),
watcher::Config::default().labels("enrichment.stackable.tech/enabled=true"),
);
let pods = controller.store();
controller
.watches(
client.get_all_api::<Node>(),
client.get_all_api::<DeserializeGuard<Node>>(),
watcher::Config::default(),
move |node| {
pods.state()
.into_iter()
.filter(move |pod| {
let Ok(pod) = &pod.0 else {
return false;
};
pod.spec.as_ref().and_then(|s| s.node_name.as_deref())
== node.metadata.name.as_deref()
== node.meta().name.as_deref()
})
.map(|pod| ObjectRef::from_obj(&*pod))
},
Expand All @@ -86,7 +97,17 @@ pub enum NodeAddressType {
InternalIP,
}

async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<controller::Action, Error> {
async fn reconcile(
pod: Arc<DeserializeGuard<Pod>>,
ctx: Arc<Ctx>,
) -> Result<controller::Action, Error> {
tracing::info!("Starting reconcile");
let pod = pod
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidPodSnafu)?;

let node_name = pod.spec.as_ref().and_then(|s| s.node_name.as_deref());
let node = if let Some(node_name) = node_name {
ctx.client
Expand Down Expand Up @@ -133,6 +154,15 @@ async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<controller::Action, E
Ok(controller::Action::await_change())
}

fn error_policy(_obj: Arc<Pod>, _error: &Error, _ctx: Arc<Ctx>) -> controller::Action {
controller::Action::requeue(Duration::from_secs(5))
fn error_policy(
_obj: Arc<DeserializeGuard<Pod>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> controller::Action {
match error {
// root object is invalid, will be requeued when modified anyway
Error::InvalidPod { .. } => controller::Action::await_change(),

_ => controller::Action::requeue(Duration::from_secs(5)),
}
}
8 changes: 4 additions & 4 deletions rust/operator-binary/src/restart_controller/pod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use stackable_operator::{
},
kube::{
self,
api::EvictParams,
api::{EvictParams, PartialObjectMeta},
core::DynamicObject,
runtime::{controller::Action, reflector::ObjectRef, watcher, Controller},
},
Expand Down Expand Up @@ -63,7 +63,7 @@ impl ReconcilerError for Error {

pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
let controller = Controller::new(
watch_namespace.get_api::<Pod>(client),
watch_namespace.get_api::<PartialObjectMeta<Pod>>(client),
watcher::Config::default(),
);
controller
Expand All @@ -80,7 +80,7 @@ pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
.await;
}

async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<Action, Error> {
async fn reconcile(pod: Arc<PartialObjectMeta<Pod>>, ctx: Arc<Ctx>) -> Result<Action, Error> {
tracing::info!("Starting reconciliation ..");
if pod.metadata.deletion_timestamp.is_some() {
// Object is already being deleted, no point trying again
Expand Down Expand Up @@ -163,6 +163,6 @@ async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<Action, Error> {
}
}

fn error_policy(_obj: Arc<Pod>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
fn error_policy(_obj: Arc<PartialObjectMeta<Pod>>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(Duration::from_secs(5))
}
74 changes: 49 additions & 25 deletions rust/operator-binary/src/restart_controller/statefulset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,36 @@ use stackable_operator::k8s_openapi::api::core::v1::{
ConfigMap, EnvFromSource, EnvVar, PodSpec, Secret, Volume,
};
use stackable_operator::kube;
use stackable_operator::kube::api::{Patch, PatchParams};
use stackable_operator::kube::core::DynamicObject;
use stackable_operator::kube::api::{PartialObjectMeta, Patch, PatchParams};
use stackable_operator::kube::core::{error_boundary, DeserializeGuard, DynamicObject};
use stackable_operator::kube::runtime::controller::{
trigger_self, trigger_with, Action, ReconcileRequest,
};
use stackable_operator::kube::runtime::reflector::{ObjectRef, Store};
use stackable_operator::kube::runtime::{applier, reflector, watcher, Config, WatchStreamExt};
use stackable_operator::kube::runtime::{
applier, metadata_watcher, reflector, watcher, Config, WatchStreamExt,
};
use stackable_operator::kube::{Resource, ResourceExt};
use stackable_operator::logging::controller::{report_controller_reconciled, ReconcilerError};
use stackable_operator::namespace::WatchNamespace;
use strum::{EnumDiscriminants, IntoStaticStr};

struct Ctx {
kube: kube::Client,
cms: Store<ConfigMap>,
cms: Store<PartialObjectMeta<ConfigMap>>,
cms_inited: Arc<AtomicBool>,
secrets: Store<Secret>,
secrets: Store<PartialObjectMeta<Secret>>,
secrets_inited: Arc<AtomicBool>,
}

#[derive(Snafu, Debug, EnumDiscriminants)]
#[strum_discriminants(derive(IntoStaticStr))]
enum Error {
#[snafu(display("StatefulSet object is invalid"))]
InvalidStatefulSet {
source: error_boundary::InvalidObject,
},

#[snafu(display("failed to patch object {obj_ref}"))]
PatchFailed {
source: kube::Error,
Expand All @@ -55,6 +62,7 @@ impl ReconcilerError for Error {

fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> {
match self {
Error::InvalidStatefulSet { source: _ } => None,
Error::PatchFailed { obj_ref, .. } => Some(*obj_ref.clone()),
Error::ConfigMapsUninitialized => None,
Error::SecretsUninitialized => None,
Expand All @@ -63,12 +71,12 @@ impl ReconcilerError for Error {
}

pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
let stses = watch_namespace.get_api::<StatefulSet>(client);
let stses = watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(client);
let cms = watch_namespace.get_api::<ConfigMap>(client);
let secrets = watch_namespace.get_api::<Secret>(client);
let sts_store = reflector::store::Writer::new(());
let cm_store = reflector::store::Writer::new(());
let secret_store = reflector::store::Writer::new(());
let sts_store = reflector::store::Writer::<DeserializeGuard<StatefulSet>>::new(());
let cm_store = reflector::store::Writer::<PartialObjectMeta<ConfigMap>>::new(());
let secret_store = reflector::store::Writer::<PartialObjectMeta<Secret>>::new(());
let cms_inited = Arc::new(AtomicBool::from(false));
let secrets_inited = Arc::new(AtomicBool::from(false));

Expand All @@ -86,17 +94,18 @@ pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
stream::select(
stream::select(
trigger_all(
reflector(cm_store, watcher(cms, watcher::Config::default()))
reflector(cm_store, metadata_watcher(cms, watcher::Config::default()))
.inspect(|_| cms_inited.store(true, std::sync::atomic::Ordering::SeqCst))
.touched_objects(),
sts_store.as_reader(),
),
trigger_all(
reflector(secret_store, watcher(secrets, watcher::Config::default()))
.inspect(|_| {
secrets_inited.store(true, std::sync::atomic::Ordering::SeqCst)
})
.touched_objects(),
reflector(
secret_store,
metadata_watcher(secrets, watcher::Config::default()),
)
.inspect(|_| secrets_inited.store(true, std::sync::atomic::Ordering::SeqCst))
.touched_objects(),
sts_store.as_reader(),
),
),
Expand Down Expand Up @@ -161,7 +170,17 @@ fn find_pod_refs<'a, K: Resource + 'a>(
.chain(container_env_from_refs)
}

async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error> {
async fn reconcile(
sts: Arc<DeserializeGuard<StatefulSet>>,
ctx: Arc<Ctx>,
) -> Result<Action, Error> {
tracing::info!("Starting reconcile");
let sts = sts
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidStatefulSetSnafu)?;

if !ctx.cms_inited.load(std::sync::atomic::Ordering::SeqCst) {
return ConfigMapsUninitializedSnafu.fail();
}
Expand All @@ -181,12 +200,12 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
find_pod_refs(
pod_spec,
|volume| {
Some(ObjectRef::<ConfigMap>::new(
Some(ObjectRef::<PartialObjectMeta<ConfigMap>>::new(
&volume.config_map.as_ref()?.name,
))
},
|env_var| {
Some(ObjectRef::<ConfigMap>::new(
Some(ObjectRef::<PartialObjectMeta<ConfigMap>>::new(
&env_var
.value_from
.as_ref()?
Expand All @@ -196,7 +215,7 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
))
},
|env_from| {
Some(ObjectRef::<ConfigMap>::new(
Some(ObjectRef::<PartialObjectMeta<ConfigMap>>::new(
&env_from.config_map_ref.as_ref()?.name,
))
},
Expand Down Expand Up @@ -225,17 +244,17 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
find_pod_refs(
pod_spec,
|volume| {
Some(ObjectRef::<Secret>::new(
Some(ObjectRef::<PartialObjectMeta<Secret>>::new(
volume.secret.as_ref()?.secret_name.as_deref()?,
))
},
|env_var| {
Some(ObjectRef::<Secret>::new(
Some(ObjectRef::<PartialObjectMeta<Secret>>::new(
&env_var.value_from.as_ref()?.secret_key_ref.as_ref()?.name,
))
},
|env_from| {
Some(ObjectRef::<Secret>::new(
Some(ObjectRef::<PartialObjectMeta<Secret>>::new(
&env_from.secret_ref.as_ref()?.name,
))
},
Expand Down Expand Up @@ -290,11 +309,16 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
)
.await
.context(PatchFailedSnafu {
obj_ref: ObjectRef::from_obj(sts.as_ref()).erase(),
obj_ref: ObjectRef::from_obj(sts).erase(),
})?;
Ok(Action::await_change())
}

fn error_policy(_obj: Arc<StatefulSet>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(Duration::from_secs(5))
fn error_policy(_obj: Arc<DeserializeGuard<StatefulSet>>, error: &Error, _ctx: Arc<Ctx>) -> Action {
match error {
// root object is invalid, will be requeued when modified anyway
Error::InvalidStatefulSet { .. } => Action::await_change(),

_ => Action::requeue(Duration::from_secs(5)),
}
}
Loading