- 
          
- 
                Notifications
    You must be signed in to change notification settings 
- Fork 364
          Arc wrap watcher output
          #1266
        
          New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
  
    Arc wrap watcher output
  
  #1266
              
            Conversation
Signed-off-by: Matei David <[email protected]>
Signed-off-by: Matei David <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks a lot for doing this! having seen this play out, i personally think this approach makes more sense than opting into Arc at a later level.
if we get this to pass tests, then there a few things that would be good to have;
there is at least one place that it probably can be propagated into on the controller side (passing into user reconciler):
kube/kube-runtime/src/controller/mod.rs
Lines 306 to 308 in 823f4b8
| reconciler_span | |
| .in_scope(|| reconciler(Arc::clone(&obj), context.clone())) | |
| .into_future() | 
and there's also probably room for a stream sharing controller example (maybe a dumb thing with two co-hosted Controller instances re-using the same crd stream?) to verify that it all works.
Signed-off-by: Matei David <[email protected]>
| Codecov Report
 
 Additional details and impacted files@@            Coverage Diff             @@
##             main    #1266      +/-   ##
==========================================
+ Coverage   72.31%   72.76%   +0.44%     
==========================================
  Files          75       75              
  Lines        6343     6179     -164     
==========================================
- Hits         4587     4496      -91     
+ Misses       1756     1683      -73     
 | 
| @clux my pleasure! Thanks for having a look. I made some changes to get CI to pass. I'm surprised the change came out as small as it did. All in all, having worked on it, there are some nice things about the API change (particularly around the reflector, using shared memory to pass all of the state around, etc.), but I do think the change is pretty significant and scary. I'm happy to investigate more if either you or @nightkr want to see what other alternatives look like. Problems: 
 Still left to do: 
 cc @Dav1dde, I know you've also worked on parts of this and would love it if you could provide some feedback. No pressure! | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments on two of the issues; user facing changes, and the modify() problem.
| fn handle_event(ev: Arc<Event>) -> anyhow::Result<()> { | ||
| info!( | ||
| "Event: \"{}\" via {} {}", | ||
| ev.message.unwrap().trim(), | ||
| ev.involved_object.kind.unwrap(), | ||
| ev.involved_object.name.unwrap() | ||
| ev.message.as_ref().unwrap().trim(), | ||
| ev.involved_object.kind.as_ref().unwrap(), | ||
| ev.involved_object.name.as_ref().unwrap() | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like some of the more awkward user facing interactions can be simplified a bit by making fns like these take a &Event (say) and call as_ref() earlier as you've done in for wait.rs
| Event::Applied(obj) | Event::Deleted(obj) => { | ||
| if let Some(obj) = Arc::get_mut(obj) { | ||
| (f)(obj) | ||
| } | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you've noted, this is likely one of the more awkward problems that falls out of this. If this works like you are saying, i.e.
- watcherstream.modify(f).reflect(writer)modifies and works
- watcherstream.reflect(writer).modify(f)does not modify because the store has a ref to the arc
then this really highlights the fundamental pros and cons this PR is trying to make; we are fundamentally passing the same object around everywhere, and rely on users to clone if they need to (say via arc's clone-on-write behaviour on Arc::make_mut).
Given that reconcilers are already Arcd (and they are the main stream-end consumers) I think this is not too bad on the user side, plus it makes reconcilers and direct watch stream consumers the same, which is nice.
For the particular store into modify case, there are two options as i see:
- make modification after storing an error via  Arc::try_unwrap- propagating that error frommodify
- use Arc::make_mutin modify to allow the separation, while pasing the unmodified object around if chosen, having a copy shunted through the remains of the watcher stream.
I personally lead toward 1 because if people have a fn to modify, they can just apply that at the end of their controller stack in a reconciler helper that ultimately does a short term clone, rather than force the clone early on (when the object might be queued for minutes).
There is also a third option; type enforcement of the order (e.g. have reflector modify the output type in a way that makes it incompatible with modify). Although I am not sure how to do that cleanly.
| About  let w1 = watcher(api, cfg).reflect(writer); // a flattened stream of all events
let w2 = w1.subscribe().predicate_filter(p); // a copy of the flattened stream with more specific filters appliedand pass w1 and w2 to different controllers, so that they'd reuse the reflector, and filter more specifically after. as it stands, if the stream is unflattened, then both sides might have to make a reflector IIUC. I don't remember the reason for this, might have to dig in the original PR at #1131, but this feels wrong looking at it now. | 
Signed-off-by: Matei David <[email protected]>
| Coming back to this. I merged the latest main (the merge was pretty pain free actually). I'm going to prototype everything again in a different branch just so that I can reload the context into my brain and get acquainted with some of the controller logic. There are still 2 issues: 
 Will keep folks up to date! Thanks for bearing with me :) | 
| The time has come to finally retire this. Superseded by #1449. | 
Motivation
Controller machinery has been changed to support arbitrary streams; users may instantiate controllers with already established watches, or already created stores. A subscriber interface has also been added to allow for stream sharing; due to the implementation details of the subscriber interface, the output is incompatible with what the controller expects.
This change started as a proof of concept based on #1189. It attempts to bridge the two APIs by
Arcing the output from watchers. Since this is done at a very low level in the codepath, both the controller and other dependent helpers (e.g. the reflector or event flattener) have been changed to both read and write arc'd streams. As an added benefit, arced streams should relieve some memory pressure by avoiding clones on k8s objects.Solution
step_trampolinedto arc the object. This is propagated all the way up to thewatcher().Arc