1
- use super :: {
2
- polling:: { EventKey , Poller } ,
3
- REACTOR ,
4
- } ;
1
+ use super :: REACTOR ;
5
2
6
3
use core:: cell:: RefCell ;
7
4
use core:: future;
8
5
use core:: pin:: Pin ;
9
6
use core:: task:: { Context , Poll , Waker } ;
7
+ use slab:: Slab ;
10
8
use std:: collections:: HashMap ;
11
9
use std:: rc:: Rc ;
12
10
use wasi:: io:: poll:: Pollable ;
13
11
12
+ /// A key representing an entry into the poller
13
+ #[ repr( transparent) ]
14
+ #[ derive( Debug , PartialEq , Eq , PartialOrd , Ord , Hash , Clone , Copy ) ]
15
+ pub ( crate ) struct EventKey ( pub ( crate ) usize ) ;
16
+
14
17
#[ derive( Debug , PartialEq , Eq , Hash ) ]
15
18
struct Registration {
16
19
key : EventKey ,
@@ -83,7 +86,7 @@ pub struct Reactor {
83
86
/// a lock of the whole.
84
87
#[ derive( Debug ) ]
85
88
struct InnerReactor {
86
- poller : Poller ,
89
+ pollables : Slab < Pollable > ,
87
90
wakers : HashMap < Waitee , Waker > ,
88
91
}
89
92
@@ -105,7 +108,7 @@ impl Reactor {
105
108
pub ( crate ) fn new ( ) -> Self {
106
109
Self {
107
110
inner : Rc :: new ( RefCell :: new ( InnerReactor {
108
- poller : Poller :: new ( ) ,
111
+ pollables : Slab :: new ( ) ,
109
112
wakers : HashMap :: new ( ) ,
110
113
} ) ) ,
111
114
}
@@ -124,8 +127,42 @@ impl Reactor {
124
127
/// reason that we have to call all the wakers - even if by default they
125
128
/// will do nothing.
126
129
pub ( crate ) fn block_until ( & self ) {
127
- let mut reactor = self . inner . borrow_mut ( ) ;
128
- for key in reactor. poller . block_until ( ) {
130
+ let reactor = self . inner . borrow ( ) ;
131
+
132
+ // We're about to wait for a number of pollables. When they wake we get
133
+ // the *indexes* back for the pollables whose events were available - so
134
+ // we need to be able to associate the index with the right waker.
135
+
136
+ // We start by iterating over the pollables, and keeping note of which
137
+ // pollable belongs to which waker index
138
+ let mut indexes = Vec :: with_capacity ( reactor. wakers . len ( ) ) ;
139
+ let mut targets = Vec :: with_capacity ( reactor. wakers . len ( ) ) ;
140
+ for waitee in reactor. wakers . keys ( ) {
141
+ let pollable_index = waitee. pollable . 0 . key ;
142
+ indexes. push ( pollable_index) ;
143
+ targets. push ( & reactor. pollables [ pollable_index. 0 ] ) ;
144
+ }
145
+
146
+ debug_assert_ne ! (
147
+ targets. len( ) ,
148
+ 0 ,
149
+ "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
150
+ ) ;
151
+
152
+ println ! ( "polling for {indexes:?}" ) ;
153
+ // Now that we have that association, we're ready to poll our targets.
154
+ // This will block until an event has completed.
155
+ let ready_indexes = wasi:: io:: poll:: poll ( & targets) ;
156
+
157
+ // Once we have the indexes for which pollables are available, we need
158
+ // to convert it back to the right keys for the wakers. Earlier we
159
+ // established a positional index -> waker key relationship, so we can
160
+ // go right ahead and perform a lookup there.
161
+ let ready_keys = ready_indexes
162
+ . into_iter ( )
163
+ . map ( |index| indexes[ index as usize ] ) ;
164
+
165
+ for key in ready_keys {
129
166
for ( waitee, waker) in reactor. wakers . iter ( ) {
130
167
if waitee. pollable . 0 . key == key {
131
168
println ! ( "waking {key:?}" ) ;
@@ -138,15 +175,15 @@ impl Reactor {
138
175
/// Turn a wasi [`Pollable`] into an [`AsyncPollable`]
139
176
pub fn schedule ( & self , pollable : Pollable ) -> AsyncPollable {
140
177
let mut reactor = self . inner . borrow_mut ( ) ;
141
- let key = reactor. poller . insert ( pollable) ;
178
+ let key = EventKey ( reactor. pollables . insert ( pollable) ) ;
142
179
println ! ( "schedule pollable as {key:?}" ) ;
143
180
AsyncPollable ( Rc :: new ( Registration { key } ) )
144
181
}
145
182
146
183
fn deregister_event ( & self , key : EventKey ) {
147
184
let mut reactor = self . inner . borrow_mut ( ) ;
148
185
println ! ( "deregister {key:?}" , ) ;
149
- reactor. poller . remove ( key) ;
186
+ reactor. pollables . remove ( key. 0 ) ;
150
187
}
151
188
152
189
fn deregister_waitee ( & self , waitee : & Waitee ) {
@@ -158,8 +195,8 @@ impl Reactor {
158
195
fn ready ( & self , waitee : & Waitee , waker : & Waker ) -> bool {
159
196
let mut reactor = self . inner . borrow_mut ( ) ;
160
197
let ready = reactor
161
- . poller
162
- . get ( & waitee. pollable . 0 . key )
198
+ . pollables
199
+ . get ( waitee. pollable . 0 . key . 0 )
163
200
. expect ( "only live EventKey can be checked for readiness" )
164
201
. ready ( ) ;
165
202
if !ready {
@@ -180,13 +217,73 @@ impl Reactor {
180
217
#[ cfg( test) ]
181
218
mod test {
182
219
use super :: * ;
220
+ // Using WASMTIME_LOG, observe that this test doesn't even call poll() - the pollable is ready
221
+ // immediately.
183
222
#[ test]
184
- fn reactor_subscribe_duration ( ) {
223
+ fn subscribe_no_duration ( ) {
185
224
crate :: runtime:: block_on ( async {
186
225
let reactor = Reactor :: current ( ) ;
187
- let pollable = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 1000 ) ;
226
+ let pollable = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 0 ) ;
188
227
let sched = reactor. schedule ( pollable) ;
189
228
sched. wait_for ( ) . await ;
190
229
} )
191
230
}
231
+ // Using WASMTIME_LOG, observe that this test calls poll() until the timer is ready.
232
+ #[ test]
233
+ fn subscribe_some_duration ( ) {
234
+ crate :: runtime:: block_on ( async {
235
+ let reactor = Reactor :: current ( ) ;
236
+ let pollable = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 10_000_000 ) ;
237
+ let sched = reactor. schedule ( pollable) ;
238
+ sched. wait_for ( ) . await ;
239
+ } )
240
+ }
241
+
242
+ // Using WASMTIME_LOG, observe that this test results in a single poll() on the second
243
+ // subscription, rather than spinning in poll() with first subscription, which is instantly
244
+ // ready, but not what the waker requests.
245
+ #[ test]
246
+ fn subscribe_multiple_durations ( ) {
247
+ crate :: runtime:: block_on ( async {
248
+ let reactor = Reactor :: current ( ) ;
249
+ let now = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 0 ) ;
250
+ let soon = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 10_000_000 ) ;
251
+ let now = reactor. schedule ( now) ;
252
+ let soon = reactor. schedule ( soon) ;
253
+ soon. wait_for ( ) . await ;
254
+ drop ( now)
255
+ } )
256
+ }
257
+
258
+ // Using WASMTIME_LOG, observe that this test results in two calls to poll(), one with both
259
+ // pollables because both are awaiting, and one with just the later pollable.
260
+ #[ test]
261
+ fn subscribe_multiple_durations_zipped ( ) {
262
+ crate :: runtime:: block_on ( async {
263
+ let reactor = Reactor :: current ( ) ;
264
+ let start = wasi:: clocks:: monotonic_clock:: now ( ) ;
265
+ let soon = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 10_000_000 ) ;
266
+ let later = wasi:: clocks:: monotonic_clock:: subscribe_duration ( 40_000_000 ) ;
267
+ let soon = reactor. schedule ( soon) ;
268
+ let later = reactor. schedule ( later) ;
269
+
270
+ futures_lite:: future:: zip (
271
+ async move {
272
+ soon. wait_for ( ) . await ;
273
+ println ! (
274
+ "*** subscribe_duration(soon) ready ({})" ,
275
+ wasi:: clocks:: monotonic_clock:: now( ) - start
276
+ ) ;
277
+ } ,
278
+ async move {
279
+ later. wait_for ( ) . await ;
280
+ println ! (
281
+ "*** subscribe_duration(later) ready ({})" ,
282
+ wasi:: clocks:: monotonic_clock:: now( ) - start
283
+ ) ;
284
+ } ,
285
+ )
286
+ . await ;
287
+ } )
288
+ }
192
289
}
0 commit comments