diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 09975cf5f..76f5c2643 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -54,6 +54,7 @@ hostname.workspace = true [dev-dependencies] kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" } serde_json.workspace = true +serde_yaml.workspace = true tokio = { workspace = true, features = ["full", "test-util"] } rand.workspace = true schemars.workspace = true diff --git a/kube-runtime/src/wait.rs b/kube-runtime/src/wait.rs index e7451cb60..31f581066 100644 --- a/kube-runtime/src/wait.rs +++ b/kube-runtime/src/wait.rs @@ -163,7 +163,12 @@ impl) -> bool> Condition for F { pub mod conditions { pub use super::Condition; use k8s_openapi::{ - api::{batch::v1::Job, core::v1::Pod}, + api::{ + apps::v1::Deployment, + batch::v1::Job, + core::v1::{Pod, Service}, + networking::v1::Ingress, + }, apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition, }; use kube_client::Resource; @@ -237,6 +242,69 @@ pub mod conditions { } } + /// An await condition for `Deployment` that returns `true` once the latest deployment has completed + /// + /// This looks for the condition that Kubernetes sets for completed deployments: + /// + #[must_use] + pub fn is_deployment_completed() -> impl Condition { + |obj: Option<&Deployment>| { + if let Some(depl) = &obj { + if let Some(s) = &depl.status { + if let Some(conds) = &s.conditions { + if let Some(dcond) = conds.iter().find(|c| { + c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string()) + }) { + return dcond.status == "True"; + } + } + } + } + false + } + } + + /// An await condition for `Service`s of type `LoadBalancer` that returns `true` once the backing load balancer has an external IP or hostname + #[must_use] + pub fn is_service_loadbalancer_provisioned() -> impl Condition { + |obj: Option<&Service>| { + if let Some(svc) = &obj { + // ignore services that are not type LoadBalancer (return true immediately) + if let Some(spec) = &svc.spec { + if spec.type_ != Some("LoadBalancer".to_string()) { + return true; + } + // carry on if this is a LoadBalancer service + if let Some(s) = &svc.status { + if let Some(lbs) = &s.load_balancer { + if let Some(ings) = &lbs.ingress { + return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some()); + } + } + } + } + } + false + } + } + + /// An await condition for `Ingress` that returns `true` once the backing load balancer has an external IP or hostname + #[must_use] + pub fn is_ingress_provisioned() -> impl Condition { + |obj: Option<&Ingress>| { + if let Some(ing) = &obj { + if let Some(s) = &ing.status { + if let Some(lbs) = &s.load_balancer { + if let Some(ings) = &lbs.ingress { + return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some()); + } + } + } + } + false + } + } + /// See [`Condition::not`] #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct Not(pub(super) A); @@ -271,6 +339,685 @@ pub mod conditions { self.0.matches_object(obj) || self.1.matches_object(obj) } } + + mod tests { + #[test] + /// pass when CRD is established + fn crd_established_ok() { + use super::{is_crd_established, Condition}; + + let crd = r#" + apiVersion: apiextensions.k8s.io/v1 + kind: CustomResourceDefinition + metadata: + name: testthings.kube.rs + spec: + group: kube.rs + names: + categories: [] + kind: TestThing + plural: testthings + shortNames: [] + singular: testthing + scope: Namespaced + versions: + - additionalPrinterColumns: [] + name: v1 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true + status: + acceptedNames: + kind: TestThing + listKind: TestThingList + plural: testthings + singular: testthing + conditions: + - lastTransitionTime: "2025-03-06T03:10:03Z" + message: no conflicts found + reason: NoConflicts + status: "True" + type: NamesAccepted + - lastTransitionTime: "2025-03-06T03:10:03Z" + message: the initial names have been accepted + reason: InitialNamesAccepted + status: "True" + type: Established + storedVersions: + - v1 + "#; + + let c = serde_yaml::from_str(crd).unwrap(); + assert!(is_crd_established().matches_object(Some(&c))) + } + + #[test] + /// fail when CRD is not yet ready + fn crd_established_fail() { + use super::{is_crd_established, Condition}; + + let crd = r#" + apiVersion: apiextensions.k8s.io/v1 + kind: CustomResourceDefinition + metadata: + name: testthings.kube.rs + spec: + group: kube.rs + names: + categories: [] + kind: TestThing + plural: testthings + shortNames: [] + singular: testthing + scope: Namespaced + versions: + - additionalPrinterColumns: [] + name: v1 + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + served: true + storage: true + status: + acceptedNames: + kind: TestThing + listKind: TestThingList + plural: testthings + singular: testthing + conditions: + - lastTransitionTime: "2025-03-06T03:10:03Z" + message: no conflicts found + reason: NoConflicts + status: "True" + type: NamesAccepted + - lastTransitionTime: "2025-03-06T03:10:03Z" + message: the initial names have been accepted + reason: InitialNamesAccepted + status: "False" + type: Established + storedVersions: + - v1 + "#; + + let c = serde_yaml::from_str(crd).unwrap(); + assert!(!is_crd_established().matches_object(Some(&c))) + } + + #[test] + /// fail when CRD does not exist + fn crd_established_missing() { + use super::{is_crd_established, Condition}; + + assert!(!is_crd_established().matches_object(None)) + } + + #[test] + /// pass when pod is running + fn pod_running_ok() { + use super::{is_pod_running, Condition}; + + let pod = r#" + apiVersion: v1 + kind: Pod + metadata: + namespace: default + name: testpod + spec: + containers: + - name: testcontainer + image: alpine + command: [ sleep ] + args: [ "100000" ] + status: + conditions: + - lastProbeTime: null + lastTransitionTime: "2025-03-06T03:53:07Z" + status: "True" + type: PodReadyToStartContainers + - lastProbeTime: null + lastTransitionTime: "2025-03-06T03:52:58Z" + status: "True" + type: Initialized + - lastProbeTime: null + lastTransitionTime: "2025-03-06T03:53:24Z" + status: "True" + type: Ready + - lastProbeTime: null + lastTransitionTime: "2025-03-06T03:53:24Z" + status: "True" + type: ContainersReady + - lastProbeTime: null + lastTransitionTime: "2025-03-06T03:52:58Z" + status: "True" + type: PodScheduled + containerStatuses: + - containerID: containerd://598323380ae59d60c1ab98f9091c94659137a976d52136a8083775d47fea5875 + image: docker.io/library/alpine:latest + imageID: docker.io/library/alpine@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c + lastState: {} + name: testcontainer + ready: true + restartCount: 0 + started: true + state: + running: + startedAt: "2025-03-06T03:59:20Z" + phase: Running + qosClass: Burstable + "#; + + let p = serde_yaml::from_str(pod).unwrap(); + assert!(is_pod_running().matches_object(Some(&p))) + } + + #[test] + /// fail if pod is unschedulable + fn pod_running_unschedulable() { + use super::{is_pod_running, Condition}; + + let pod = r#" + apiVersion: v1 + kind: Pod + metadata: + namespace: default + name: testpod + spec: + containers: + - name: testcontainer + image: alpine + command: [ sleep ] + args: [ "100000" ] + status: + conditions: + - lastProbeTime: null + lastTransitionTime: "2025-03-06T03:52:25Z" + message: '0/1 nodes are available: 1 node(s) were unschedulable. preemption: 0/1 + nodes are available: 1 Preemption is not helpful for scheduling.' + reason: Unschedulable + status: "False" + type: PodScheduled + phase: Pending + qosClass: Burstable + "#; + + let p = serde_yaml::from_str(pod).unwrap(); + assert!(!is_pod_running().matches_object(Some(&p))) + } + + #[test] + /// fail if pod does not exist + fn pod_running_missing() { + use super::{is_pod_running, Condition}; + + assert!(!is_pod_running().matches_object(None)) + } + + #[test] + /// pass if job completed + fn job_completed_ok() { + use super::{is_job_completed, Condition}; + + let job = r#" + apiVersion: batch/v1 + kind: Job + metadata: + name: pi + namespace: default + spec: + template: + spec: + containers: + - name: pi + command: + - perl + - -Mbignum=bpi + - -wle + - print bpi(2000) + image: perl:5.34.0 + imagePullPolicy: IfNotPresent + status: + completionTime: "2025-03-06T05:27:56Z" + conditions: + - lastProbeTime: "2025-03-06T05:27:56Z" + lastTransitionTime: "2025-03-06T05:27:56Z" + message: Reached expected number of succeeded pods + reason: CompletionsReached + status: "True" + type: SuccessCriteriaMet + - lastProbeTime: "2025-03-06T05:27:56Z" + lastTransitionTime: "2025-03-06T05:27:56Z" + message: Reached expected number of succeeded pods + reason: CompletionsReached + status: "True" + type: Complete + ready: 0 + startTime: "2025-03-06T05:27:27Z" + succeeded: 1 + terminating: 0 + uncountedTerminatedPods: {} + "#; + + let j = serde_yaml::from_str(job).unwrap(); + assert!(is_job_completed().matches_object(Some(&j))) + } + + #[test] + /// fail if job is still in progress + fn job_completed_running() { + use super::{is_job_completed, Condition}; + + let job = r#" + apiVersion: batch/v1 + kind: Job + metadata: + name: pi + namespace: default + spec: + backoffLimit: 4 + completionMode: NonIndexed + completions: 1 + manualSelector: false + parallelism: 1 + template: + spec: + containers: + - name: pi + command: + - perl + - -Mbignum=bpi + - -wle + - print bpi(2000) + image: perl:5.34.0 + imagePullPolicy: IfNotPresent + status: + active: 1 + ready: 0 + startTime: "2025-03-06T05:27:27Z" + terminating: 0 + uncountedTerminatedPods: {} + "#; + + let j = serde_yaml::from_str(job).unwrap(); + assert!(!is_job_completed().matches_object(Some(&j))) + } + + #[test] + /// fail if job does not exist + fn job_completed_missing() { + use super::{is_job_completed, Condition}; + + assert!(!is_job_completed().matches_object(None)) + } + + #[test] + /// pass when deployment has been fully rolled out + fn deployment_completed_ok() { + use super::{is_deployment_completed, Condition}; + + let depl = r#" + apiVersion: apps/v1 + kind: Deployment + metadata: + name: testapp + namespace: default + spec: + progressDeadlineSeconds: 600 + replicas: 3 + revisionHistoryLimit: 10 + selector: + matchLabels: + app: test + strategy: + rollingUpdate: + maxSurge: 25% + maxUnavailable: 25% + type: RollingUpdate + template: + metadata: + creationTimestamp: null + labels: + app: test + spec: + containers: + - image: postgres + imagePullPolicy: Always + name: postgres + ports: + - containerPort: 5432 + protocol: TCP + env: + - name: POSTGRES_PASSWORD + value: foobar + status: + availableReplicas: 3 + conditions: + - lastTransitionTime: "2025-03-06T06:06:57Z" + lastUpdateTime: "2025-03-06T06:06:57Z" + message: Deployment has minimum availability. + reason: MinimumReplicasAvailable + status: "True" + type: Available + - lastTransitionTime: "2025-03-06T06:03:20Z" + lastUpdateTime: "2025-03-06T06:06:57Z" + message: ReplicaSet "testapp-7fcd4b58c9" has successfully progressed. + reason: NewReplicaSetAvailable + status: "True" + type: Progressing + observedGeneration: 2 + readyReplicas: 3 + replicas: 3 + updatedReplicas: 3 + "#; + + let d = serde_yaml::from_str(depl).unwrap(); + assert!(is_deployment_completed().matches_object(Some(&d))) + } + + #[test] + /// fail if deployment update is still rolling out + fn deployment_completed_pending() { + use super::{is_deployment_completed, Condition}; + + let depl = r#" + apiVersion: apps/v1 + kind: Deployment + metadata: + name: testapp + namespace: default + spec: + progressDeadlineSeconds: 600 + replicas: 3 + revisionHistoryLimit: 10 + selector: + matchLabels: + app: test + strategy: + rollingUpdate: + maxSurge: 25% + maxUnavailable: 25% + type: RollingUpdate + template: + metadata: + creationTimestamp: null + labels: + app: test + spec: + containers: + - image: postgres + imagePullPolicy: Always + name: postgres + ports: + - containerPort: 5432 + protocol: TCP + env: + - name: POSTGRES_PASSWORD + value: foobar + status: + conditions: + - lastTransitionTime: "2025-03-06T06:03:20Z" + lastUpdateTime: "2025-03-06T06:03:20Z" + message: Deployment does not have minimum availability. + reason: MinimumReplicasUnavailable + status: "False" + type: Available + - lastTransitionTime: "2025-03-06T06:03:20Z" + lastUpdateTime: "2025-03-06T06:03:20Z" + message: ReplicaSet "testapp-77789cd7d4" is progressing. + reason: ReplicaSetUpdated + status: "True" + type: Progressing + observedGeneration: 1 + replicas: 3 + unavailableReplicas: 3 + updatedReplicas: 3 + "#; + + let d = serde_yaml::from_str(depl).unwrap(); + assert!(!is_deployment_completed().matches_object(Some(&d))) + } + + #[test] + /// fail if deployment does not exist + fn deployment_completed_missing() { + use super::{is_deployment_completed, Condition}; + + assert!(!is_deployment_completed().matches_object(None)) + } + + #[test] + /// pass if loadbalancer service has recieved a loadbalancer IP + fn service_lb_provisioned_ok_ip() { + use super::{is_service_loadbalancer_provisioned, Condition}; + + let service = r" + apiVersion: v1 + kind: Service + metadata: + name: test + spec: + selector: + app.kubernetes.io/name: test + type: LoadBalancer + ports: + - protocol: TCP + port: 80 + targetPort: 9376 + clusterIP: 10.0.171.239 + status: + loadBalancer: + ingress: + - ip: 192.0.2.127 + "; + + let s = serde_yaml::from_str(service).unwrap(); + assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s))) + } + + #[test] + /// pass if loadbalancer service has recieved a loadbalancer hostname + fn service_lb_provisioned_ok_hostname() { + use super::{is_service_loadbalancer_provisioned, Condition}; + + let service = r" + apiVersion: v1 + kind: Service + metadata: + name: test + spec: + selector: + app.kubernetes.io/name: test + type: LoadBalancer + ports: + - protocol: TCP + port: 80 + targetPort: 9376 + clusterIP: 10.0.171.239 + status: + loadBalancer: + ingress: + - hostname: example.exposed.service + "; + + let s = serde_yaml::from_str(service).unwrap(); + assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s))) + } + + #[test] + /// fail if loadbalancer service is still waiting for a LB + fn service_lb_provisioned_pending() { + use super::{is_service_loadbalancer_provisioned, Condition}; + + let service = r" + apiVersion: v1 + kind: Service + metadata: + name: test + spec: + selector: + app.kubernetes.io/name: test + type: LoadBalancer + ports: + - protocol: TCP + port: 80 + targetPort: 9376 + clusterIP: 10.0.171.239 + status: + loadBalancer: {} + "; + + let s = serde_yaml::from_str(service).unwrap(); + assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s))) + } + + #[test] + /// pass if service is not a loadbalancer + fn service_lb_provisioned_not_loadbalancer() { + use super::{is_service_loadbalancer_provisioned, Condition}; + + let service = r" + apiVersion: v1 + kind: Service + metadata: + name: test + spec: + selector: + app.kubernetes.io/name: test + type: ClusterIP + ports: + - protocol: TCP + port: 80 + targetPort: 9376 + status: + loadBalancer: {} + "; + + let s = serde_yaml::from_str(service).unwrap(); + assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s))) + } + + #[test] + /// fail if service does not exist + fn service_lb_provisioned_missing() { + use super::{is_service_loadbalancer_provisioned, Condition}; + + assert!(!is_service_loadbalancer_provisioned().matches_object(None)) + } + + #[test] + /// pass when ingress has recieved a loadbalancer IP + fn ingress_provisioned_ok_ip() { + use super::{is_ingress_provisioned, Condition}; + + let ingress = r#" + apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + name: test + namespace: default + resourceVersion: "1401" + uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67 + spec: + ingressClassName: nginx + rules: + - host: httpbin.local + http: + paths: + - path: / + backend: + service: + name: httpbin + port: + number: 80 + status: + loadBalancer: + ingress: + - ip: 10.89.7.3 + "#; + + let i = serde_yaml::from_str(ingress).unwrap(); + assert!(is_ingress_provisioned().matches_object(Some(&i))) + } + + #[test] + /// pass when ingress has recieved a loadbalancer hostname + fn ingress_provisioned_ok_hostname() { + use super::{is_ingress_provisioned, Condition}; + + let ingress = r#" + apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + name: test + namespace: default + resourceVersion: "1401" + uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67 + spec: + ingressClassName: nginx + rules: + - host: httpbin.local + http: + paths: + - path: / + backend: + service: + name: httpbin + port: + number: 80 + status: + loadBalancer: + ingress: + - hostname: example.exposed.service + "#; + + let i = serde_yaml::from_str(ingress).unwrap(); + assert!(is_ingress_provisioned().matches_object(Some(&i))) + } + + #[test] + /// fail if ingress is still waiting for a LB + fn ingress_provisioned_pending() { + use super::{is_ingress_provisioned, Condition}; + + let ingress = r#" + apiVersion: networking.k8s.io/v1 + kind: Ingress + metadata: + name: test + namespace: default + resourceVersion: "1401" + uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67 + spec: + ingressClassName: nginx + rules: + - host: httpbin.local + http: + paths: + - path: / + backend: + service: + name: httpbin + port: + number: 80 + status: + loadBalancer: {} + "#; + + let i = serde_yaml::from_str(ingress).unwrap(); + assert!(!is_ingress_provisioned().matches_object(Some(&i))) + } + + #[test] + /// fail if ingress does not exist + fn ingress_provisioned_missing() { + use super::{is_ingress_provisioned, Condition}; + + assert!(!is_ingress_provisioned().matches_object(None)) + } + } } /// Utilities for deleting objects