diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs
index 6f183ab8f..179ac2c50 100644
--- a/kube-runtime/src/watcher.rs
+++ b/kube-runtime/src/watcher.rs
@@ -6,7 +6,7 @@ use crate::utils::{Backoff, ResetTimerBackoff};
 
 use backon::BackoffBuilder;
 use educe::Educe;
-use futures::{stream::BoxStream, Stream, StreamExt};
+use futures::{poll, stream::BoxStream, Stream, StreamExt};
 use kube_client::{
     api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams},
     core::{metadata::PartialObjectMeta, ObjectList, Selector},
@@ -14,7 +14,7 @@ use kube_client::{
     Api, Error as ClientErr,
 };
 use serde::de::DeserializeOwned;
-use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
+use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, task::Poll, time::Duration};
 use thiserror::Error;
 use tracing::{debug, error, warn};
 
@@ -107,6 +107,16 @@ enum State<K> {
         objects: VecDeque<K>,
         last_bookmark: Option<String>,
     },
+    /// Completed the first page of the LIST operation, now transitioning to the watch phase
+    /// while continuing to list subsequent pages
+    WatchedInitPage {
+        continue_token: Option<String>,
+        objects: VecDeque<K>,
+        last_bookmark: Option<String>,
+        #[educe(Debug(ignore))]
+        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
+        stream_events: VecDeque<WatchEvent<K>>,
+    },
     /// Kubernetes 1.27 Streaming Lists
     /// The initial watch is in progress
     InitialWatch {
@@ -115,6 +125,14 @@ enum State<K> {
     },
     /// The initial LIST was successful, so we should move on to starting the actual watch.
     InitListed { resource_version: String },
+    /// After completing the list operation, process the backlog of cached events from the watch
+    /// stream.
+    WatchedInitListed {
+        resource_version: String,
+        #[educe(Debug(ignore))]
+        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
+        stream_events: VecDeque<WatchEvent<K>>,
+    },
     /// The watch is in progress, from this point we just return events from the server.
     ///
     /// If the connection is disrupted then we propagate the error but try to restart the watch stream by
@@ -183,6 +201,11 @@ pub enum InitialListStrategy {
     /// When using this mode, you can configure the `page_size` on the watcher.
     #[default]
     ListWatch,
+    /// List first, then start watching as soon as the first page of the list has been retrieved.
+    ///
+    /// Suitable for clusters with a large number of resources where list operations are slow.
+    /// Prevents resource version expiration caused by waiting for the entire list operation to complete before starting the watch.
+    ListWatchParallel,
     /// Kubernetes 1.27 Streaming Lists
     ///
     /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
@@ -461,11 +484,13 @@ where
 {
     match state {
         State::Empty => match wc.initial_list_strategy {
-            InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage {
-                continue_token: None,
-                objects: VecDeque::default(),
-                last_bookmark: None,
-            }),
+            InitialListStrategy::ListWatch | InitialListStrategy::ListWatchParallel => {
+                (Some(Ok(Event::Init)), State::InitPage {
+                    continue_token: None,
+                    objects: VecDeque::default(),
+                    last_bookmark: None,
+                })
+            }
             InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await {
                 Ok(stream) => (None, State::InitialWatch { stream }),
                 Err(err) => {
@@ -499,6 +524,112 @@ where
             }
             let mut lp = wc.to_list_params();
             lp.continue_token = continue_token;
+            match api.list(&lp).await {
+                Ok(list) => {
+                    let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
+                    let continue_token = list.metadata.continue_.filter(|s| !s.is_empty());
+                    if last_bookmark.is_none() && continue_token.is_none() {
+                        return (Some(Err(Error::NoResourceVersion)), State::Empty);
+                    }
+                    match wc.initial_list_strategy {
+                        InitialListStrategy::ListWatch => {
+                            // Buffer the page; State::InitPage is re-entered until it has drained
+                            (None, State::InitPage {
+                                continue_token,
+                                objects: list.items.into_iter().collect(),
+                                last_bookmark,
+                            })
+                        }
+                        InitialListStrategy::ListWatchParallel => {
+                            // The watch starts from the resource version of this first page, so a
+                            // page without one is reported as an error instead of panicking.
+                            let Some(resource_version) = last_bookmark else {
+                                return (Some(Err(Error::NoResourceVersion)), State::Empty);
+                            };
+                            match api.watch(&wc.to_watch_params(), &resource_version).await {
+                                Ok(stream) => (None, State::WatchedInitPage {
+                                    continue_token,
+                                    objects: list.items.into_iter().collect(),
+                                    last_bookmark: Some(resource_version),
+                                    stream,
+                                    stream_events: VecDeque::default(),
+                                }),
+                                Err(err) => {
+                                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
+                                        warn!("watch initlist error with 403: {err:?}");
+                                    } else {
+                                        debug!("watch initlist error: {err:?}");
+                                    }
+                                    (Some(Err(Error::WatchStartFailed(err))), State::Empty)
+                                }
+                            }
+                        }
+                        // State::Empty sends streaming lists to State::InitialWatch, not here.
+                        InitialListStrategy::StreamingList => unreachable!(),
+                    }
+                }
+                Err(err) => {
+                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
+                        warn!("watch list error with 403: {err:?}");
+                    } else {
+                        debug!("watch list error: {err:?}");
+                    }
+                    (Some(Err(Error::InitialListFailed(err))), State::Empty)
+                }
+            }
+        }
+        State::WatchedInitPage {
+            continue_token,
+            mut objects,
+            last_bookmark,
+            mut stream,
+            mut stream_events,
+        } => {
+            if let Some(next) = objects.pop_front() {
+                return (Some(Ok(Event::InitApply(next))), State::WatchedInitPage {
+                    continue_token,
+                    objects,
+                    last_bookmark,
+                    stream,
+                    stream_events,
+                });
+            }
+            // Attempt to asynchronously fetch events from the Watch Stream and cache them.
+            // If an error occurs at this stage, restart the list operation.
+            loop {
+                let event = poll!(stream.next());
+                match event {
+                    Poll::Ready(Some(Ok(WatchEvent::Error(err)))) => {
+                        return (Some(Err(Error::WatchError(err))), State::default());
+                    }
+                    Poll::Ready(Some(Ok(event))) => {
+                        stream_events.push_back(event);
+                    }
+                    Poll::Ready(Some(Err(err))) => {
+                        return (Some(Err(Error::WatchFailed(err))), State::default());
+                    }
+                    Poll::Ready(None) => {
+                        // Stream ended, we need to restart the list operation
+                        return (None, State::default());
+                    }
+                    Poll::Pending => {
+                        break;
+                    }
+                }
+            }
+            // check if we need to perform more pages
+            if continue_token.is_none() {
+                if let Some(resource_version) = last_bookmark {
+                    // we have drained the last page - move on to next stage
+                    return (Some(Ok(Event::InitDone)), State::WatchedInitListed {
+                        resource_version,
+                        stream,
+                        stream_events,
+                    });
+                }
+            }
+            let mut lp = wc.to_list_params();
+            lp.continue_token = continue_token;
             match api.list(&lp).await {
                 Ok(list) => {
                     let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
@@ -508,10 +639,12 @@ where
                     }
                     // Buffer page here, causing us to return to this enum branch (State::InitPage)
                     // until the objects buffer has drained
-                    (None, State::InitPage {
+                    (None, State::WatchedInitPage {
                         continue_token,
                         objects: list.items.into_iter().collect(),
                         last_bookmark,
+                        stream,
+                        stream_events,
                     })
                 }
                 Err(err) => {
@@ -589,6 +722,55 @@ where
                 }
             }
         }
+        State::WatchedInitListed {
+            resource_version,
+            stream,
+            mut stream_events,
+        } => {
+            if let Some(event) = stream_events.pop_front() {
+                match event {
+                    WatchEvent::Added(obj) | WatchEvent::Modified(obj) => {
+                        let resource_version = obj.resource_version().unwrap_or_default();
+                        return if resource_version.is_empty() {
+                            (Some(Err(Error::NoResourceVersion)), State::default())
+                        } else {
+                            (Some(Ok(Event::Apply(obj))), State::WatchedInitListed {
+                                resource_version,
+                                stream,
+                                stream_events,
+                            })
+                        };
+                    }
+                    WatchEvent::Deleted(obj) => {
+                        let resource_version = obj.resource_version().unwrap_or_default();
+                        return if resource_version.is_empty() {
+                            (Some(Err(Error::NoResourceVersion)), State::default())
+                        } else {
+                            (Some(Ok(Event::Delete(obj))), State::WatchedInitListed {
+                                resource_version,
+                                stream,
+                                stream_events,
+                            })
+                        };
+                    }
+                    WatchEvent::Bookmark(bm) => {
+                        return (None, State::WatchedInitListed {
+                            resource_version: bm.metadata.resource_version,
+                            stream,
+                            stream_events,
+                        })
+                    }
+                    // Never buffered by State::WatchedInitPage; report it instead of panicking.
+                    WatchEvent::Error(err) => {
+                        return (Some(Err(Error::WatchError(err))), State::default());
+                    }
+                }
+            }
+            (None, State::Watching {
+                resource_version,
+                stream,
+            })
+        }
         State::Watching {
             resource_version,
             mut stream,