Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions policy-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ where
.await;
tracing::trace!(?ns);
tokio::time::timeout(
tokio::time::Duration::from_secs(60),
tokio::time::Duration::from_secs(15),
await_service_account(&client, &ns.name_unchecked(), "default"),
)
.await
Expand Down Expand Up @@ -754,33 +754,57 @@ where
}
}

pub async fn await_service_account(client: &kube::Client, ns: &str, name: &str) {
async fn await_resource<T>(
mut watcher: impl futures::Stream<
Item = Result<kube::runtime::watcher::Event<T>, kube::runtime::watcher::Error>,
> + Unpin,
predicate: impl Fn(&T) -> bool,
) where
T: kube::Resource + std::fmt::Debug,
{
use futures::StreamExt;

tracing::trace!(%name, %ns, "Waiting for serviceaccount");
tokio::pin! {
let sas = kube::runtime::watcher(
kube::Api::<k8s::ServiceAccount>::namespaced(client.clone(), ns),
Default::default(),
);
}
loop {
let ev = sas
let ev = watcher
.next()
.await
.expect("serviceaccounts watch must not end")
.expect("serviceaccounts watch must not fail");
.expect("watch must not end")
.expect("watch must not fail");
tracing::info!(?ev);
match ev {
kube::runtime::watcher::Event::InitApply(sa)
| kube::runtime::watcher::Event::Apply(sa)
if sa.name_unchecked() == name =>
kube::runtime::watcher::Event::InitApply(resource)
| kube::runtime::watcher::Event::Apply(resource)
if predicate(&resource) =>
{
return
break
}
_ => {}
}
}
}

pub async fn await_service_account(client: &kube::Client, ns: &str, name: &str) {
tracing::trace!(%ns, "Waiting for namespace");

let label_selector = format!("kubernetes.io/metadata.name={}", ns);
let watcher_config = kube::runtime::watcher::Config::default().labels(&label_selector);
tokio::pin! {
let namespaces = kube::runtime::watcher(
kube::Api::<k8s::Namespace>::all(client.clone()),
watcher_config,
);
}
await_resource(namespaces, |namespace| namespace.name_unchecked() == ns).await;

tracing::trace!(%name, %ns, "Waiting for serviceaccount");

tokio::pin! {
let sas = kube::runtime::watcher(
kube::Api::<k8s::ServiceAccount>::namespaced(client.clone(), ns),
Default::default(),
);
}
await_resource(sas, |sa| sa.name_unchecked() == name).await;

// XXX In some versions of k8s, it may be appropriate to wait for the
// ServiceAccount's secret to be created, but, as of v1.24,
Expand Down
Loading