Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 66 additions & 96 deletions kube-runtime/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use crate::watcher::{self, watch_object};

/// Wait errors from [`await_condition`]
#[derive(Debug, Error)]
pub enum Error {
#[error("failed to probe for whether the condition is fulfilled yet: {0}")]
Expand Down Expand Up @@ -79,29 +80,34 @@
/// use k8s_openapi::api::core::v1::Pod;
/// fn my_custom_condition(my_cond: &str) -> impl Condition<Pod> + '_ {
/// move |obj: Option<&Pod>| {
/// if let Some(pod) = &obj {
/// if let Some(status) = &pod.status {
/// if let Some(conds) = &status.conditions {
/// if let Some(pcond) = conds.iter().find(|c| c.type_ == my_cond) {
/// return pcond.status == "True";
/// }
/// }
/// }
/// }
/// false
/// let cond = obj?.status.as_ref()?.conditions.as_ref()?.iter().find(|c| c.type_ == my_cond)?;
/// Some(cond.status == "True")
/// }
/// }
/// ```
pub trait Condition<K> {
fn matches_object(&self, obj: Option<&K>) -> bool;
/// Condition function with bool return
///
/// This function does NOT distinguish between a missing property and property declared to be false.
fn matches_object(&self, obj: Option<&K>) -> bool {
self.matches(obj).unwrap_or_default()
}

/// Condition function with optional return
///
/// This function is the raw underlying fn used in an `impl Condition` distinguishing missing and false conditions.
///
/// This function should return None when required properties are missing.
/// If the properties are found, but the condition is not satisfied, it must return Some(false).
fn matches(&self, _obj: Option<&K>) -> Option<bool>;

/// Returns a `Condition` that holds if `self` does not
///
/// # Usage
///
/// ```
/// # use kube_runtime::wait::Condition;
/// let condition: fn(Option<&()>) -> bool = |_| true;
/// let condition: fn(Option<&()>) -> Option<bool> = |_| Some(true);
/// assert!(condition.matches_object(None));
/// assert!(!condition.not().matches_object(None));
/// ```
Expand All @@ -118,8 +124,8 @@
///
/// ```
/// # use kube_runtime::wait::Condition;
/// let cond_false: fn(Option<&()>) -> bool = |_| false;
/// let cond_true: fn(Option<&()>) -> bool = |_| true;
/// let cond_false: fn(Option<&()>) -> Option<bool> = |_| Some(false);
/// let cond_true: fn(Option<&()>) -> Option<bool> = |_| Some(true);
/// assert!(!cond_false.and(cond_false).matches_object(None));
/// assert!(!cond_false.and(cond_true).matches_object(None));
/// assert!(!cond_true.and(cond_false).matches_object(None));
Expand All @@ -138,8 +144,8 @@
///
/// ```
/// # use kube_runtime::wait::Condition;
/// let cond_false: fn(Option<&()>) -> bool = |_| false;
/// let cond_true: fn(Option<&()>) -> bool = |_| true;
/// let cond_false: fn(Option<&()>) -> Option<bool> = |_| Some(false);
/// let cond_true: fn(Option<&()>) -> Option<bool> = |_| Some(true);
/// assert!(!cond_false.or(cond_false).matches_object(None));
/// assert!(cond_false.or(cond_true).matches_object(None));
/// assert!(cond_true.or(cond_false).matches_object(None));
Expand All @@ -153,8 +159,8 @@
}
}

impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
fn matches_object(&self, obj: Option<&K>) -> bool {
impl<K, F: Fn(Option<&K>) -> Option<bool>> Condition<K> for F {
fn matches(&self, obj: Option<&K>) -> Option<bool> {
(self)(obj)
}
}
Expand All @@ -181,12 +187,12 @@
#[must_use]
pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
move |obj: Option<&K>| {
obj.map_or(
Some(obj.map_or(
// Object is not found, success!
true,
// Object is found, but a changed uid would mean that it was deleted and recreated
|obj| obj.meta().uid.as_deref() != Some(uid),
)
))
}
}

Expand All @@ -197,48 +203,27 @@
#[must_use]
pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
|obj: Option<&CustomResourceDefinition>| {
if let Some(o) = obj {
if let Some(s) = &o.status {
if let Some(conds) = &s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "Established") {
return pcond.status == "True";
}
}
}
}
false
let status = obj.as_ref()?.status.as_ref()?;
let conds = status.conditions.as_ref()?;
let established = conds.iter().find(|c| c.type_ == "Established")?;
Some(established.status == "True")
}
}

/// An await condition for `Pod` that returns `true` once it is running
#[must_use]
pub fn is_pod_running() -> impl Condition<Pod> {
|obj: Option<&Pod>| {
if let Some(pod) = &obj {
if let Some(status) = &pod.status {
if let Some(phase) = &status.phase {
return phase == "Running";
}
}
}
false
}
|obj: Option<&Pod>| Some(obj?.status.as_ref()?.phase.as_ref()? == "Running")
}

/// An await condition for `Job` that returns `true` once it is completed
#[must_use]
pub fn is_job_completed() -> impl Condition<Job> {
|obj: Option<&Job>| {
if let Some(job) = &obj {
if let Some(s) = &job.status {
if let Some(conds) = &s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete") {
return pcond.status == "True";
}
}
}
}
false
let status = obj.as_ref()?.status.as_ref()?;
let conds = status.conditions.as_ref()?;
let complete = conds.iter().find(|c| c.type_ == "Complete")?;
Some(complete.status == "True")
}
}

Expand All @@ -249,68 +234,44 @@
#[must_use]
pub fn is_deployment_completed() -> impl Condition<Deployment> {
|obj: Option<&Deployment>| {
if let Some(depl) = &obj {
if let Some(s) = &depl.status {
if let Some(conds) = &s.conditions {
if let Some(dcond) = conds.iter().find(|c| {
c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string())
}) {
return dcond.status == "True";
}
}
}
}
false
let conds = obj?.status.as_ref()?.conditions.as_ref()?;
let progressing = conds.iter().find(|c| {
c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string())
})?;
Some(progressing.status == "True")
}
}

/// An await condition for `Service`s of type `LoadBalancer` that returns `true` once the backing load balancer has an external IP or hostname
#[must_use]
pub fn is_service_loadbalancer_provisioned() -> impl Condition<Service> {
|obj: Option<&Service>| {
if let Some(svc) = &obj {
// ignore services that are not type LoadBalancer (return true immediately)
if let Some(spec) = &svc.spec {
if spec.type_ != Some("LoadBalancer".to_string()) {
return true;
}
Comment on lines -273 to -276
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @detjensrobert doing a follow-up on conditions. wondering if you had a particular reason to return true here rather than false for this condition. feels to me we shouldn't consider a condition "matching" on a service that's not even of the right type.

also if you have opinions on this pr, happy to hear 🙏

EDIT: sorry about spam, put comment on wrong place with wrong account.

Copy link
Contributor

@detjensrobert detjensrobert Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought was that since non-LoadBalancer services do not need to wait for a cloud LB, they are (almost) immediately ready and do not need any checks. I can see this wanting to fail on the wrong type of service though.

// carry on if this is a LoadBalancer service
if let Some(s) = &svc.status {
if let Some(lbs) = &s.load_balancer {
if let Some(ings) = &lbs.ingress {
return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
}
}
}
}
// explicitly reject services that are not type LoadBalancer
if obj?.spec.as_ref()?.type_.as_ref()? != "LoadBalancer" {
return Some(false);
}
false
let status = obj?.status.as_ref()?;
let ingress = status.load_balancer.as_ref()?.ingress.as_ref()?;
Some(ingress.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some()))
}
}

/// An await condition for `Ingress` that returns `true` once the backing load balancer has an external IP or hostname
#[must_use]
pub fn is_ingress_provisioned() -> impl Condition<Ingress> {
|obj: Option<&Ingress>| {
if let Some(ing) = &obj {
if let Some(s) = &ing.status {
if let Some(lbs) = &s.load_balancer {
if let Some(ings) = &lbs.ingress {
return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
}
}
}
}
false
let status = obj?.status.as_ref()?;
let ingress = status.load_balancer.as_ref()?.ingress.as_ref()?;
Some(ingress.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some()))
}
}

/// See [`Condition::not`]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Not<A>(pub(super) A);
impl<A: Condition<K>, K> Condition<K> for Not<A> {
fn matches_object(&self, obj: Option<&K>) -> bool {
!self.0.matches_object(obj)
fn matches(&self, obj: Option<&K>) -> Option<bool> {
Some(!self.0.matches_object(obj))

Check warning on line 274 in kube-runtime/src/wait.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/wait.rs#L273-L274

Added lines #L273 - L274 were not covered by tests
}
}

Expand All @@ -322,8 +283,8 @@
A: Condition<K>,
B: Condition<K>,
{
fn matches_object(&self, obj: Option<&K>) -> bool {
self.0.matches_object(obj) && self.1.matches_object(obj)
fn matches(&self, obj: Option<&K>) -> Option<bool> {
Some(self.0.matches_object(obj) && self.1.matches_object(obj))
}
}

Expand All @@ -335,8 +296,8 @@
A: Condition<K>,
B: Condition<K>,
{
fn matches_object(&self, obj: Option<&K>) -> bool {
self.0.matches_object(obj) || self.1.matches_object(obj)
fn matches(&self, obj: Option<&K>) -> Option<bool> {
Some(self.0.matches_object(obj) || self.1.matches_object(obj))

Check warning on line 300 in kube-runtime/src/wait.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/wait.rs#L299-L300

Added lines #L299 - L300 were not covered by tests
}
}

Expand Down Expand Up @@ -870,7 +831,10 @@
";

let s = serde_yaml::from_str(service).unwrap();
assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)))
// matches object is false because it does not match the condition
assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)),);
// but via None because the underlying matches method is missing properties
assert_eq!(is_service_loadbalancer_provisioned().matches(Some(&s)), None);
}

#[test]
Expand All @@ -896,7 +860,13 @@
";

let s = serde_yaml::from_str(service).unwrap();
assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
// false; not matching because it's not a load balancer
assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)),);
// but via explicit false, because method rejected the value of the properties
assert_eq!(
is_service_loadbalancer_provisioned().matches(Some(&s)),
Some(false)
)
}

#[test]
Expand Down
13 changes: 3 additions & 10 deletions kube/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,9 @@ mod test {
// TODO: remove these once we can write these functions generically
fn is_each_container_ready() -> impl Condition<Pod> {
|obj: Option<&Pod>| {
if let Some(o) = obj {
if let Some(s) = &o.status {
if let Some(conds) = &s.conditions {
if let Some(pcond) = conds.iter().find(|c| c.type_ == "ContainersReady") {
return pcond.status == "True";
}
}
}
}
false
let conds = obj?.status.as_ref()?.conditions.as_ref()?;
let pcond = conds.iter().find(|c| c.type_ == "ContainersReady")?;
Some(pcond.status == "True")
}
}
let is_fully_ready = await_condition(
Expand Down