@@ -5,12 +5,72 @@ use super::{
5
5
6
6
use core:: cell:: RefCell ;
7
7
use core:: future;
8
- use core:: task :: Poll ;
9
- use core:: task:: Waker ;
8
+ use core:: pin :: Pin ;
9
+ use core:: task:: { Context , Poll , Waker } ;
10
10
use std:: collections:: HashMap ;
11
11
use std:: rc:: Rc ;
12
12
use wasi:: io:: poll:: Pollable ;
13
13
14
+ #[ derive( Debug ) ]
15
+ struct Registration {
16
+ key : EventKey ,
17
+ }
18
+
19
+ impl Drop for Registration {
20
+ fn drop ( & mut self ) {
21
+ Reactor :: current ( ) . deregister_event ( self . key )
22
+ }
23
+ }
24
+
25
+ #[ derive( Debug , Clone ) ]
26
+ pub struct AsyncPollable ( Rc < Registration > ) ;
27
+
28
+ impl AsyncPollable {
29
+ pub fn wait_for ( & self ) -> WaitFor {
30
+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
31
+ static COUNTER : AtomicUsize = AtomicUsize :: new ( 0 ) ;
32
+ let unique = COUNTER . fetch_add ( 1 , Ordering :: Relaxed ) ;
33
+ let key = self . 0 . key ;
34
+ WaitFor {
35
+ waitee : Waitee { key, unique } ,
36
+ needs_deregistration : false ,
37
+ }
38
+ }
39
+ }
40
+
41
+ #[ derive( Debug , PartialEq , Eq , Hash , Clone ) ]
42
+ struct Waitee {
43
+ key : EventKey ,
44
+ unique : usize ,
45
+ }
46
+
47
+ #[ must_use = "futures do nothing unless polled or .awaited" ]
48
+ #[ derive( Debug ) ]
49
+ pub struct WaitFor {
50
+ waitee : Waitee ,
51
+ needs_deregistration : bool ,
52
+ }
53
+ impl future:: Future for WaitFor {
54
+ type Output = ( ) ;
55
+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
56
+ let reactor = Reactor :: current ( ) ;
57
+ if reactor. ready ( & self . as_ref ( ) . waitee , cx. waker ( ) ) {
58
+ Poll :: Ready ( ( ) )
59
+ } else {
60
+ self . as_mut ( ) . needs_deregistration = true ;
61
+ Poll :: Pending
62
+ }
63
+ }
64
+ }
65
+ impl Drop for WaitFor {
66
+ fn drop ( & mut self ) {
67
+ println ! ( "dropping {:?}" , self ) ;
68
+ if self . needs_deregistration {
69
+ Reactor :: current ( ) . deregister_waitee ( & self . waitee )
70
+ }
71
+ }
72
+ }
73
+
14
74
/// Manage async system resources for WASI 0.2
15
75
#[ derive( Debug , Clone ) ]
16
76
pub struct Reactor {
@@ -22,7 +82,7 @@ pub struct Reactor {
22
82
#[ derive( Debug ) ]
23
83
struct InnerReactor {
24
84
poller : Poller ,
25
- wakers : HashMap < EventKey , Waker > ,
85
+ wakers : HashMap < Waitee , Waker > ,
26
86
}
27
87
28
88
impl Reactor {
@@ -64,39 +124,52 @@ impl Reactor {
64
124
pub ( crate ) fn block_until ( & self ) {
65
125
let mut reactor = self . inner . borrow_mut ( ) ;
66
126
for key in reactor. poller . block_until ( ) {
67
- match reactor. wakers . get ( & key) {
68
- Some ( waker) => waker. wake_by_ref ( ) ,
69
- None => panic ! ( "tried to wake the waker for non-existent `{:?}`" , key) ,
127
+ for ( waitee, waker) in reactor. wakers . iter ( ) {
128
+ if waitee. key == key {
129
+ waker. wake_by_ref ( )
130
+ }
70
131
}
71
132
}
72
133
}
73
134
135
+ /// Turn a wasi [`Pollable`] into an [`AsyncPollable`]
136
+ pub fn schedule ( & self , pollable : Pollable ) -> AsyncPollable {
137
+ let mut reactor = self . inner . borrow_mut ( ) ;
138
+ let key = reactor. poller . insert ( pollable) ;
139
+ println ! ( "schedule pollable as {key:?}" ) ;
140
+ AsyncPollable ( Rc :: new ( Registration { key } ) )
141
+ }
142
+
143
+ fn deregister_event ( & self , key : EventKey ) {
144
+ let mut reactor = self . inner . borrow_mut ( ) ;
145
+ println ! ( "deregister {key:?}" , ) ;
146
+ reactor. poller . remove ( key) ;
147
+ }
148
+
149
+ fn deregister_waitee ( & self , waitee : & Waitee ) {
150
+ let mut reactor = self . inner . borrow_mut ( ) ;
151
+ println ! ( "deregister waker for {waitee:?}" , ) ;
152
+ reactor. wakers . remove ( waitee) ;
153
+ }
154
+
155
+ fn ready ( & self , waitee : & Waitee , waker : & Waker ) -> bool {
156
+ let mut reactor = self . inner . borrow_mut ( ) ;
157
+ let ready = reactor
158
+ . poller
159
+ . get ( & waitee. key )
160
+ . expect ( "only live EventKey can be checked for readiness" )
161
+ . ready ( ) ;
162
+ if !ready {
163
+ println ! ( "register waker for {waitee:?}" ) ;
164
+ reactor. wakers . insert ( waitee. clone ( ) , waker. clone ( ) ) ;
165
+ }
166
+ println ! ( "ready {ready} {waitee:?}" ) ;
167
+ ready
168
+ }
169
+
74
170
/// Wait for the pollable to resolve.
75
171
pub async fn wait_for ( & self , pollable : Pollable ) {
76
- let mut pollable = Some ( pollable) ;
77
- let mut key = None ;
78
- // This function is the core loop of our function; it will be called
79
- // multiple times as the future is resolving.
80
- future:: poll_fn ( |cx| {
81
- // Start by taking a lock on the reactor. This is single-threaded
82
- // and short-lived, so it will never be contended.
83
- let mut reactor = self . inner . borrow_mut ( ) ;
84
-
85
- // Schedule interest in the `pollable` on the first iteration. On
86
- // every iteration, register the waker with the reactor.
87
- let key = key. get_or_insert_with ( || reactor. poller . insert ( pollable. take ( ) . unwrap ( ) ) ) ;
88
- reactor. wakers . insert ( * key, cx. waker ( ) . clone ( ) ) ;
89
-
90
- // Check whether we're ready or need to keep waiting. If we're
91
- // ready, we clean up after ourselves.
92
- if reactor. poller . get ( key) . unwrap ( ) . ready ( ) {
93
- reactor. poller . remove ( * key) ;
94
- reactor. wakers . remove ( key) ;
95
- Poll :: Ready ( ( ) )
96
- } else {
97
- Poll :: Pending
98
- }
99
- } )
100
- . await
172
+ let p = self . schedule ( pollable) ;
173
+ p. wait_for ( ) . await
101
174
}
102
175
}
0 commit comments