From 550e50f58d0e3236217531acb8f90b5e4f583632 Mon Sep 17 00:00:00 2001 From: clux Date: Wed, 5 Feb 2025 23:59:53 +0000 Subject: [PATCH] Experiment with multi_reflectors since the question has showed up a few times. ultimately this is awkward because the hard generic use in ObjectRef and in Store which ties them to a particular resource. It feels like it shouldn't have to be this way because the Store::get takes an ObjectRef, which feels dynamically typed, but actually isnt. Tried out a couple of potentials and had to just resort to the dumb thing instead. Maybe there's some other things we can do for dynamics. Maybe worth exploring in an issue? Signed-off-by: clux --- examples/Cargo.toml | 4 ++ examples/multi_reflector.rs | 137 ++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 examples/multi_reflector.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 3abb06521..1c7740f1c 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -130,6 +130,10 @@ path = "log_stream.rs" name = "multi_watcher" path = "multi_watcher.rs" +[[example]] +name = "multi_reflector" +path = "multi_reflector.rs" + [[example]] name = "pod_api" path = "pod_api.rs" diff --git a/examples/multi_reflector.rs b/examples/multi_reflector.rs new file mode 100644 index 000000000..9f3c4537d --- /dev/null +++ b/examples/multi_reflector.rs @@ -0,0 +1,137 @@ +use futures::{future, StreamExt}; +use k8s_openapi::api::{ + apps::v1::Deployment, + core::v1::{ConfigMap, Secret}, +}; +use kube::{ + runtime::{ + reflector, + reflector::{ObjectRef, Store}, + watcher, WatchStreamExt, + }, + Api, Client, +}; +use std::sync::Arc; +use tracing::*; + +// This does not work because Resource trait is not dyn safe. +/* +use std::any::TypeId; +use std::collections::HashMap; +use k8s_openapi::NamespaceResourceScope; +use kube::api::{Resource, ResourceExt}; +struct MultiStore { + stores: HashMap>>, + } +impl MultiStore { + fn get>(&self, name: &str, ns: &str) -> Option> { + let oref = ObjectRef::::new(name).within(ns); + if let Some(store) = self.stores.get(&TypeId::of::()) { + store.get(oref) + } else { + None + } + } +}*/ + +// explicit store can work +struct MultiStore { + deploys: Store, + cms: Store, + secs: Store, +} +// but using generics to help out won't because the K needs to be concretised +/* +impl MultiStore { + fn get>(&self, name: &str, ns: &str) -> Option>> { + let oref = ObjectRef::::new(name).within(ns); + let kind = K::kind(&()).to_owned(); + match kind.as_ref() { + "Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)), + "ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)), + "Secret" => self.secs.get(&ObjectRef::new(name).within(ns)), + _ => None, + } + None + } +} +*/ +// so left with this + +impl MultiStore { + fn get_deploy(&self, name: &str, ns: &str) -> Option> { + self.deploys.get(&ObjectRef::::new(name).within(ns)) + } + + fn get_secret(&self, name: &str, ns: &str) -> Option> { + self.secs.get(&ObjectRef::::new(name).within(ns)) + } + + fn get_cm(&self, name: &str, ns: &str) -> Option> { + self.cms.get(&ObjectRef::::new(name).within(ns)) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let client = Client::try_default().await?; + + let deploys: Api = Api::default_namespaced(client.clone()); + let cms: Api = Api::default_namespaced(client.clone()); + let secret: Api = Api::default_namespaced(client.clone()); + + let (dep_reader, dep_writer) = reflector::store::(); + let (cm_reader, cm_writer) = reflector::store::(); + let (sec_reader, sec_writer) = reflector::store::(); + + let cfg = watcher::Config::default(); + let dep_watcher = watcher(deploys, cfg.clone()) + .reflect(dep_writer) + .applied_objects() + .for_each(|_| future::ready(())); + let cm_watcher = watcher(cms, cfg.clone()) + .reflect(cm_writer) + .applied_objects() + .for_each(|_| future::ready(())); + let sec_watcher = watcher(secret, cfg) + .reflect(sec_writer) + .applied_objects() + .for_each(|_| future::ready(())); + // poll these forever + + // multistore + let stores = MultiStore { + deploys: dep_reader, + cms: cm_reader, + secs: sec_reader, + }; + + // simulate doing stuff with the stores from some other thread + tokio::spawn(async move { + // Show state every 5 seconds of watching + info!("waiting for them to be ready"); + stores.deploys.wait_until_ready().await.unwrap(); + stores.cms.wait_until_ready().await.unwrap(); + stores.secs.wait_until_ready().await.unwrap(); + info!("stores initialised"); + // can use helper accessors + info!( + "common cm: {:?}", + stores.get_cm("kube-root-ca.crt", "kube-system").unwrap() + ); + loop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + // access individual sub stores + info!("Current deploys count: {}", stores.deploys.state().len()); + } + }); + // info!("long watches starting"); + tokio::select! { + r = dep_watcher => println!("dep watcher exit: {r:?}"), + r = cm_watcher => println!("cm watcher exit: {r:?}"), + r = sec_watcher => println!("sec watcher exit: {r:?}"), + } + + Ok(()) +}