From 158ad28c219a59cdf6ae377b14547ae189609eb3 Mon Sep 17 00:00:00 2001 From: clux Date: Thu, 18 Apr 2024 22:54:40 +0100 Subject: [PATCH 1/2] Remove abandoned `StreamSubscribe` implementation This was never properly usable without further integration, and as such was never stabilised. It is replaced via #1449 Signed-off-by: clux --- kube-runtime/src/utils/stream_subscribe.rs | 232 --------------------- kube-runtime/src/utils/watch_ext.rs | 68 ------ 2 files changed, 300 deletions(-) delete mode 100644 kube-runtime/src/utils/stream_subscribe.rs diff --git a/kube-runtime/src/utils/stream_subscribe.rs b/kube-runtime/src/utils/stream_subscribe.rs deleted file mode 100644 index 2f0be443d..000000000 --- a/kube-runtime/src/utils/stream_subscribe.rs +++ /dev/null @@ -1,232 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use futures::{stream, Stream}; -use pin_project::pin_project; -use std::{fmt, sync::Arc}; -use tokio::sync::{broadcast, broadcast::error::RecvError}; - -const CHANNEL_CAPACITY: usize = 128; - -/// Exposes the [`StreamSubscribe::subscribe()`] method which allows additional -/// consumers of events from a stream without consuming the stream itself. -/// -/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`] -/// error. The subscriber can then decide to abort its task or tolerate the lost events. -/// -/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams -/// will also end. -/// -/// ## Warning -/// -/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams -/// will never receive any events. -#[pin_project] -#[must_use = "subscribers will not get events unless this stream is polled"] -pub struct StreamSubscribe -where - S: Stream, -{ - #[pin] - stream: S, - sender: broadcast::Sender>>, -} - -impl StreamSubscribe { - pub fn new(stream: S) -> Self { - let (sender, _) = broadcast::channel(CHANNEL_CAPACITY); - - Self { stream, sender } - } - - /// Subscribe to events from this stream - #[must_use = "streams do nothing unless polled"] - pub fn subscribe(&self) -> impl Stream, Error>> { - stream::unfold(self.sender.subscribe(), |mut rx| async { - match rx.recv().await { - Ok(Some(obj)) => Some((Ok(obj), rx)), - Err(RecvError::Lagged(amt)) => Some((Err(Error::Lagged(amt)), rx)), - _ => None, - } - }) - } -} - -impl Stream for StreamSubscribe { - type Item = Arc; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let item = this.stream.poll_next(cx); - - match item { - Poll::Ready(Some(item)) => { - #[allow(clippy::arc_with_non_send_sync)] - // ^ this whole module is unstable and does not have a PoC - let item = Arc::new(item); - this.sender.send(Some(item.clone())).ok(); - Poll::Ready(Some(item)) - } - Poll::Ready(None) => { - this.sender.send(None).ok(); - Poll::Ready(None) - } - Poll::Pending => Poll::Pending, - } - } -} - -/// An error returned from the inner stream of a [`StreamSubscribe`]. -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum Error { - /// The subscriber lagged too far behind. Polling again will return - /// the oldest event still retained. - /// - /// Includes the number of skipped events. - Lagged(u64), -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Error::Lagged(amt) => write!(f, "subscriber lagged by {amt}"), - } - } -} - -impl std::error::Error for Error {} - -#[cfg(test)] -mod tests { - use std::pin::pin; - - use super::*; - use futures::{poll, stream, StreamExt}; - - #[tokio::test] - async fn stream_subscribe_continues_to_propagate_values() { - let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]); - let mut rx = pin!(StreamSubscribe::new(rx)); - - assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(0))))); - assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(1))))); - assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Err(2))))); - assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(3))))); - assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(4))))); - assert_eq!(poll!(rx.next()), Poll::Ready(None)); - } - - #[tokio::test] - async fn all_subscribers_get_events() { - let events = [Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]; - let rx = stream::iter(events); - let mut rx = pin!(StreamSubscribe::new(rx)); - - let mut rx_s1 = pin!(rx.subscribe()); - let mut rx_s2 = pin!(rx.subscribe()); - - // Subscribers are pending until we start consuming the stream - assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1"); - assert_eq!(poll!(rx_s2.next()), Poll::Pending, "rx_s2"); - - for item in events { - assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx"); - let expected = Poll::Ready(Some(Ok(Arc::new(item)))); - assert_eq!(poll!(rx_s1.next()), expected, "rx_s1"); - assert_eq!(poll!(rx_s2.next()), expected, "rx_s2"); - } - - // Ensure that if the stream is closed, all subscribers are closed - assert_eq!(poll!(rx.next()), Poll::Ready(None), "rx"); - assert_eq!(poll!(rx_s1.next()), Poll::Ready(None), "rx_s1"); - assert_eq!(poll!(rx_s2.next()), Poll::Ready(None), "rx_s2"); - } - - #[tokio::test] - async fn subscribers_can_catch_up_to_the_main_stream() { - let events = (0..CHANNEL_CAPACITY).map(Ok::<_, ()>).collect::>(); - let rx = stream::iter(events.clone()); - let mut rx = pin!(StreamSubscribe::new(rx)); - - let mut rx_s1 = pin!(rx.subscribe()); - - for item in events.clone() { - assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx",); - } - - for item in events { - assert_eq!( - poll!(rx_s1.next()), - Poll::Ready(Some(Ok(Arc::new(item)))), - "rx_s1" - ); - } - } - - #[tokio::test] - async fn if_the_subscribers_lag_they_get_a_lagged_error_as_the_next_event() { - // The broadcast channel rounds the capacity up to the next power of two. - let max_capacity = CHANNEL_CAPACITY.next_power_of_two(); - let overflow = 5; - let events = (0..max_capacity + overflow).collect::>(); - let rx = stream::iter(events.clone()); - let mut rx = pin!(StreamSubscribe::new(rx)); - - let mut rx_s1 = pin!(rx.subscribe()); - - // Consume the entire stream, overflowing the inner channel - for _ in events { - rx.next().await; - } - - assert_eq!( - poll!(rx_s1.next()), - Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))), - ); - - let expected_next_event = overflow; - assert_eq!( - poll!(rx_s1.next()), - Poll::Ready(Some(Ok(Arc::new(expected_next_event)))), - ); - } - - #[tokio::test] - async fn a_lagging_subscriber_does_not_impact_a_well_behaved_subscriber() { - // The broadcast channel rounds the capacity up to the next power of two. - let max_capacity = CHANNEL_CAPACITY.next_power_of_two(); - let overflow = 5; - let events = (0..max_capacity + overflow).collect::>(); - let rx = stream::iter(events.clone()); - let mut rx = pin!(StreamSubscribe::new(rx)); - - let mut rx_s1 = pin!(rx.subscribe()); - let mut rx_s2 = pin!(rx.subscribe()); - - for event in events { - assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1"); - - rx.next().await; - - assert_eq!( - poll!(rx_s1.next()), - Poll::Ready(Some(Ok(Arc::new(event)))), - "rx_s1" - ); - } - - assert_eq!( - poll!(rx_s2.next()), - Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))), - "rx_s2" - ); - - let expected_next_event = overflow; - assert_eq!( - poll!(rx_s2.next()), - Poll::Ready(Some(Ok(Arc::new(expected_next_event)))), - "rx_s2" - ); - } -} diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index f2033fc3d..13adef81b 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -1,7 +1,5 @@ #[cfg(feature = "unstable-runtime-predicates")] use crate::utils::predicate::{Predicate, PredicateFilter}; -#[cfg(feature = "unstable-runtime-subscribe")] -use crate::utils::stream_subscribe::StreamSubscribe; use crate::{ utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff}, watcher, @@ -128,72 +126,6 @@ pub trait WatchStreamExt: Stream { PredicateFilter::new(self, predicate) } - /// Create a [`StreamSubscribe`] from a [`watcher()`] stream. - /// - /// The [`StreamSubscribe::subscribe()`] method which allows additional consumers - /// of events from a stream without consuming the stream itself. - /// - /// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`](crate::utils::stream_subscribe::Error::Lagged) - /// error. The subscriber can then decide to abort its task or tolerate the lost events. - /// - /// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams - /// will also end. - /// - /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature. - /// - /// ## Warning - /// - /// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams - /// will never receive any events. - /// - /// # Usage - /// - /// ``` - /// use futures::{Stream, StreamExt}; - /// use std::{fmt::Debug, sync::Arc}; - /// use kube_runtime::{watcher, WatchStreamExt}; - /// - /// fn explain_events( - /// stream: S, - /// ) -> ( - /// impl Stream, watcher::Error>>> + Send + Sized + 'static, - /// impl Stream + Send + Sized + 'static, - /// ) - /// where - /// K: Clone + Debug + Send + Sync + 'static, - /// S: Stream, watcher::Error>> + Send + Sized + 'static, - /// { - /// // Create a stream that can be subscribed to - /// let stream_subscribe = stream.stream_subscribe(); - /// // Create a subscription to that stream - /// let subscription = stream_subscribe.subscribe(); - /// - /// // Create a stream of descriptions of the events - /// let explain_stream = subscription.filter_map(|event| async move { - /// // We don't care about lagged events so we can throw that error away - /// match event.ok()?.as_ref() { - /// Ok(watcher::Event::Applied(event)) => { - /// Some(format!("An object was added or modified: {event:?}")) - /// } - /// Ok(_) => todo!("explain other events"), - /// // We don't care about watcher errors either - /// Err(_) => None, - /// } - /// }); - /// - /// // We now still have the original stream, and a secondary stream of explanations - /// (stream_subscribe, explain_stream) - /// } - /// ``` - #[cfg(feature = "unstable-runtime-subscribe")] - fn stream_subscribe(self) -> StreamSubscribe - where - Self: Stream, watcher::Error>> + Send + Sized + 'static, - K: Clone, - { - StreamSubscribe::new(self) - } - /// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`] /// /// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`]. From 048d4762a1b0005d8a718c2964f7ffff896d092e Mon Sep 17 00:00:00 2001 From: clux Date: Thu, 18 Apr 2024 23:00:25 +0100 Subject: [PATCH 2/2] forgot two imports Signed-off-by: clux --- kube-runtime/src/utils/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index b5039fab7..2ae546bc8 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -7,7 +7,6 @@ mod event_modify; #[cfg(feature = "unstable-runtime-predicates")] mod predicate; mod reflect; mod stream_backoff; -#[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe; mod watch_ext; pub use backoff_reset_timer::ResetTimerBackoff; @@ -17,8 +16,6 @@ pub use event_modify::EventModify; pub use predicate::{predicates, Predicate, PredicateFilter}; pub use reflect::Reflect; pub use stream_backoff::StreamBackoff; -#[cfg(feature = "unstable-runtime-subscribe")] -pub use stream_subscribe::StreamSubscribe; pub use watch_ext::WatchStreamExt; use futures::{