diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 3abb06521..6022f44e7 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -130,6 +130,10 @@ path = "log_stream.rs" name = "multi_watcher" path = "multi_watcher.rs" +[[example]] +name = "broadcast_reflector" +path = "broadcast_reflector.rs" + [[example]] name = "pod_api" path = "pod_api.rs" diff --git a/examples/broadcast_reflector.rs b/examples/broadcast_reflector.rs new file mode 100644 index 000000000..f21cbb1e8 --- /dev/null +++ b/examples/broadcast_reflector.rs @@ -0,0 +1,113 @@ +use futures::{future, lock::Mutex, pin_mut, stream, StreamExt}; +use k8s_openapi::api::{ + apps::v1::Deployment, + core::v1::{ConfigMap, Secret}, +}; +use kube::{ + api::ApiResource, + runtime::{ + broadcaster, + controller::Action, + reflector::multi_dispatcher::{BroadcastStream, MultiDispatcher}, + watcher, Controller, + }, + Api, Client, ResourceExt, +}; +use std::{fmt::Debug, pin::pin, sync::Arc, time::Duration}; +use thiserror::Error; +use tracing::*; + +#[derive(Debug, Error)] +enum Infallible {} + +// A generic reconciler that can be used with any object whose type is known at +// compile time. Will simply log its kind on reconciliation. +async fn reconcile(_obj: Arc, _ctx: Arc<()>) -> Result +where + K: ResourceExt, +{ + let kind = K::kind(&()); + info!("Reconciled {kind}"); + Ok(Action::await_change()) +} + +fn error_policy(_: Arc, _: &Infallible, _ctx: Arc<()>) -> Action { + info!("error"); + Action::requeue(Duration::from_secs(10)) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let client = Client::try_default().await?; + + let writer = MultiDispatcher::new(128); + + // multireflector stream + let combo_stream = Arc::new(Mutex::new(stream::select_all(vec![]))); + let watcher = broadcaster(writer.clone(), BroadcastStream::new(combo_stream.clone())); + + combo_stream.lock().await.push( + watcher::watcher( + Api::all_with(client.clone(), &ApiResource::erase::(&())), + Default::default(), + ) + .boxed(), + ); + + // watching config maps, but ignoring in the final configuration + combo_stream.lock().await.push( + watcher::watcher( + Api::all_with(client.clone(), &ApiResource::erase::(&())), + Default::default(), + ) + .boxed(), + ); + + // Combine duplicate type streams with narrowed down selection + combo_stream.lock().await.push( + watcher::watcher( + Api::default_namespaced_with(client.clone(), &ApiResource::erase::(&())), + Default::default(), + ) + .boxed(), + ); + combo_stream.lock().await.push( + watcher::watcher( + Api::namespaced_with(client.clone(), "kube-system", &ApiResource::erase::(&())), + Default::default(), + ) + .boxed(), + ); + + let (sub, reader) = writer.subscribe::(); + let deploy = Controller::for_shared_stream(sub, reader) + .shutdown_on_signal() + .run(reconcile, error_policy, Arc::new(())) + .for_each(|res| async move { + match res { + Ok(v) => info!("Reconciled deployment {v:?}"), + Err(error) => warn!(%error, "Failed to reconcile metadata"), + }; + }); + + let (sub, reader) = writer.subscribe::(); + let secret = Controller::for_shared_stream(sub, reader) + .shutdown_on_signal() + .run(reconcile, error_policy, Arc::new(())) + .for_each(|res| async move { + match res { + Ok(v) => info!("Reconciled secret {v:?}"), + Err(error) => warn!(%error, "Failed to reconcile metadata"), + }; + }); + + info!("long watches starting"); + tokio::select! { + r = watcher.for_each(|_| future::ready(())) => println!("watcher exit: {r:?}"), + x = deploy => println!("deployments exit: {x:?}"), + x = secret => println!("secrets exit: {x:?}"), + } + + Ok(()) +} diff --git a/kube-core/src/dynamic.rs b/kube-core/src/dynamic.rs index ebaa7e3a6..9f82ff248 100644 --- a/kube-core/src/dynamic.rs +++ b/kube-core/src/dynamic.rs @@ -5,6 +5,7 @@ pub use crate::discovery::ApiResource; use crate::{ metadata::TypeMeta, resource::{DynamicResourceScope, Resource}, + GroupVersionKind, }; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; @@ -73,6 +74,12 @@ impl DynamicObject { ) -> Result { Ok(serde_json::from_value(serde_json::to_value(self)?)?) } + + /// Returns the group, version, and kind (GVK) of this resource. + pub fn gvk(&self) -> Option { + let gvk = self.types.clone()?; + gvk.try_into().ok() + } } impl Resource for DynamicObject { diff --git a/kube-core/src/gvk.rs b/kube-core/src/gvk.rs index 91b986601..77d140eda 100644 --- a/kube-core/src/gvk.rs +++ b/kube-core/src/gvk.rs @@ -12,7 +12,7 @@ use thiserror::Error; pub struct ParseGroupVersionError(pub String); /// Core information about an API Resource. -#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)] pub struct GroupVersionKind { /// API group pub group: String, diff --git a/kube-core/src/metadata.rs b/kube-core/src/metadata.rs index 67edf6e16..0c12b9f5b 100644 --- a/kube-core/src/metadata.rs +++ b/kube-core/src/metadata.rs @@ -51,6 +51,22 @@ impl TypeMeta { kind: K::kind(&()).into(), } } + + /// Construct a new `TypeMeta` for the object from the list `TypeMeta`. + /// + /// ``` + /// # use k8s_openapi::api::core::v1::Pod; + /// # use kube_core::TypeMeta; + /// + /// let mut type_meta = TypeMeta::resource::(); + /// type_meta.kind = "PodList".to_string(); + /// assert_eq!(type_meta.clone().singular_list().unwrap().kind, "Pod"); + /// assert_eq!(type_meta.clone().singular_list().unwrap().api_version, "v1"); + /// ``` + pub fn singular_list(self) -> Option { + let kind = self.kind.strip_suffix("List")?.to_string(); + (!kind.is_empty()).then_some(Self { kind, ..self }) + } } /// A generic representation of any object with `ObjectMeta`. diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index fa508dc4e..4abe942c9 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -8,7 +8,7 @@ use serde::Serialize; /// depending on what `resource_version`, `limit`, `continue_token` you include with the list request. /// /// See for details. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Hash)] pub enum VersionMatch { /// Returns data at least as new as the provided resource version. /// @@ -36,7 +36,7 @@ pub enum VersionMatch { } /// Common query parameters used in list/delete calls on collections -#[derive(Clone, Debug, Default, PartialEq)] +#[derive(Clone, Debug, Default, PartialEq, Hash)] pub struct ListParams { /// A selector to restrict the list of returned objects by their labels. /// @@ -305,7 +305,7 @@ impl ValidationDirective { } /// Common query parameters used in watch calls on collections -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Hash)] pub struct WatchParams { /// A selector to restrict returned objects by their labels. /// diff --git a/kube-core/src/resource.rs b/kube-core/src/resource.rs index 3ab1d88df..6dffdeeb1 100644 --- a/kube-core/src/resource.rs +++ b/kube-core/src/resource.rs @@ -8,6 +8,8 @@ use std::{borrow::Cow, collections::BTreeMap}; pub use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope, ResourceScope, SubResourceScope}; +use crate::GroupVersionKind; + /// Indicates that a [`Resource`] is of an indeterminate dynamic scope. pub struct DynamicResourceScope {} impl ResourceScope for DynamicResourceScope {} @@ -54,6 +56,11 @@ pub trait Resource { /// This is known as the resource in apimachinery, we rename it for disambiguation. fn plural(dt: &Self::DynamicType) -> Cow<'_, str>; + /// Generates an object reference for the resource + fn gvk(dt: &Self::DynamicType) -> GroupVersionKind { + GroupVersionKind::gvk(&Self::group(dt), &Self::version(dt), &Self::kind(dt)) + } + /// Creates a url path for http requests for this resource fn url_path(dt: &Self::DynamicType, namespace: Option<&str>) -> String { let n = if let Some(ns) = namespace { diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 8a8f7fc3a..2816129b8 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1766,7 +1766,7 @@ mod tests { queue_rx.map(Result::<_, Infallible>::Ok), Config::default(), )); - store_tx.apply_watcher_event(&watcher::Event::InitDone); + store_tx.apply_watcher_event(&watcher::Event::InitDone(None)); for i in 0..items { let obj = ConfigMap { metadata: ObjectMeta { @@ -1776,7 +1776,7 @@ mod tests { }, ..Default::default() }; - store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone())); + store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone(), None)); queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap(); } diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index 7d0d8512c..8c6669656 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -32,6 +32,7 @@ pub mod watcher; pub use controller::{applier, Config, Controller}; pub use finalizer::finalizer; +#[cfg(feature = "unstable-runtime-subscribe")] pub use reflector::broadcaster; pub use reflector::reflector; pub use scheduler::scheduler; pub use utils::WatchStreamExt; diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 1060dab2b..31c4bbbac 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -6,12 +6,17 @@ use std::{fmt::Debug, sync::Arc}; use educe::Educe; use futures::Stream; +#[cfg(feature = "unstable-runtime-subscribe")] +use kube_client::{api::DynamicObject, Resource}; use pin_project::pin_project; +#[cfg(feature = "unstable-runtime-subscribe")] use serde::de::DeserializeOwned; use std::task::ready; use crate::reflector::{ObjectRef, Store}; +#[cfg(feature = "unstable-runtime-subscribe")] use crate::watcher::Event; use async_broadcast::{InactiveReceiver, Receiver, Sender}; +#[cfg(feature = "unstable-runtime-subscribe")] use super::store::Writer; use super::Lookup; #[derive(Educe)] @@ -125,7 +130,7 @@ where impl Stream for ReflectHandle where K: Lookup + Clone, - K::DynamicType: Eq + std::hash::Hash + Clone + Default, + K::DynamicType: Eq + std::hash::Hash + Clone, { type Item = Arc; @@ -141,6 +146,97 @@ where } } +/// A handle to a shared dynamic object stream +/// +/// [`TypedReflectHandle`]s are created by calling [`subscribe()`] on a [`TypedDispatcher`], +/// Each shared stream reader should be polled independently and driven to readiness +/// to avoid deadlocks. When the [`TypedDispatcher`]'s buffer is filled, backpressure +/// will be applied on the root stream side. +/// +/// When the root stream is dropped, or it ends, all [`TypedReflectHandle`]s +/// subscribed to the shared stream will also terminate after all events yielded by +/// the root stream have been observed. This means [`TypedReflectHandle`] streams +/// can still be polled after the root stream has been dropped. +#[cfg(feature = "unstable-runtime-subscribe")] +#[pin_project] +pub struct TypedReflectHandle +where + K: Lookup + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, + K: DeserializeOwned, +{ + #[pin] + rx: Receiver>, + store: Writer, +} + +#[cfg(feature = "unstable-runtime-subscribe")] +impl TypedReflectHandle +where + K: Lookup + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone + Default, + K: DeserializeOwned, +{ + pub(super) fn new(rx: Receiver>) -> TypedReflectHandle { + Self { + rx, + // Initialize a ready store by default + store: { + let mut store: Writer = Default::default(); + store.apply_shared_watcher_event(&Event::InitDone(None)); + store + }, + } + } + + pub fn reader(&self) -> Store { + self.store.as_reader() + } +} + +#[cfg(feature = "unstable-runtime-subscribe")] +impl Stream for TypedReflectHandle +where + K: Resource + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone + Default, + K: DeserializeOwned, +{ + type Item = Arc; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { + return match ready!(this.rx.as_mut().poll_next(cx)) { + Some(event) => { + let obj = match event { + Event::InitApply(obj, id) | Event::Apply(obj, id) + if obj.gvk() == Some(K::gvk(&Default::default())) => + { + obj.try_parse::().ok().map(Arc::new).inspect(|o| { + this.store.apply_shared_watcher_event(&Event::Apply(o.clone(), id)); + }) + } + Event::Delete(obj, id) if obj.gvk() == Some(K::gvk(&Default::default())) => { + obj.try_parse::().ok().map(Arc::new).inspect(|o| { + this.store.apply_shared_watcher_event(&Event::Delete(o.clone(), id)); + }) + } + _ => None, + }; + + // Skip propagating all objects which do not belong to the cache + if obj.is_none() { + continue; + } + + Poll::Ready(obj) + } + None => Poll::Ready(None), + }; + } + } +} + #[cfg(feature = "unstable-runtime-subscribe")] #[cfg(test)] pub(crate) mod test { @@ -165,12 +261,12 @@ pub(crate) mod test { let foo = testpod("foo"); let bar = testpod("bar"); let st = stream::iter([ - Ok(Event::Apply(foo.clone())), + Ok(Event::Apply(foo.clone(), None)), Err(Error::NoResourceVersion), - Ok(Event::Init), - Ok(Event::InitApply(foo)), - Ok(Event::InitApply(bar)), - Ok(Event::InitDone), + Ok(Event::Init(None)), + Ok(Event::InitApply(foo, None)), + Ok(Event::InitApply(bar, None)), + Ok(Event::InitDone(None)), ]); let (reader, writer) = reflector::store_shared(10); @@ -180,7 +276,7 @@ pub(crate) mod test { assert_eq!(reader.len(), 0); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Apply(_)))) + Poll::Ready(Some(Ok(Event::Apply(..)))) )); // Make progress and assert all events are seen @@ -192,19 +288,19 @@ pub(crate) mod test { assert_eq!(reader.len(), 1); let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Init))))); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Init(_)))))); assert_eq!(reader.len(), 1); let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_)))))); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(..)))))); assert_eq!(reader.len(), 1); let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_)))))); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(..)))))); assert_eq!(reader.len(), 1); let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitDone))))); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitDone(_)))))); assert_eq!(reader.len(), 2); assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); @@ -220,13 +316,13 @@ pub(crate) mod test { let foo = testpod("foo"); let bar = testpod("bar"); let st = stream::iter([ - Ok(Event::Delete(foo.clone())), - Ok(Event::Apply(foo.clone())), + Ok(Event::Delete(foo.clone(), None)), + Ok(Event::Apply(foo.clone(), None)), Err(Error::NoResourceVersion), - Ok(Event::Init), - Ok(Event::InitApply(foo.clone())), - Ok(Event::InitApply(bar.clone())), - Ok(Event::InitDone), + Ok(Event::Init(None)), + Ok(Event::InitApply(foo.clone(), None)), + Ok(Event::InitApply(bar.clone(), None)), + Ok(Event::InitDone(None)), ]); let foo = Arc::new(foo); @@ -239,13 +335,13 @@ pub(crate) mod test { // Deleted events should be skipped by subscriber. assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Delete(_)))) + Poll::Ready(Some(Ok(Event::Delete(..)))) )); assert_eq!(poll!(subscriber.next()), Poll::Pending); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Apply(_)))) + Poll::Ready(Some(Ok(Event::Apply(..)))) )); assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); @@ -260,21 +356,21 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Init))) + Poll::Ready(Some(Ok(Event::Init(_)))) )); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitApply(_)))) + Poll::Ready(Some(Ok(Event::InitApply(..)))) )); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitApply(_)))) + Poll::Ready(Some(Ok(Event::InitApply(..)))) )); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitDone))) + Poll::Ready(Some(Ok(Event::InitDone(_)))) )); // these don't come back in order atm: @@ -293,11 +389,11 @@ pub(crate) mod test { let foo = testpod("foo"); let bar = testpod("bar"); let st = stream::iter([ - Ok(Event::Apply(foo.clone())), - Ok(Event::Init), - Ok(Event::InitApply(foo.clone())), - Ok(Event::InitApply(bar.clone())), - Ok(Event::InitDone), + Ok(Event::Apply(foo.clone(), None)), + Ok(Event::Init(None)), + Ok(Event::InitApply(foo.clone(), None)), + Ok(Event::InitApply(bar.clone(), None)), + Ok(Event::InitDone(None)), ]); let foo = Arc::new(foo); @@ -309,7 +405,7 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Apply(_)))) + Poll::Ready(Some(Ok(Event::Apply(..)))) )); assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); @@ -321,25 +417,25 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Init))) + Poll::Ready(Some(Ok(Event::Init(_)))) )); assert_eq!(poll!(subscriber.next()), Poll::Pending); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitApply(_)))) + Poll::Ready(Some(Ok(Event::InitApply(..)))) )); assert_eq!(poll!(subscriber.next()), Poll::Pending); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitApply(_)))) + Poll::Ready(Some(Ok(Event::InitApply(..)))) )); assert_eq!(poll!(subscriber.next()), Poll::Pending); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitDone))) + Poll::Ready(Some(Ok(Event::InitDone(_)))) )); drop(reflect); @@ -360,9 +456,9 @@ pub(crate) mod test { let bar = testpod("bar"); let st = stream::iter([ //TODO: include a ready event here to avoid dealing with Init? - Ok(Event::Apply(foo.clone())), - Ok(Event::Apply(bar.clone())), - Ok(Event::Apply(foo.clone())), + Ok(Event::Apply(foo.clone(), None)), + Ok(Event::Apply(bar.clone(), None)), + Ok(Event::Apply(foo.clone(), None)), ]); let foo = Arc::new(foo); @@ -382,7 +478,7 @@ pub(crate) mod test { // we will still get an event from the root. assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Apply(_)))) + Poll::Ready(Some(Ok(Event::Apply(..)))) )); assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); @@ -404,14 +500,14 @@ pub(crate) mod test { // had two. We repeat the same pattern. assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Apply(_)))) + Poll::Ready(Some(Ok(Event::Apply(..)))) )); assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone()))); assert!(matches!(poll!(reflect.next()), Poll::Pending)); assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone()))); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Apply(_)))) + Poll::Ready(Some(Ok(Event::Apply(..)))) )); // Poll again to drain the queue. assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 88f4f2910..802d3932d 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -1,6 +1,7 @@ //! Caches objects in memory mod dispatcher; +#[cfg(feature = "unstable-runtime-subscribe")] pub mod multi_dispatcher; mod object_ref; pub mod store; @@ -11,6 +12,12 @@ pub use self::{ use crate::watcher; use async_stream::stream; use futures::{Stream, StreamExt}; +#[cfg(feature = "unstable-runtime-subscribe")] +use kube_client::api::DynamicObject; +#[cfg(feature = "unstable-runtime-subscribe")] +use multi_dispatcher::BroadcastStream; +#[cfg(feature = "unstable-runtime-subscribe")] +use multi_dispatcher::MultiDispatcher; use std::hash::Hash; #[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared; pub use store::{store, Store}; @@ -134,6 +141,28 @@ where } } +// broadcaster uses a common stream of DynamicObject events to distribute to any subscribed typed watcher. +#[cfg(feature = "unstable-runtime-subscribe")] +pub fn broadcaster( + mut writer: MultiDispatcher, + mut stream: BroadcastStream, +) -> impl Stream +where + W: Stream>> + Unpin, +{ + stream! { + while let Some(event) = stream.next().await { + match event { + Ok(ev) => { + writer.broadcast_event(&ev).await; + yield Ok(ev); + }, + Err(ev) => yield Err(ev) + } + } + } +} + #[cfg(test)] mod tests { use super::{reflector, store, ObjectRef}; @@ -157,7 +186,7 @@ mod tests { }, ..ConfigMap::default() }; - reflector(store_w, stream::iter(vec![Ok(watcher::Event::Apply(cm.clone()))])) + reflector(store_w, stream::iter(vec![Ok(watcher::Event::Apply(cm.clone(), None))])) .map(|_| ()) .collect::<()>() .await; @@ -186,8 +215,8 @@ mod tests { reflector( store_w, stream::iter(vec![ - Ok(watcher::Event::Apply(cm.clone())), - Ok(watcher::Event::Apply(updated_cm.clone())), + Ok(watcher::Event::Apply(cm.clone(), None)), + Ok(watcher::Event::Apply(updated_cm.clone(), None)), ]), ) .map(|_| ()) @@ -210,8 +239,8 @@ mod tests { reflector( store_w, stream::iter(vec![ - Ok(watcher::Event::Apply(cm.clone())), - Ok(watcher::Event::Delete(cm.clone())), + Ok(watcher::Event::Apply(cm.clone(), None)), + Ok(watcher::Event::Delete(cm.clone(), None)), ]), ) .map(|_| ()) @@ -241,10 +270,10 @@ mod tests { reflector( store_w, stream::iter(vec![ - Ok(watcher::Event::Apply(cm_a.clone())), - Ok(watcher::Event::Init), - Ok(watcher::Event::InitApply(cm_b.clone())), - Ok(watcher::Event::InitDone), + Ok(watcher::Event::Apply(cm_a.clone(), None)), + Ok(watcher::Event::Init(None)), + Ok(watcher::Event::InitApply(cm_b.clone(), None)), + Ok(watcher::Event::InitDone(None)), ]), ) .map(|_| ()) @@ -275,9 +304,9 @@ mod tests { ..ConfigMap::default() }; Ok(if deleted { - watcher::Event::Delete(obj) + watcher::Event::Delete(obj, None) } else { - watcher::Event::Apply(obj) + watcher::Event::Apply(obj, None) }) })), ) diff --git a/kube-runtime/src/reflector/multi_dispatcher.rs b/kube-runtime/src/reflector/multi_dispatcher.rs new file mode 100644 index 000000000..3eb993df6 --- /dev/null +++ b/kube-runtime/src/reflector/multi_dispatcher.rs @@ -0,0 +1,107 @@ +use std::{ + hash::Hash, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use crate::watcher::Event; +use async_broadcast::{InactiveReceiver, Sender}; +use futures::{lock::Mutex, Stream, StreamExt as _}; +use kube_client::{api::DynamicObject, Resource}; +use serde::de::DeserializeOwned; + +use crate::watcher; + +use super::{dispatcher::TypedReflectHandle, Store}; + +#[derive(Clone)] +pub struct MultiDispatcher { + dispatch_tx: Sender>, + // An inactive reader that prevents the channel from closing until the + // writer is dropped. + _dispatch_rx: InactiveReceiver>, +} + +impl MultiDispatcher { + #[must_use] + pub fn new(buf_size: usize) -> Self { + // Create a broadcast (tx, rx) pair + let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size); + // The tx half will not wait for any receivers to be active before + // broadcasting events. If no receivers are active, events will be + // buffered. + dispatch_tx.set_await_active(false); + Self { + dispatch_tx, + _dispatch_rx: dispatch_rx.deactivate(), + } + } + + /// Return a handle to a typed subscriber + /// + /// Multiple subscribe handles may be obtained, by either calling + /// `subscribe` multiple times, or by calling `clone()` + /// + /// This function returns a `Some` when the [`Writer`] is constructed through + /// [`Writer::new_shared`] or [`store_shared`], and a `None` otherwise. + #[must_use] + pub fn subscribe(&self) -> (TypedReflectHandle, Store) + where + K: Resource + Clone + DeserializeOwned, + K::DynamicType: Eq + Clone + Hash + Default, + { + let sub = TypedReflectHandle::new(self.dispatch_tx.new_receiver()); + let reader = sub.reader(); + (sub, reader) + } + + /// Broadcast an event to any downstream listeners subscribed on the store + pub(crate) async fn broadcast_event(&mut self, event: &watcher::Event) { + match event { + // Broadcast stores are pre-initialized + watcher::Event::InitDone(_) => {} + ev => { + let _ = self.dispatch_tx.broadcast_direct(ev.clone()).await; + } + } + } +} + +/// `BroadcastStream` allows to stream shared list of dynamic objects, +/// sources of which can be changed at any moment. +pub struct BroadcastStream { + pub stream: Arc>, +} + +impl Clone for BroadcastStream { + fn clone(&self) -> Self { + Self { + stream: self.stream.clone(), + } + } +} + +impl BroadcastStream +where + W: Stream>> + Unpin, +{ + pub fn new(stream: Arc>) -> Self { + Self { stream } + } +} + +impl Stream for BroadcastStream +where + W: Stream>> + Unpin, +{ + type Item = W::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(mut stream) = self.stream.try_lock() { + return stream.poll_next_unpin(cx); + } + + Poll::Pending + } +} diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index d6d264dea..00f4eff1e 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -101,24 +101,46 @@ where /// Applies a single watcher event to the store pub fn apply_watcher_event(&mut self, event: &watcher::Event) { match event { - watcher::Event::Apply(obj) => { - let key = obj.to_object_ref(self.dyntype.clone()); + watcher::Event::Apply(obj, id) => { + let obj = Arc::new(obj.clone()); + self.apply_shared_watcher_event(&watcher::Event::Apply(obj, *id)); + } + watcher::Event::Delete(obj, id) => { let obj = Arc::new(obj.clone()); - self.store.write().insert(key, obj); + self.apply_shared_watcher_event(&watcher::Event::Delete(obj, *id)); + } + watcher::Event::InitApply(obj, id) => { + let obj = Arc::new(obj.clone()); + self.apply_shared_watcher_event(&watcher::Event::InitApply(obj, *id)); + } + watcher::Event::Init(id) => { + self.apply_shared_watcher_event(&watcher::Event::Init(*id)); + } + watcher::Event::InitDone(id) => { + self.apply_shared_watcher_event(&watcher::Event::InitDone(*id)); } - watcher::Event::Delete(obj) => { + } + } + + /// Applies a single shared watcher event to the store + pub fn apply_shared_watcher_event(&mut self, event: &watcher::Event>) { + match event { + watcher::Event::Apply(obj, ..) => { + let key = obj.to_object_ref(self.dyntype.clone()); + self.store.write().insert(key, obj.clone()); + } + watcher::Event::Delete(obj, ..) => { let key = obj.to_object_ref(self.dyntype.clone()); self.store.write().remove(&key); } - watcher::Event::Init => { + watcher::Event::Init(_) => { self.buffer = AHashMap::new(); } - watcher::Event::InitApply(obj) => { + watcher::Event::InitApply(obj, ..) => { let key = obj.to_object_ref(self.dyntype.clone()); - let obj = Arc::new(obj.clone()); - self.buffer.insert(key, obj); + self.buffer.insert(key, obj.clone()); } - watcher::Event::InitDone => { + watcher::Event::InitDone(_) => { let mut store = self.store.write(); // Swap the buffer into the store @@ -141,14 +163,14 @@ where pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event) { if let Some(ref mut dispatcher) = self.dispatcher { match event { - watcher::Event::Apply(obj) => { + watcher::Event::Apply(obj, id) => { let obj_ref = obj.to_object_ref(self.dyntype.clone()); // TODO (matei): should this take a timeout to log when backpressure has // been applied for too long, e.g. 10s dispatcher.broadcast(obj_ref).await; } - watcher::Event::InitDone => { + watcher::Event::InitDone(_) => { let obj_refs: Vec<_> = { let store = self.store.read(); store.keys().cloned().collect() @@ -324,7 +346,7 @@ mod tests { ..ConfigMap::default() }; let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone())); + store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone(), None)); let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); } @@ -342,7 +364,7 @@ mod tests { let mut cluster_cm = cm.clone(); cluster_cm.metadata.namespace = None; let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Apply(cm)); + store_w.apply_watcher_event(&watcher::Event::Apply(cm, None)); let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None); } @@ -358,7 +380,7 @@ mod tests { ..ConfigMap::default() }; let (store, mut writer) = store(); - writer.apply_watcher_event(&watcher::Event::Apply(cm.clone())); + writer.apply_watcher_event(&watcher::Event::Apply(cm.clone(), None)); assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); } @@ -376,7 +398,7 @@ mod tests { let mut nsed_cm = cm.clone(); nsed_cm.metadata.namespace = Some("ns".to_string()); let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone())); + store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone(), None)); let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm)); } @@ -395,14 +417,14 @@ mod tests { let (reader, mut writer) = store::(); assert!(reader.is_empty()); - writer.apply_watcher_event(&watcher::Event::Apply(cm)); + writer.apply_watcher_event(&watcher::Event::Apply(cm, None)); assert_eq!(reader.len(), 1); assert!(reader.find(|k| k.metadata.generation == Some(1234)).is_none()); target_cm.metadata.name = Some("obj1".to_string()); target_cm.metadata.generation = Some(1234); - writer.apply_watcher_event(&watcher::Event::Apply(target_cm.clone())); + writer.apply_watcher_event(&watcher::Event::Apply(target_cm.clone(), None)); assert!(!reader.is_empty()); assert_eq!(reader.len(), 2); let found = reader.find(|k| k.metadata.generation == Some(1234)); diff --git a/kube-runtime/src/utils/event_decode.rs b/kube-runtime/src/utils/event_decode.rs index 2a0085120..b2991e220 100644 --- a/kube-runtime/src/utils/event_decode.rs +++ b/kube-runtime/src/utils/event_decode.rs @@ -29,15 +29,15 @@ where let mut me = self.project(); Poll::Ready(loop { let var_name = match ready!(me.stream.as_mut().poll_next(cx)) { - Some(Ok(Event::Apply(obj) | Event::InitApply(obj))) => Some(Ok(obj)), - Some(Ok(Event::Delete(obj))) => { + Some(Ok(Event::Apply(obj, ..) | Event::InitApply(obj, ..))) => Some(Ok(obj)), + Some(Ok(Event::Delete(obj, ..))) => { if *me.emit_deleted { Some(Ok(obj)) } else { continue; } } - Some(Ok(Event::Init | Event::InitDone)) => continue, + Some(Ok(Event::Init(_) | Event::InitDone(_))) => continue, Some(Err(err)) => Some(Err(err)), None => return Poll::Ready(None), }; @@ -56,14 +56,14 @@ pub(crate) mod tests { #[tokio::test] async fn watches_applies_uses_correct_stream() { let data = stream::iter([ - Ok(Event::Apply(0)), - Ok(Event::Apply(1)), - Ok(Event::Delete(0)), - Ok(Event::Apply(2)), - Ok(Event::InitApply(1)), - Ok(Event::InitApply(2)), + Ok(Event::Apply(0, None)), + Ok(Event::Apply(1, None)), + Ok(Event::Delete(0, None)), + Ok(Event::Apply(2, None)), + Ok(Event::InitApply(1, None)), + Ok(Event::InitApply(2, None)), Err(Error::NoResourceVersion), - Ok(Event::Apply(2)), + Ok(Event::Apply(2, None)), ]); let mut rx = pin!(EventDecode::new(data, false)); assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(0))))); diff --git a/kube-runtime/src/utils/event_modify.rs b/kube-runtime/src/utils/event_modify.rs index 39a2ec909..c04f5c8da 100644 --- a/kube-runtime/src/utils/event_modify.rs +++ b/kube-runtime/src/utils/event_modify.rs @@ -54,9 +54,9 @@ pub(crate) mod test { #[tokio::test] async fn eventmodify_modifies_innner_value_of_event() { let st = stream::iter([ - Ok(Event::Apply(0)), + Ok(Event::Apply(0, None)), Err(Error::NoResourceVersion), - Ok(Event::InitApply(10)), + Ok(Event::InitApply(10, None)), ]); let mut ev_modify = pin!(EventModify::new(st, |x| { *x += 1; @@ -64,7 +64,7 @@ pub(crate) mod test { assert!(matches!( poll!(ev_modify.next()), - Poll::Ready(Some(Ok(Event::Apply(1)))) + Poll::Ready(Some(Ok(Event::Apply(1, None)))) )); assert!(matches!( @@ -75,7 +75,7 @@ pub(crate) mod test { let restarted = poll!(ev_modify.next()); assert!(matches!( restarted, - Poll::Ready(Some(Ok(Event::InitApply(x)))) if x == 11 + Poll::Ready(Some(Ok(Event::InitApply(x, _)))) if x == 11 )); assert!(matches!(poll!(ev_modify.next()), Poll::Ready(None))); diff --git a/kube-runtime/src/utils/reflect.rs b/kube-runtime/src/utils/reflect.rs index e93354202..035f570e0 100644 --- a/kube-runtime/src/utils/reflect.rs +++ b/kube-runtime/src/utils/reflect.rs @@ -72,12 +72,12 @@ pub(crate) mod test { let foo = testpod("foo"); let bar = testpod("bar"); let st = stream::iter([ - Ok(Event::Apply(foo.clone())), + Ok(Event::Apply(foo.clone(), None)), Err(Error::NoResourceVersion), - Ok(Event::Init), - Ok(Event::InitApply(foo)), - Ok(Event::InitApply(bar)), - Ok(Event::InitDone), + Ok(Event::Init(None)), + Ok(Event::InitApply(foo, None)), + Ok(Event::InitApply(bar, None)), + Ok(Event::InitDone(None)), ]); let (reader, writer) = reflector::store(); @@ -86,7 +86,7 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Apply(_)))) + Poll::Ready(Some(Ok(Event::Apply(..)))) )); assert_eq!(reader.len(), 1); @@ -98,20 +98,20 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Init))) + Poll::Ready(Some(Ok(Event::Init(None)))) )); assert_eq!(reader.len(), 1); let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_)))))); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(..)))))); assert_eq!(reader.len(), 1); let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_)))))); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(..)))))); assert_eq!(reader.len(), 1); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitDone))) + Poll::Ready(Some(Ok(Event::InitDone(None)))) )); assert_eq!(reader.len(), 2); diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 755320b38..f136b74b5 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -38,17 +38,17 @@ pub type Result = std::result::Result; /// Watch events returned from the [`watcher`] pub enum Event { /// An object was added or modified - Apply(K), + Apply(K, Option), /// An object was deleted /// /// NOTE: This should not be used for managing persistent state elsewhere, since /// events may be lost if the watcher is unavailable. Use Finalizers instead. - Delete(K), + Delete(K, Option), /// The watch stream was restarted. /// /// A series of `InitApply` events are expected to follow until all matching objects /// have been listed. This event can be used to prepare a buffer for `InitApply` events. - Init, + Init(Option), /// Received an object during `Init`. /// /// Objects returned here are either from the initial stream using the `StreamingList` strategy, @@ -56,7 +56,7 @@ pub enum Event { /// /// These events can be passed up if having a complete set of objects is not a concern. /// If you need to wait for a complete set, please buffer these events until an `InitDone`. - InitApply(K), + InitApply(K, Option), /// The initialisation is complete. /// /// This can be used as a signal to replace buffered store contents atomically. @@ -64,7 +64,7 @@ pub enum Event { /// /// Any objects that were previously [`Applied`](Event::Applied) but are not listed in any of /// the `InitApply` events should be assumed to have been [`Deleted`](Event::Deleted). - InitDone, + InitDone(Option), } impl Event { @@ -78,8 +78,8 @@ impl Event { )] pub fn into_iter_applied(self) -> impl Iterator { match self { - Self::Apply(obj) | Self::InitApply(obj) => Some(obj), - Self::Delete(_) | Self::Init | Self::InitDone => None, + Self::Apply(obj, ..) | Self::InitApply(obj, ..) => Some(obj), + Self::Delete(..) | Self::Init(..) | Self::InitDone(..) => None, } .into_iter() } @@ -95,8 +95,8 @@ impl Event { )] pub fn into_iter_touched(self) -> impl Iterator { match self { - Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => Some(obj), - Self::Init | Self::InitDone => None, + Self::Apply(obj, ..) | Self::Delete(obj, ..) | Self::InitApply(obj, ..) => Some(obj), + Self::Init(_) | Self::InitDone(_) => None, } .into_iter() } @@ -121,8 +121,8 @@ impl Event { #[must_use] pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self { match &mut self { - Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => (f)(obj), - Self::Init | Self::InitDone => {} // markers, nothing to modify + Self::Apply(obj, ..) | Self::Delete(obj, ..) | Self::InitApply(obj, ..) => (f)(obj), + Self::Init(_) | Self::InitDone(_) => {} // markers, nothing to modify } self } @@ -279,6 +279,9 @@ pub struct Config { /// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls. /// This is default enabled and should generally not be turned off. pub bookmarks: bool, + + /// Custom identifier for this watcher config, which will be assigned to initial events for objects distinction. + pub identifier: Option, } impl Default for Config { @@ -293,6 +296,7 @@ impl Default for Config { // https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31 page_size: Some(500), initial_list_strategy: InitialListStrategy::ListWatch, + identifier: None, } } } @@ -406,6 +410,13 @@ impl Config { self } + /// Sets a unique identifier for the watcher config + #[must_use] + fn identified(mut self, identifier: u64) -> Self { + self.identifier = Some(identifier); + self + } + /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests. fn to_list_params(&self) -> ListParams { let (resource_version, version_match) = match self.list_semantic { @@ -498,11 +509,14 @@ where { match state { State::Empty => match wc.initial_list_strategy { - InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage { - continue_token: None, - objects: VecDeque::default(), - last_bookmark: None, - }), + InitialListStrategy::ListWatch => ( + Some(Ok(Event::Init(wc.identifier))), + State::InitPage { + continue_token: None, + objects: VecDeque::default(), + last_bookmark: None, + }, + ), InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await { Ok(stream) => (None, State::InitialWatch { stream }), Err(err) => { @@ -521,17 +535,20 @@ where last_bookmark, } => { if let Some(next) = objects.pop_front() { - return (Some(Ok(Event::InitApply(next))), State::InitPage { - continue_token, - objects, - last_bookmark, - }); + return ( + Some(Ok(Event::InitApply(next, wc.identifier))), + State::InitPage { + continue_token, + objects, + last_bookmark, + }, + ); } // check if we need to perform more pages if continue_token.is_none() { if let Some(resource_version) = last_bookmark { // we have drained the last page - move on to next stage - return (Some(Ok(Event::InitDone)), State::InitListed { resource_version }); + return (Some(Ok(Event::InitDone(wc.identifier))), State::InitListed { resource_version }); } } let mut lp = wc.to_list_params(); @@ -545,11 +562,14 @@ where } // Buffer page here, causing us to return to this enum branch (State::InitPage) // until the objects buffer has drained - (None, State::InitPage { - continue_token, - objects: list.items.into_iter().collect(), - last_bookmark, - }) + ( + None, + State::InitPage { + continue_token, + objects: list.items.into_iter().collect(), + last_bookmark, + }, + ) } Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { @@ -564,7 +584,7 @@ where State::InitialWatch { mut stream } => { match stream.next().await { Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => { - (Some(Ok(Event::InitApply(obj))), State::InitialWatch { stream }) + (Some(Ok(Event::InitApply(obj, wc.identifier))), State::InitialWatch { stream }) } Some(Ok(WatchEvent::Deleted(_obj))) => { // Kubernetes claims these events are impossible @@ -575,10 +595,13 @@ where Some(Ok(WatchEvent::Bookmark(bm))) => { let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end"); if marks_initial_end { - (Some(Ok(Event::InitDone)), State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }) + ( + Some(Ok(Event::InitDone(wc.identifier))), + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ) } else { (None, State::InitialWatch { stream }) } @@ -610,19 +633,23 @@ where } State::InitListed { resource_version } => { match api.watch(&wc.to_watch_params(), &resource_version).await { - Ok(stream) => (None, State::Watching { - resource_version, - stream, - }), + Ok(stream) => ( + None, + State::Watching { + resource_version, + stream, + }, + ), Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { warn!("watch initlist error with 403: {err:?}"); } else { debug!("watch initlist error: {err:?}"); } - (Some(Err(Error::WatchStartFailed(err))), State::InitListed { - resource_version, - }) + ( + Some(Err(Error::WatchStartFailed(err))), + State::InitListed { resource_version }, + ) } } } @@ -635,10 +662,13 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Apply(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Apply(obj, wc.identifier))), + State::Watching { + resource_version, + stream, + }, + ) } } Some(Ok(WatchEvent::Deleted(obj))) => { @@ -646,16 +676,22 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Delete(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Delete(obj, wc.identifier))), + State::Watching { + resource_version, + stream, + }, + ) } } - Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }), + Some(Ok(WatchEvent::Bookmark(bm))) => ( + None, + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ), Some(Ok(WatchEvent::Error(err))) => { // HTTP GONE, means we have desynced and need to start over and re-list :( let new_state = if err.code == 410 { @@ -679,10 +715,13 @@ where } else { debug!("watcher error: {err:?}"); } - (Some(Err(Error::WatchFailed(err))), State::Watching { - resource_version, - stream, - }) + ( + Some(Err(Error::WatchFailed(err))), + State::Watching { + resource_version, + stream, + }, + ) } None => (None, State::InitListed { resource_version }), }, @@ -860,9 +899,9 @@ pub fn watch_object Some(Ok(Some(obj))), + Ok(Event::Apply(obj, ..) | Event::InitApply(obj, ..)) => Some(Ok(Some(obj))), // Pass up `None` for Deleted - Ok(Event::Delete(_)) => Some(Ok(None)), + Ok(Event::Delete(..)) => Some(Ok(None)), // Pass up `None` if the object wasn't seen in the initial list - Ok(Event::InitDone) if !obj_seen => Some(Ok(None)), + Ok(Event::InitDone(_)) if !obj_seen => Some(Ok(None)), // Ignore marker events - Ok(Event::Init | Event::InitDone) => None, + Ok(Event::Init(_) | Event::InitDone(_)) => None, // Bubble up errors Err(err) => Some(Err(err)), }