@@ -39,7 +39,7 @@ use crate::{
3939 OperationRoster , Blocker , ManageInput , ChannelQueue , UnhandledErrors ,
4040 OperationSetup , OperationRequest , OperationResult , Operation , AddOperation ,
4141 OrBroken , OperationCleanup , ChannelItem , OperationError , Broken , ScopeStorage ,
42- OperationReachability , ReachabilityResult , emit_disposal, Disposal ,
42+ OperationReachability , ReachabilityResult , emit_disposal, Disposal , StreamPack ,
4343} ;
4444
4545struct JobWaker {
@@ -70,7 +70,7 @@ impl WakeQueue {
7070}
7171
7272#[ derive( Component ) ]
73- pub ( crate ) struct OperateTask < Response : ' static + Send + Sync > {
73+ pub ( crate ) struct OperateTask < Response : ' static + Send + Sync , Streams : StreamPack > {
7474 source : Entity ,
7575 session : Entity ,
7676 node : Entity ,
@@ -81,9 +81,10 @@ pub(crate) struct OperateTask<Response: 'static + Send + Sync> {
8181 disposal : Option < Disposal > ,
8282 being_cleaned : bool ,
8383 finished_normally : bool ,
84+ _ignore : std:: marker:: PhantomData < Streams > ,
8485}
8586
86- impl < Response : ' static + Send + Sync > OperateTask < Response > {
87+ impl < Response : ' static + Send + Sync , Streams : StreamPack > OperateTask < Response , Streams > {
8788 pub ( crate ) fn new (
8889 source : Entity ,
8990 session : Entity ,
@@ -104,6 +105,7 @@ impl<Response: 'static + Send + Sync> OperateTask<Response> {
104105 disposal : None ,
105106 being_cleaned : false ,
106107 finished_normally : false ,
108+ _ignore : Default :: default ( ) ,
107109 }
108110 }
109111
@@ -121,7 +123,11 @@ impl<Response: 'static + Send + Sync> OperateTask<Response> {
121123 }
122124}
123125
124- impl < Response : ' static + Send + Sync > Drop for OperateTask < Response > {
126+ impl < Response , Streams > Drop for OperateTask < Response , Streams >
127+ where
128+ Response : ' static + Send + Sync ,
129+ Streams : StreamPack ,
130+ {
125131 fn drop ( & mut self ) {
126132 if self . finished_normally {
127133 // The task finished normally so no special action needs to be taken
@@ -157,7 +163,11 @@ impl<Response: 'static + Send + Sync> Drop for OperateTask<Response> {
157163 }
158164}
159165
160- impl < Response : ' static + Send + Sync > Operation for OperateTask < Response > {
166+ impl < Response , Streams > Operation for OperateTask < Response , Streams >
167+ where
168+ Response : ' static + Send + Sync ,
169+ Streams : StreamPack ,
170+ {
161171 fn setup ( self , OperationSetup { source, world } : OperationSetup ) -> OperationResult {
162172 let wake_queue = world. get_resource_or_insert_with ( || WakeQueue :: new ( ) ) ;
163173 let waker = Arc :: new ( JobWaker {
@@ -172,7 +182,7 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
172182 . insert ( (
173183 self ,
174184 JobWakerStorage ( waker) ,
175- StopTask ( stop_task :: < Response > ) ,
185+ StopTask ( stop_task :: < Response , Streams > ) ,
176186 ) )
177187 . set_parent ( node) ;
178188
@@ -192,7 +202,7 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
192202 let mut source_mut = world. get_entity_mut ( source) . or_not_ready ( ) ?;
193203 // If the task has been stopped / cancelled then OperateTask will have
194204 // been removed, even if it has not despawned yet.
195- let mut operation = source_mut. get_mut :: < OperateTask < Response > > ( ) . or_not_ready ( ) ?;
205+ let mut operation = source_mut. get_mut :: < OperateTask < Response , Streams > > ( ) . or_not_ready ( ) ?;
196206 if operation. being_cleaned {
197207 // The operation is being cleaned up, so the task will not be
198208 // available and there will be nothing for us to do here. We should
@@ -228,13 +238,35 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
228238 // ChannelQueue has been processed so that any streams from this
229239 // task will be delivered before the final output.
230240 let r = world. entity_mut ( target) . defer_input ( session, result, roster) ;
231- world. get_mut :: < OperateTask < Response > > ( source) . or_broken ( ) ?. finished_normally = true ;
241+ world. get_mut :: < OperateTask < Response , Streams > > ( source) . or_broken ( ) ?. finished_normally = true ;
232242 cleanup_task :: < Response > ( session, source, node, unblock, being_cleaned, world, roster) ;
243+
244+ if Streams :: has_streams ( ) {
245+ if let Some ( scope) = world. get :: < ScopeStorage > ( node) {
246+ // When an async task with any number of streams >= 1 is
247+ // finished, we should always do a disposal notification
248+ // to force a reachability check. Normally there are
249+ // specific events that prompt us to check reachability,
250+ // but if a reachability test occurred while the async
251+ // node was running and the reachability depends on a
252+ // stream which may or may not have been emitted, then
253+ // the reachability test may have concluded with a false
254+ // positive, and it needs to be rechecked now that the
255+ // async node has finished.
256+ //
257+ // TODO(@mxgrey): Make this more efficient, e.g. only
258+ // trigger this disposal if we detected that a
259+ // reachability test happened while this task was
260+ // running.
261+ roster. disposed ( scope. get ( ) , session) ;
262+ }
263+ }
264+
233265 r?;
234266 }
235267 Poll :: Pending => {
236268 // Task is still running
237- if let Some ( mut operation) = world. get_mut :: < OperateTask < Response > > ( source) {
269+ if let Some ( mut operation) = world. get_mut :: < OperateTask < Response , Streams > > ( source) {
238270 operation. task = Some ( task) ;
239271 operation. blocker = unblock;
240272 world. entity_mut ( source) . insert ( JobWakerStorage ( waker) ) ;
@@ -249,7 +281,7 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
249281 || ChannelQueue :: default ( )
250282 ) . sender . clone ( ) ;
251283
252- let operation = OperateTask :: new (
284+ let operation = OperateTask :: < _ , Streams > :: new (
253285 source, session, node, target, task, unblock, sender,
254286 ) ;
255287
@@ -267,7 +299,7 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
267299 let session = clean. session ;
268300 let source = clean. source ;
269301 let mut source_mut = clean. world . get_entity_mut ( source) . or_broken ( ) ?;
270- let mut operation = source_mut. get_mut :: < OperateTask < Response > > ( ) . or_broken ( ) ?;
302+ let mut operation = source_mut. get_mut :: < OperateTask < Response , Streams > > ( ) . or_broken ( ) ?;
271303 operation. being_cleaned = true ;
272304 let node = operation. node ;
273305 let task = operation. task . take ( ) ;
@@ -292,7 +324,7 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
292324 fn is_reachable ( reachability : OperationReachability ) -> ReachabilityResult {
293325 let session = reachability. world
294326 . get_entity ( reachability. source ) . or_broken ( ) ?
295- . get :: < OperateTask < Response > > ( ) . or_broken ( ) ?. session ;
327+ . get :: < OperateTask < Response , Streams > > ( ) . or_broken ( ) ?. session ;
296328 Ok ( session == reachability. session )
297329 }
298330}
@@ -351,13 +383,13 @@ fn cleanup_task<Response>(
351383#[ derive( Component , Clone , Copy ) ]
352384pub ( crate ) struct StopTask ( pub ( crate ) fn ( OperationRequest , Disposal ) -> OperationResult ) ;
353385
354- fn stop_task < Response : ' static + Send + Sync > (
386+ fn stop_task < Response : ' static + Send + Sync , Streams : StreamPack > (
355387 OperationRequest { source, world, .. } : OperationRequest ,
356388 disposal : Disposal ,
357389) -> OperationResult {
358390 let mut operation = world
359391 . get_entity_mut ( source) . or_broken ( ) ?
360- . take :: < OperateTask < Response > > ( ) . or_broken ( ) ?;
392+ . take :: < OperateTask < Response , Streams > > ( ) . or_broken ( ) ?;
361393
362394 operation. disposal = Some ( disposal) ;
363395 drop ( operation) ;
0 commit comments