88//! environment.
99
1010#[ cfg( all( test, feature = "std" ) ) ]
11- use crate :: sync:: Mutex ;
11+ use crate :: sync:: { Arc , Mutex } ;
1212use crate :: util:: async_poll:: { MaybeSend , MaybeSync } ;
1313
14+ #[ cfg( all( test, not( feature = "std" ) ) ) ]
15+ use alloc:: rc:: Rc ;
16+
1417#[ cfg( all( test, not( feature = "std" ) ) ) ]
1518use core:: cell:: RefCell ;
19+ #[ cfg( test) ]
20+ use core:: convert:: Infallible ;
1621use core:: future:: Future ;
1722#[ cfg( test) ]
1823use core:: pin:: Pin ;
24+ #[ cfg( test) ]
25+ use core:: task:: { Context , Poll } ;
1926
20- /// A generic trait which is able to spawn futures in the background.
27+ /// A generic trait which is able to spawn futures to be polled in the background.
28+ ///
29+ /// When the spawned future completes, the returned [`Self::SpawnedFutureResult`] should resolve
30+ /// with the output of the spawned future.
31+ ///
32+ /// Spawned futures must be polled independently in the background even if the returned
33+ /// [`Self::SpawnedFutureResult`] is dropped without being polled. This matches the semantics of
34+ /// `tokio::spawn`.
2135///
2236/// This is not exported to bindings users as async is only supported in Rust.
2337pub trait FutureSpawner : MaybeSend + MaybeSync + ' static {
38+ /// The error type of [`Self::SpawnedFutureResult`]. This can be used to indicate that the
39+ /// spawned future was cancelled or panicked.
40+ type E ;
41+ /// The result of [`Self::spawn`], a future which completes when the spawned future completes.
42+ type SpawnedFutureResult < O > : Future < Output = Result < O , Self :: E > > + Unpin ;
2443 /// Spawns the given future as a background task.
2544 ///
2645 /// This method MUST NOT block on the given future immediately.
27- fn spawn < T : Future < Output = ( ) > + MaybeSend + ' static > ( & self , future : T ) ;
46+ fn spawn < O : MaybeSend + ' static , T : Future < Output = O > + MaybeSend + ' static > (
47+ & self , future : T ,
48+ ) -> Self :: SpawnedFutureResult < O > ;
2849}
2950
3051#[ cfg( test) ]
@@ -39,6 +60,70 @@ pub(crate) struct FutureQueue(Mutex<Vec<Pin<Box<dyn MaybeSendableFuture>>>>);
3960#[ cfg( all( test, not( feature = "std" ) ) ) ]
4061pub ( crate ) struct FutureQueue ( RefCell < Vec < Pin < Box < dyn MaybeSendableFuture > > > > ) ;
4162
63+ /// A simple future which can be completed later. Used to implement [`FutureQueue`].
64+ #[ cfg( all( test, feature = "std" ) ) ]
65+ pub struct FutureQueueCompletion < O > ( Arc < Mutex < Option < O > > > ) ;
66+ #[ cfg( all( test, not( feature = "std" ) ) ) ]
67+ pub struct FutureQueueCompletion < O > ( Rc < RefCell < Option < O > > > ) ;
68+
69+ #[ cfg( all( test, feature = "std" ) ) ]
70+ impl < O > FutureQueueCompletion < O > {
71+ fn new ( ) -> Self {
72+ Self ( Arc :: new ( Mutex :: new ( None ) ) )
73+ }
74+
75+ fn complete ( & self , o : O ) {
76+ * self . 0 . lock ( ) . unwrap ( ) = Some ( o) ;
77+ }
78+ }
79+
80+ #[ cfg( all( test, feature = "std" ) ) ]
81+ impl < O > Clone for FutureQueueCompletion < O > {
82+ fn clone ( & self ) -> Self {
83+ Self ( self . 0 . clone ( ) )
84+ }
85+ }
86+
87+ #[ cfg( all( test, not( feature = "std" ) ) ) ]
88+ impl < O > FutureQueueCompletion < O > {
89+ fn new ( ) -> Self {
90+ Self ( Rc :: new ( RefCell :: new ( None ) ) )
91+ }
92+
93+ fn complete ( & self , o : O ) {
94+ * self . 0 . borrow_mut ( ) = Some ( o) ;
95+ }
96+ }
97+
98+ #[ cfg( all( test, not( feature = "std" ) ) ) ]
99+ impl < O > Clone for FutureQueueCompletion < O > {
100+ fn clone ( & self ) -> Self {
101+ Self ( self . 0 . clone ( ) )
102+ }
103+ }
104+
105+ #[ cfg( all( test, feature = "std" ) ) ]
106+ impl < O > Future for FutureQueueCompletion < O > {
107+ type Output = Result < O , Infallible > ;
108+ fn poll ( self : Pin < & mut Self > , _: & mut Context < ' _ > ) -> Poll < Result < O , Infallible > > {
109+ match Pin :: into_inner ( self ) . 0 . lock ( ) . unwrap ( ) . take ( ) {
110+ None => Poll :: Pending ,
111+ Some ( o) => Poll :: Ready ( Ok ( o) ) ,
112+ }
113+ }
114+ }
115+
116+ #[ cfg( all( test, not( feature = "std" ) ) ) ]
117+ impl < O > Future for FutureQueueCompletion < O > {
118+ type Output = Result < O , Infallible > ;
119+ fn poll ( self : Pin < & mut Self > , _: & mut Context < ' _ > ) -> Poll < Result < O , Infallible > > {
120+ match Pin :: into_inner ( self ) . 0 . borrow_mut ( ) . take ( ) {
121+ None => Poll :: Pending ,
122+ Some ( o) => Poll :: Ready ( Ok ( o) ) ,
123+ }
124+ }
125+ }
126+
42127#[ cfg( test) ]
43128impl FutureQueue {
44129 pub ( crate ) fn new ( ) -> Self {
@@ -74,7 +159,6 @@ impl FutureQueue {
74159 futures = self . 0 . borrow_mut ( ) ;
75160 }
76161 futures. retain_mut ( |fut| {
77- use core:: task:: { Context , Poll } ;
78162 let waker = crate :: util:: async_poll:: dummy_waker ( ) ;
79163 match fut. as_mut ( ) . poll ( & mut Context :: from_waker ( & waker) ) {
80164 Poll :: Ready ( ( ) ) => false ,
@@ -86,7 +170,16 @@ impl FutureQueue {
86170
87171#[ cfg( test) ]
88172impl FutureSpawner for FutureQueue {
89- fn spawn < T : Future < Output = ( ) > + MaybeSend + ' static > ( & self , future : T ) {
173+ type E = Infallible ;
174+ type SpawnedFutureResult < O > = FutureQueueCompletion < O > ;
175+ fn spawn < O : MaybeSend + ' static , F : Future < Output = O > + MaybeSend + ' static > (
176+ & self , f : F ,
177+ ) -> FutureQueueCompletion < O > {
178+ let completion = FutureQueueCompletion :: new ( ) ;
179+ let compl_ref = completion. clone ( ) ;
180+ let future = async move {
181+ compl_ref. complete ( f. await ) ;
182+ } ;
90183 #[ cfg( feature = "std" ) ]
91184 {
92185 self . 0 . lock ( ) . unwrap ( ) . push ( Box :: pin ( future) ) ;
@@ -95,14 +188,24 @@ impl FutureSpawner for FutureQueue {
95188 {
96189 self . 0 . borrow_mut ( ) . push ( Box :: pin ( future) ) ;
97190 }
191+ completion
98192 }
99193}
100194
101195#[ cfg( test) ]
102196impl < D : core:: ops:: Deref < Target = FutureQueue > + MaybeSend + MaybeSync + ' static > FutureSpawner
103197 for D
104198{
105- fn spawn < T : Future < Output = ( ) > + MaybeSend + ' static > ( & self , future : T ) {
199+ type E = Infallible ;
200+ type SpawnedFutureResult < O > = FutureQueueCompletion < O > ;
201+ fn spawn < O : MaybeSend + ' static , F : Future < Output = O > + MaybeSend + ' static > (
202+ & self , f : F ,
203+ ) -> FutureQueueCompletion < O > {
204+ let completion = FutureQueueCompletion :: new ( ) ;
205+ let compl_ref = completion. clone ( ) ;
206+ let future = async move {
207+ compl_ref. complete ( f. await ) ;
208+ } ;
106209 #[ cfg( feature = "std" ) ]
107210 {
108211 self . 0 . lock ( ) . unwrap ( ) . push ( Box :: pin ( future) ) ;
@@ -111,5 +214,6 @@ impl<D: core::ops::Deref<Target = FutureQueue> + MaybeSend + MaybeSync + 'static
111214 {
112215 self . 0 . borrow_mut ( ) . push ( Box :: pin ( future) ) ;
113216 }
217+ completion
114218 }
115219}
0 commit comments