Would you like to work on this feature?
Maybe
What problem are you trying to solve?
As a final step to allow stream sharing (#1080), we need the output of `StreamExt::stream_subscribe` to be accepted by the Controller stream interfaces:
- `Controller::for_stream`
- `Controller::watches_stream`
- `Controller::owns_stream`

This is currently not possible, because `StreamExt::stream_subscribe` produces `Stream<Item = Result<Arc<K>, Error>>`, while the input streams are plain watcher streams of the form `Stream<Item = Result<K, Error>>`.
Arc-wrapping can help reduce memory consumption and remove awkward arc-wrap points:
- `reflector` stores already arc-wrap, by cloning objects passed through from streams
- we already have to arc-wrap in `Controller` before calling out to the `reconcile` and `error_policy` fns anyway
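To make the mismatch concrete, here is a minimal sketch with simplified stand-in signatures (the function names and the `Error` type are illustrative, not the real kube-runtime API):

```rust
use std::sync::Arc;
use futures::{stream, Stream};

// Illustrative stand-in for kube-runtime's watcher error type.
struct Error;

// Shape of what stream_subscribe yields today: Arc-wrapped objects.
fn subscribed<K>() -> impl Stream<Item = Result<Arc<K>, Error>> {
    stream::empty()
}

// Shape of what Controller::for_stream and friends accept today: plain K,
// so the subscribed stream above cannot be passed in without unwrapping.
fn controller_input<K>(_events: impl Stream<Item = Result<K, Error>>) {}
```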
Describe the solution you'd like
We need the streams that `watcher` produces to ultimately end up with the same types on both the subscribe path and the non-subscribe path. Options are considered below, but personally I think that, if it's viable, we should experiment with arc-wrapping earlier.
Describe alternatives you've considered
1. Always Arc wrap output from EventFlatten
Arc-wrap once the stream is intended to be consumed, i.e. when it is flattened (`::applied_objects()` or `::touched_objects()`). That is late enough that it won't really affect any store implementations, and it will work with `::stream_subscribe`.
While this is less breaking than the suggestion below, it also creates another copy of each object, since every `Store` clones what passes through:
kube/kube-runtime/src/reflector/store.rs (lines 54 to 69 in c30b376):

```rust
watcher::Event::Applied(obj) => {
    let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
    let obj = Arc::new(obj.clone());
    self.store.write().insert(key, obj);
}
watcher::Event::Deleted(obj) => {
    let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
    self.store.write().remove(&key);
}
watcher::Event::Restarted(new_objs) => {
    let new_objs = new_objs
        .iter()
        .map(|obj| {
            (
                ObjectRef::from_obj_with(obj, self.dyntype.clone()),
                Arc::new(obj.clone()),
```
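If we went this route, the arc-wrapping itself would be a one-line combinator at the flatten boundary. A minimal sketch (the function is a hypothetical stand-in for what the flattened `::applied_objects()` stream would yield, not existing API):

```rust
use std::sync::Arc;
use futures::{Stream, TryStreamExt};

// Hypothetical adapter in the spirit of option 1: once events have been
// flattened into individual objects, lift each Ok(K) into Ok(Arc<K>).
// Note this is an extra allocation per object on top of the clone that
// each Store performs above.
fn arc_wrapped_applied_objects<K, E>(
    flattened: impl Stream<Item = Result<K, E>>,
) -> impl Stream<Item = Result<Arc<K>, E>> {
    flattened.map_ok(Arc::new)
}
```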
2. Always Arc wrap output from watcher
A much more fundamental change to our API, but it would provide a much more consistent API, and potentially clone fewer huge k8s objects as they pass through streams.
If we go this deep, then we basically add it in `step_trampolined`, where the objects are first handled:
kube/kube-runtime/src/watcher.rs (lines 371 to 472 in c30b376):

```rust
async fn step_trampolined<A>(
    api: &A,
    wc: &Config,
    state: State<A::Value>,
) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
where
    A: ApiMode,
    A::Value: Resource + 'static,
{
    match state {
        State::Empty => match api.list(&wc.to_list_params()).await {
            Ok(list) => {
                if let Some(resource_version) = list.metadata.resource_version {
                    (Some(Ok(Event::Restarted(list.items))), State::InitListed {
                        resource_version,
                    })
                } else {
                    (Some(Err(Error::NoResourceVersion)), State::Empty)
                }
            }
            Err(err) => {
                if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
                    warn!("watch list error with 403: {err:?}");
                } else {
                    debug!("watch list error: {err:?}");
                }
                (Some(Err(err).map_err(Error::InitialListFailed)), State::Empty)
            }
        },
        State::InitListed { resource_version } => {
            match api.watch(&wc.to_watch_params(), &resource_version).await {
                Ok(stream) => (None, State::Watching {
                    resource_version,
                    stream,
                }),
                Err(err) => {
                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
                        warn!("watch initlist error with 403: {err:?}");
                    } else {
                        debug!("watch initlist error: {err:?}");
                    }
                    (
                        Some(Err(err).map_err(Error::WatchStartFailed)),
                        State::InitListed { resource_version },
                    )
                }
            }
        }
        State::Watching {
            resource_version,
            mut stream,
        } => match stream.next().await {
            Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
                let resource_version = obj.resource_version().unwrap();
                (Some(Ok(Event::Applied(obj))), State::Watching {
                    resource_version,
                    stream,
                })
            }
            Some(Ok(WatchEvent::Deleted(obj))) => {
                let resource_version = obj.resource_version().unwrap();
                (Some(Ok(Event::Deleted(obj))), State::Watching {
                    resource_version,
                    stream,
                })
            }
            Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
                resource_version: bm.metadata.resource_version,
                stream,
            }),
            Some(Ok(WatchEvent::Error(err))) => {
                // HTTP GONE, means we have desynced and need to start over and re-list :(
                let new_state = if err.code == 410 {
                    State::Empty
                } else {
                    State::Watching {
                        resource_version,
                        stream,
                    }
                };
                if err.code == 403 {
                    warn!("watcher watchevent error 403: {err:?}");
                } else {
                    debug!("error watchevent error: {err:?}");
                }
                (Some(Err(err).map_err(Error::WatchError)), new_state)
            }
            Some(Err(err)) => {
                if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
                    warn!("watcher error 403: {err:?}");
                } else {
                    debug!("watcher error: {err:?}");
                }
                (Some(Err(err).map_err(Error::WatchFailed)), State::Watching {
                    resource_version,
                    stream,
                })
            }
            None => (None, State::InitListed { resource_version }),
        },
    }
}
```
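For a sense of the shape, here is a sketch of arc-wrapping at the event level; the `Event` stand-in mirrors the variants in the excerpt above, though the real change would construct the `Arc`s directly inside `step_trampolined` rather than mapping afterwards:

```rust
use std::sync::Arc;

// Simplified stand-in for kube-runtime's watcher::Event (illustrative only).
enum Event<K> {
    Applied(K),
    Deleted(K),
    Restarted(Vec<K>),
}

// Arc-wrap each payload as it is emitted, so every downstream consumer
// (stores, subscribers, Controller triggers) can share one allocation
// per object instead of cloning it.
fn arc_wrap_event<K>(ev: Event<K>) -> Event<Arc<K>> {
    match ev {
        Event::Applied(obj) => Event::Applied(Arc::new(obj)),
        Event::Deleted(obj) => Event::Deleted(Arc::new(obj)),
        Event::Restarted(objs) => Event::Restarted(objs.into_iter().map(Arc::new).collect()),
    }
}
```

With events carrying `Arc<K>`, the `Arc::new(obj.clone())` calls in the `Store` excerpt earlier could in principle become cheap `Arc::clone` refcount bumps.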
This has the potential to reduce cloning between streams and reflectors. Given that reflector stores and Kubernetes objects account for most of the memory usage of typical Rust controllers, this could be a worthwhile avenue.
However, I am not sure whether this is a practical solution.
3. Internally Arc wrap via an Arc wrapper helper on WatchStreamExt
- Create a new `WatchStreamExt::arc` (or something like that) to map the Ok `K` elements to an `Arc<K>`.
- Internally in `Controller`, call `::arc` on all managed streams.
- Tell users using the unstable streams interface that they need an extra `::arc()` call to be compatible.
Fairly simple: just propagate the `Arc` inside `Controller` only, and do it at an earlier point.
This does not solve memory issues with Store, and it puts the onus on the user to figure out when to arc-wrap or not.
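A minimal sketch of such a helper, assuming a recent Rust (return-position `impl Trait` in traits); the trait name here is made up, since in practice the method would live on `WatchStreamExt`:

```rust
use std::sync::Arc;
use futures::{Stream, TryStreamExt};

// Hypothetical extension trait mirroring the proposed WatchStreamExt::arc:
// lift the Ok(K) elements of a watch stream into Arc<K>, so the subscribe
// and non-subscribe paths end up with the same item type.
trait ArcStreamExt<K, E>: Stream<Item = Result<K, E>> + Sized {
    fn arc(self) -> impl Stream<Item = Result<Arc<K>, E>> {
        self.map_ok(Arc::new)
    }
}

impl<S, K, E> ArcStreamExt<K, E> for S where S: Stream<Item = Result<K, E>> {}
```

Usage would then be a single extra `.arc()` call on any plain watch stream before handing it to `Controller`.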
Documentation, Adoption, Migration Strategy
Examples, doc tests, kube.rs controller guide updates where relevant.
I suspect this can be done in a mostly non-disruptive way. We have arc-wrapped a bunch of things in the past, and it's generally been a good idea.
Target crate for feature
kube-runtime