Skip to content

Commit 6ef448e

Browse files
SrTobicramertj
authored andcommitted
Added functions to partially progress a local pool.
fixes #1369
1 parent 6780d74 commit 6ef448e

File tree

2 files changed

+258
-12
lines changed

2 files changed

+258
-12
lines changed

futures-executor/src/local_pool.rs

Lines changed: 110 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,18 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
7474
})
7575
}
7676

77+
fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
78+
let _enter = enter()
79+
.expect("cannot execute `LocalPool` executor from within \
80+
another executor");
81+
82+
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
83+
let waker = waker_ref(thread_notify);
84+
let mut cx = Context::from_waker(&waker);
85+
f(&mut cx)
86+
})
87+
}
88+
7789
impl LocalPool {
7890
/// Create a new, empty pool of tasks.
7991
pub fn new() -> LocalPool {
@@ -135,8 +147,8 @@ impl LocalPool {
135147
/// The function will block the calling thread *only* until the future `f`
136148
/// completes; there may still be incomplete tasks in the pool, which will
137149
/// be inert after the call completes, but can continue with further use of
138-
/// `run` or `run_until`. While the function is running, however, all tasks
139-
/// in the pool will try to make progress.
150+
/// one of the pool's run or poll mothods. While the function is running,
151+
/// however, all tasks in the pool will try to make progress.
140152
pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
141153
pin_mut!(future);
142154

@@ -154,20 +166,94 @@ impl LocalPool {
154166
})
155167
}
156168

169+
/// Runs all tasks and returns after completing one future or until no more progress
170+
/// can be made. Returns `true` if one future was completed, `false` otherwise.
171+
///
172+
/// ```
173+
/// #![feature(futures_api)]
174+
/// use futures::executor::LocalPool;
175+
/// use futures::task::LocalSpawnExt;
176+
/// use futures::future::{ready, empty};
177+
///
178+
/// let mut pool = LocalPool::new();
179+
/// let mut spawner = pool.spawner();
180+
///
181+
/// spawner.spawn_local(ready(()));
182+
/// spawner.spawn_local(ready(()));
183+
/// spawner.spawn_local(empty());
184+
///
185+
/// // Run the two ready tasks and return true for them.
186+
/// pool.poll_one(); // returns true after completing one of the ready futures
187+
/// pool.poll_one(); // returns true after completing the other ready future
188+
///
189+
/// // the remaining task can not be completed
190+
/// pool.poll_one(); // returns false
191+
/// ```
192+
///
193+
/// This function will not block the calling thread and will return the moment
194+
/// that there are no tasks left for which progress can be made or after exactly one
195+
/// task was completed; Remaining incomplete tasks in the pool can continue with
196+
/// further use of one of the pool's run or poll methods.
197+
/// Though only one task will be completed, progress may be made on multiple tasks.
198+
pub fn poll_one(&mut self) -> bool {
199+
poll_executor(|ctx| {
200+
let ret = self.poll_pool_once(ctx);
201+
202+
// return if we really have executed a future
203+
match ret {
204+
Poll::Ready(Some(_)) => true,
205+
_ => false
206+
}
207+
})
208+
}
209+
210+
/// Runs all tasks in the pool and returns if no more progress can be made
211+
/// on any task.
212+
///
213+
/// ```
214+
/// #![feature(futures_api)]
215+
/// use futures::executor::LocalPool;
216+
/// use futures::task::LocalSpawnExt;
217+
/// use futures::future::{ready, empty};
218+
///
219+
/// let mut pool = LocalPool::new();
220+
/// let mut spawner = pool.spawner();
221+
///
222+
/// spawner.spawn_local(ready(()));
223+
/// spawner.spawn_local(ready(()));
224+
/// spawner.spawn_local(empty());
225+
///
226+
/// // Runs the two ready task and returns.
227+
/// // The empty task remains in the pool.
228+
/// pool.poll();
229+
/// ```
230+
///
231+
/// This function will not block the calling thread and will return the moment
232+
/// that there are no tasks left for which progress can be made;
233+
/// remaining incomplete tasks in the pool can continue with further use of one
234+
/// of the pool's run or poll methods. While the function is running, all tasks
235+
/// in the pool will try to make progress.
236+
pub fn poll(&mut self) {
237+
poll_executor(|ctx| {
238+
loop {
239+
let result = self.poll_pool_once(ctx);
240+
241+
// if there are no more ready futures exit
242+
match result {
243+
Poll::Pending | Poll::Ready(None) => return,
244+
_ => continue
245+
}
246+
}
247+
})
248+
}
249+
157250
// Make maximal progress on the entire pool of spawned task, returning `Ready`
158251
// if the pool is empty and `Pending` if no further progress can be made.
159252
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
160253
// state for the FuturesUnordered, which will never be used
161254
loop {
162-
// empty the incoming queue of newly-spawned tasks
163-
{
164-
let mut incoming = self.incoming.borrow_mut();
165-
for task in incoming.drain(..) {
166-
self.pool.push(task)
167-
}
168-
}
255+
let ret = self.poll_pool_once(cx);
169256

170-
let ret = self.pool.poll_next_unpin(cx);
171257
// we queued up some new tasks; add them and poll again
172258
if !self.incoming.borrow().is_empty() {
173259
continue;
@@ -181,6 +267,20 @@ impl LocalPool {
181267
}
182268
}
183269
}
270+
271+
// Try make minimal progress on the pool of spawned tasks
272+
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
273+
// empty the incoming queue of newly-spawned tasks
274+
{
275+
let mut incoming = self.incoming.borrow_mut();
276+
for task in incoming.drain(..) {
277+
self.pool.push(task)
278+
}
279+
}
280+
281+
// try to execute the next ready future
282+
self.pool.poll_next_unpin(cx)
283+
}
184284
}
185285

186286
impl Default for LocalPool {

futures-executor/tests/local_pool.rs

Lines changed: 148 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
use futures::channel::oneshot;
44
use futures::executor::LocalPool;
5-
use futures::future::{Future, lazy};
6-
use futures::task::{Context, Poll, Spawn, LocalSpawn};
5+
use futures::future::{Future, lazy, poll_fn};
6+
use futures::task::{Context, Poll, Spawn, LocalSpawn, Waker};
77
use std::cell::{Cell, RefCell};
88
use std::pin::Pin;
99
use std::rc::Rc;
@@ -58,6 +58,13 @@ fn run_until_executes_spawned() {
5858
pool.run_until(rx).unwrap();
5959
}
6060

61+
#[test]
62+
fn run_returns_if_empty() {
63+
let mut pool = LocalPool::new();
64+
pool.run();
65+
pool.run();
66+
}
67+
6168
#[test]
6269
fn run_executes_spawned() {
6370
let cnt = Rc::new(Cell::new(0));
@@ -103,6 +110,130 @@ fn run_spawn_many() {
103110
assert_eq!(cnt.get(), ITER);
104111
}
105112

113+
#[test]
114+
fn poll_one_returns_if_empty() {
115+
let mut pool = LocalPool::new();
116+
assert!(pool.poll_one() == false);
117+
}
118+
119+
#[test]
120+
fn poll_one_executes_one_ready() {
121+
const ITER: usize = 200;
122+
123+
let cnt = Rc::new(Cell::new(0));
124+
125+
let mut pool = LocalPool::new();
126+
let mut spawn = pool.spawner();
127+
128+
for _ in 0..ITER {
129+
spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
130+
131+
let cnt = cnt.clone();
132+
spawn.spawn_local_obj(Box::pin(lazy(move |_| {
133+
cnt.set(cnt.get() + 1);
134+
()
135+
})).into()).unwrap();
136+
137+
spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
138+
}
139+
140+
for i in 0..ITER {
141+
assert_eq!(cnt.get(), i);
142+
assert!(pool.poll_one());
143+
assert_eq!(cnt.get(), i + 1);
144+
}
145+
assert!(pool.poll_one() == false);
146+
}
147+
148+
#[test]
149+
fn poll_one_returns_on_no_progress() {
150+
const ITER: usize = 10;
151+
152+
let cnt = Rc::new(Cell::new(0));
153+
154+
let mut pool = LocalPool::new();
155+
let mut spawn = pool.spawner();
156+
157+
let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
158+
{
159+
let cnt = cnt.clone();
160+
let waker = waker.clone();
161+
spawn.spawn_local_obj(Box::pin(poll_fn(move |ctx| {
162+
cnt.set(cnt.get() + 1);
163+
waker.set(Some(ctx.waker().clone()));
164+
if cnt.get() == ITER {
165+
Poll::Ready(())
166+
} else {
167+
Poll::Pending
168+
}
169+
})).into()).unwrap();
170+
}
171+
172+
for i in 0..ITER - 1 {
173+
assert_eq!(cnt.get(), i);
174+
assert!(!pool.poll_one());
175+
assert_eq!(cnt.get(), i + 1);
176+
let w = waker.take();
177+
assert!(w.is_some());
178+
w.unwrap().wake();
179+
}
180+
assert!(pool.poll_one());
181+
assert_eq!(cnt.get(), ITER);
182+
}
183+
184+
#[test]
185+
fn poll_returns_if_empty() {
186+
let mut pool = LocalPool::new();
187+
pool.poll();
188+
pool.poll();
189+
}
190+
191+
#[test]
192+
fn poll_returns_multiple_times() {
193+
let mut pool = LocalPool::new();
194+
let mut spawn = pool.spawner();
195+
let cnt = Rc::new(Cell::new(0));
196+
197+
let cnt1 = cnt.clone();
198+
spawn.spawn_local_obj(Box::pin(lazy(move |_|{ cnt1.set(cnt1.get() + 1) })).into()).unwrap();
199+
pool.poll();
200+
assert_eq!(cnt.get(), 1);
201+
202+
let cnt2 = cnt.clone();
203+
spawn.spawn_local_obj(Box::pin(lazy(move |_|{ cnt2.set(cnt2.get() + 1) })).into()).unwrap();
204+
pool.poll();
205+
assert_eq!(cnt.get(), 2);
206+
}
207+
208+
#[test]
209+
fn poll_executes_all_ready() {
210+
const ITER: usize = 200;
211+
const PER_ITER: usize = 3;
212+
213+
let cnt = Rc::new(Cell::new(0));
214+
215+
let mut pool = LocalPool::new();
216+
let mut spawn = pool.spawner();
217+
218+
for i in 0..ITER {
219+
for _ in 0..PER_ITER {
220+
spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
221+
222+
let cnt = cnt.clone();
223+
spawn.spawn_local_obj(Box::pin(lazy(move |_| {
224+
cnt.set(cnt.get() + 1);
225+
()
226+
})).into()).unwrap();
227+
228+
// also add some pending tasks to test if they are ignored
229+
spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
230+
}
231+
assert_eq!(cnt.get(), i * PER_ITER);
232+
pool.poll();
233+
assert_eq!(cnt.get(), (i + 1) * PER_ITER);
234+
}
235+
}
236+
106237
#[test]
107238
#[should_panic]
108239
fn nesting_run() {
@@ -113,6 +244,21 @@ fn nesting_run() {
113244
let mut pool = LocalPool::new();
114245
pool.run();
115246
})).into()).unwrap();
247+
248+
pool.run();
249+
}
250+
251+
#[test]
252+
#[should_panic]
253+
fn nesting_run_poll() {
254+
let mut pool = LocalPool::new();
255+
let mut spawn = pool.spawner();
256+
257+
spawn.spawn_obj(Box::pin(lazy(|_| {
258+
let mut pool = LocalPool::new();
259+
pool.poll();
260+
})).into()).unwrap();
261+
116262
pool.run();
117263
}
118264

0 commit comments

Comments
 (0)