
Commit 1836246

Wait for deployed resources to become ready
This waits for:

- pods to become running
- deployments to complete rollouts
- ingresses to become published by the controller
- LoadBalancer services to get an external IP

Other types of resources or services are ignored and immediately return Ok.

Signed-off-by: Robert Detjens <[email protected]>
1 parent ae02b62 commit 1836246

File tree

2 files changed: +186 -9 lines changed


src/clients.rs

Lines changed: 112 additions & 2 deletions
@@ -3,11 +3,17 @@
 use anyhow::{anyhow, bail, Context, Error, Result};
 use bollard;
 use futures::TryFutureExt;
+use k8s_openapi::api::{
+    apps::v1::Deployment,
+    core::v1::{Pod, Service},
+    networking::v1::Ingress,
+};
 use kube::{
     self,
-    api::{DynamicObject, GroupVersionKind, Patch, PatchParams, TypeMeta},
+    api::{DynamicObject, GroupVersionKind, Patch, PatchParams},
     core::ResourceExt,
-    discovery::{ApiCapabilities, ApiResource, Discovery, Scope},
+    discovery::{ApiCapabilities, ApiResource},
+    runtime::{conditions, wait::await_condition},
 };
 use s3;
 use simplelog::*;
@@ -240,3 +246,107 @@ fn multidoc_deserialize(data: &str) -> Result<Vec<serde_yml::Value>> {
     //     .map(|r| r.map_err(|e| e.into()))
     //     .collect()
 }
+
+/// Check the status of the passed object and wait for it to become ready.
+///
+/// This function does not provide a timeout. Callers will need to wrap this with a timeout instead.
+pub async fn wait_for_status(client: &kube::Client, object: &DynamicObject) -> Result<()> {
+    debug!(
+        "waiting for ok status for {} {}",
+        object.types.clone().unwrap_or_default().kind,
+        object.name_any()
+    );
+
+    // handle each separate object type differently
+    match object.types.clone().unwrap_or_default().kind.as_str() {
+        // wait for Pod to become running
+        "Pod" => {
+            let api = kube::Api::namespaced(client.clone(), &object.namespace().unwrap());
+            await_condition(api, &object.name_any(), conditions::is_pod_running()).await?;
+        }
+
+        // wait for Deployment to complete rollout
+        "Deployment" => {
+            let api = kube::Api::namespaced(client.clone(), &object.namespace().unwrap());
+            await_condition(api, &object.name_any(), |d: Option<&Deployment>| {
+                // Use a nested function so that we can use Option `?` returns (the outer closure returns `bool`).
+                // TODO: switch to try { } when that is stabilized
+                /// Replicate the upstream deployment-complete check:
+                /// https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#complete-deployment
+                fn depl_complete(d: Option<&Deployment>) -> Option<bool> {
+                    Some(d?.status.as_ref()?.conditions.as_ref()?.iter().any(|c| {
+                        c.reason == Some("NewReplicaSetAvailable".to_string()) && c.status == "True"
+                    }))
+                }
+                depl_complete(d).unwrap_or(false)
+            })
+            .await?;
+        }
+
+        // wait for Ingress to get IP from ingress controller
+        "Ingress" => {
+            let api = kube::Api::namespaced(client.clone(), &object.namespace().unwrap());
+            await_condition(api, &object.name_any(), |i: Option<&Ingress>| {
+                // Use a nested function for Option `?`, like above.
+                /// Wait for the ingress controller to update this with its external IP
+                fn ingress_ip(i: Option<&Ingress>) -> Option<bool> {
+                    Some(
+                        // bleh, this as_ref stuff is unavoidable
+                        i?.status
+                            .as_ref()?
+                            .load_balancer
+                            .as_ref()?
+                            .ingress
+                            .as_ref()?
+                            .iter()
+                            // TODO: should this be any()? all controllers I've seen only add .ip here
+                            .all(|ip| ip.hostname.is_some() || ip.ip.is_some()),
+                    )
+                }
+                ingress_ip(i).unwrap_or(false)
+            })
+            .await?;
+        }
+
+        // wait for LoadBalancer service to get IP
+        "Service" => {
+            let api = kube::Api::namespaced(client.clone(), &object.namespace().unwrap());
+            let svc: Service = api.get(&object.name_any()).await?;
+
+            // we only care about checking LoadBalancer-type services; return Ok
+            // for any non-LB services
+            //
+            // TODO: do we care about NodePorts? don't need to check any atm
+            if svc.spec.unwrap_or_default().type_ != Some("LoadBalancer".to_string()) {
+                trace!(
+                    "not checking status for internal service {}",
+                    object.name_any()
+                );
+                return Ok(());
+            }
+
+            await_condition(api, &object.name_any(), |s: Option<&Service>| {
+                /// Wait for the LoadBalancer to get an external IP
+                fn lb_ip(s: Option<&Service>) -> Option<bool> {
+                    Some(
+                        // bleh, this as_ref stuff is unavoidable
+                        s?.status
+                            .as_ref()?
+                            .load_balancer
+                            .as_ref()?
+                            .ingress
+                            .as_ref()?
+                            .iter()
+                            .all(|ip| ip.hostname.is_some() || ip.ip.is_some()),
+                    )
+                }
+                lb_ip(s).unwrap_or(false)
+            })
+            .await?;
+        }
+
+        other => trace!("not checking status for resource type {other}"),
+    };
+
+    Ok(())
+}
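As the doc comment notes, wait_for_status deliberately has no internal timeout. A minimal sketch of the intended call pattern (the wrapper name and 5-minute budget are illustrative, mirroring the deploy code below):

// Illustrative wrapper (not part of this commit): enforce a deadline around
// wait_for_status. The outer Result comes from tokio's timeout, the inner
// one from wait_for_status itself.
use std::time::Duration;
use anyhow::{Context, Result};
use kube::api::DynamicObject;
use tokio::time::timeout;

async fn wait_with_deadline(client: &kube::Client, object: &DynamicObject) -> Result<()> {
    timeout(Duration::from_secs(5 * 60), wait_for_status(client, object))
        .await
        // Err here means the deadline elapsed
        .context("timed out waiting for resource to become ready")?
        // Err here means the underlying watch failed
        .context("failed to get resource status")
}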
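The nested functions (depl_complete, ingress_ip, lb_ip) exist because `?` on an Option only works inside something that itself returns Option, while await_condition's closure must return a plain bool. A hypothetical standalone reduction of the pattern:

// Hypothetical reduction of the nested-fn pattern used above: the inner fn
// gives `?` an Option-returning scope; the caller collapses None to false.
struct Status {
    ready: Option<bool>,
}

fn is_ready(obj: Option<&Status>) -> bool {
    fn inner(o: Option<&Status>) -> Option<bool> {
        // each `?` short-circuits to None when a field is missing
        o?.ready
    }
    inner(obj).unwrap_or(false)
}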

src/deploy/kubernetes/mod.rs

Lines changed: 74 additions & 7 deletions
@@ -1,12 +1,14 @@
 use std::path::PathBuf;
+use std::time::Duration;

 use anyhow::{anyhow, bail, Context, Error, Ok, Result};
 use itertools::Itertools;
 use minijinja;
 use simplelog::*;
+use tokio::time::timeout;

 use crate::builder::BuildResult;
-use crate::clients::{apply_manifest_yaml, kube_client};
+use crate::clients::{apply_manifest_yaml, kube_client, wait_for_status};
 use crate::configparser::challenge::ExposeType;
 use crate::configparser::config::ProfileConfig;
 use crate::configparser::{get_config, get_profile_config, ChallengeConfig};
@@ -78,9 +80,17 @@ async fn deploy_single_challenge(
     trace!("NAMESPACE:\n{}", ns_manifest);

     debug!("applying namespace for chal {:?}", chal.directory);
-    apply_manifest_yaml(&kube, &ns_manifest).await?;

-    let expose_results = DeployResult { exposed: vec![] };
+    // apply namespace manifest
+    apply_manifest_yaml(&kube, &ns_manifest)
+        .await?
+        .iter()
+        // and then wait for it to be ready
+        .map(|object| wait_for_status(&kube, object))
+        .try_join_all()
+        .await?;
+
+    let results = DeployResult { exposed: vec![] };

     for pod in &chal.pods {
         let pod_image = chal.container_tag_for_pod(profile_name, &pod.name)?;
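An aside on the namespace hunk above: the method-style `.try_join_all()` presumably comes from an extension trait elsewhere in this crate. With the stock futures crate, the same concurrent wait inside deploy_single_challenge could be written with the free function, as in this sketch (assuming apply_manifest_yaml returns Vec<DynamicObject>):

// Sketch using futures::future::try_join_all directly: awaits every
// wait_for_status future concurrently and fails fast on the first error.
use futures::future::try_join_all;

let objects = apply_manifest_yaml(&kube, &ns_manifest).await?;
try_join_all(objects.iter().map(|object| wait_for_status(&kube, object))).await?;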
@@ -94,7 +104,26 @@
             "applying deployment for chal {:?} pod {:?}",
             chal.directory, pod.name
         );
-        apply_manifest_yaml(&kube, &depl_manifest).await?;
+        let depl = apply_manifest_yaml(&kube, &depl_manifest).await?;
+        for object in depl {
+            // wait for objects to be ready, with 5m timeout
+            timeout(Duration::from_secs(5 * 60), wait_for_status(&kube, &object))
+                .await
+                // timeout wraps with another Result
+                .with_context(|| {
+                    format!(
+                        "timed out waiting for chal {:?} pod {:?} deployment to become ready",
+                        chal.directory, pod.name
+                    )
+                })?
+                // inner result from wait_for_status
+                .with_context(|| {
+                    format!(
+                        "failed to get status for chal {:?} pod {:?} deployment",
+                        chal.directory, pod.name
+                    )
+                })?;
+        }

         // tcp and http exposes need to be handled separately, so separate them by type
         let (tcp_ports, http_ports): (Vec<_>, Vec<_>) = pod
@@ -113,7 +142,26 @@
                 "applying tcp service for chal {:?} pod {:?}",
                 chal.directory, pod.name
             );
-            apply_manifest_yaml(&kube, &tcp_manifest).await?;
+            let tcp = apply_manifest_yaml(&kube, &tcp_manifest).await?;
+            for object in tcp {
+                // wait for objects to be ready, with 5m timeout
+                timeout(Duration::from_secs(5 * 60), wait_for_status(&kube, &object))
+                    .await
+                    // timeout wraps with another Result
+                    .with_context(|| {
+                        format!(
+                            "timed out waiting for chal {:?} pod {:?} exposed TCP service to become ready",
+                            chal.directory, pod.name
+                        )
+                    })?
+                    // inner result from wait_for_status
+                    .with_context(|| {
+                        format!(
+                            "failed to get status for chal {:?} pod {:?} exposed TCP service",
+                            chal.directory, pod.name
+                        )
+                    })?;
+            }

             // TODO:
             // expose_results.exposed.push(PodDeployResult::Tcp { port: tcp_ports[0]. });
@@ -130,11 +178,30 @@
                 "applying http service and ingress for chal {:?} pod {:?}",
                 chal.directory, pod.name
             );
-            apply_manifest_yaml(&kube, &http_manifest).await?;
+            let ingress = apply_manifest_yaml(&kube, &http_manifest).await?;
+            for object in ingress {
+                // wait for objects to be ready, with 5m timeout
+                timeout(Duration::from_secs(5 * 60), wait_for_status(&kube, &object))
+                    .await
+                    // timeout wraps with another Result
+                    .with_context(|| {
+                        format!(
+                            "timed out waiting for chal {:?} pod {:?} ingress to become ready",
+                            chal.directory, pod.name
+                        )
+                    })?
+                    // inner result from wait_for_status
+                    .with_context(|| {
+                        format!(
+                            "failed to get status for chal {:?} pod {:?} ingress",
+                            chal.directory, pod.name
+                        )
+                    })?;
+            }
         }
     }

-    Ok(expose_results)
+    Ok(results)
 }

 // Updates the current ingress controller chart with the current set of TCP
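One editorial observation: the three per-resource wait blocks above are identical except for their error strings. A hypothetical helper (not part of this commit) could factor out the timeout-plus-context pattern:

// Hypothetical helper (not in this commit): wait for every applied object
// with the same 5-minute deadline, using `what` (e.g. "chal foo pod bar
// deployment") to label both failure modes.
async fn wait_all_ready(kube: &kube::Client, objects: &[DynamicObject], what: &str) -> Result<()> {
    for object in objects {
        timeout(Duration::from_secs(5 * 60), wait_for_status(kube, object))
            .await
            .with_context(|| format!("timed out waiting for {what} to become ready"))?
            .with_context(|| format!("failed to get status for {what}"))?;
    }
    Ok(())
}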
