Skip to content

Commit d0a1f9e

Browse files
committed
Refactor spawn_inner
1 parent 646c77a commit d0a1f9e

File tree

1 file changed

+23
-18
lines changed

1 file changed

+23
-18
lines changed

src/local_executor.rs

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
66
use std::rc::Rc;
77
use std::task::{Poll, Waker};
88

9-
use async_task::{Builder, Runnable};
9+
use async_task::{Builder, Runnable, Schedule};
1010
use futures_lite::{future, prelude::*};
1111
use slab::Slab;
1212

@@ -103,11 +103,13 @@ impl<'a> LocalExecutor<'a> {
103103
/// });
104104
/// ```
105105
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
106+
let state = self.state_as_rc();
106107
// SAFETY: All UnsafeCell accesses to active are tightly scoped, and because
107108
// `LocalExecutor` is !Send, there is no way to have concurrent access to the
108109
// values in `State`, including the active field.
109110
let active = unsafe { &mut *self.state().active.get() };
110-
self.spawn_inner(future, active)
111+
let schedule = Self::schedule(state.clone());
112+
Self::spawn_inner(state, future, active, schedule)
111113
}
112114

113115
/// Spawns many tasks onto the executor.
@@ -155,30 +157,35 @@ impl<'a> LocalExecutor<'a> {
155157
futures: impl IntoIterator<Item = F>,
156158
handles: &mut impl Extend<Task<F::Output>>,
157159
) {
158-
// SAFETY: All UnsafeCell accesses to active are tightly scoped, and because
159-
// `LocalExecutor` is !Send, there is no way to have concurrent access to the
160-
// values in `State`, including the active field.
161-
let active = unsafe { &mut *self.state().active.get() };
160+
let tasks = {
161+
let state = self.state_as_rc();
162162

163-
// Convert the futures into tasks.
164-
let tasks = futures
165-
.into_iter()
166-
.map(move |future| self.spawn_inner(future, active));
163+
// SAFETY: All UnsafeCell accesses to active are tightly scoped, and because
164+
// `LocalExecutor` is !Send, there is no way to have concurrent access to the
165+
// values in `State`, including the active field.
166+
let active = unsafe { &mut *state.active.get() };
167+
168+
// Convert the futures into tasks.
169+
futures.into_iter().map(move |future| {
170+
let schedule = Self::schedule(state.clone());
171+
Self::spawn_inner(state.clone(), future, active, schedule)
172+
})
173+
};
167174

168175
// Push the tasks to the user's collection.
169176
handles.extend(tasks);
170177
}
171178

172179
/// Spawn a future while holding the inner lock.
173180
fn spawn_inner<T: 'a>(
174-
&self,
181+
state: Rc<State>,
175182
future: impl Future<Output = T> + 'a,
176183
active: &mut Slab<Waker>,
184+
schedule: impl Schedule + 'static,
177185
) -> Task<T> {
178186
// Remove the task from the set of active tasks when the future finishes.
179187
let entry = active.vacant_entry();
180188
let index = entry.key();
181-
let state = self.state_as_rc();
182189
let future = AsyncCallOnDrop::new(future, move || {
183190
// SAFETY: All UnsafeCell accesses to active are tightly scoped, and because
184191
// `LocalExecutor` is !Send, there is no way to have concurrent access to the
@@ -204,16 +211,16 @@ impl<'a> LocalExecutor<'a> {
204211
// the `Executor` is drained of all of its runnables. This ensures that
205212
// runnables are dropped and this precondition is satisfied.
206213
//
207-
// `self.schedule()` is not `Send` nor `Sync`. As LocalExecutor is not
214+
// `schedule` is not `Send` nor `Sync`. As LocalExecutor is not
208215
// `Send`, the `Waker` is guaranteed// to only be used on the same thread
209216
// it was spawned on.
210217
//
211-
// `self.schedule()` is `'static`, and thus will outlive all borrowed
218+
// `schedule` is `'static`, and thus will outlive all borrowed
212219
// variables in the future.
213220
let (runnable, task) = unsafe {
214221
Builder::new()
215222
.propagate_panic(true)
216-
.spawn_unchecked(|()| future, self.schedule())
223+
.spawn_unchecked(|()| future, schedule)
217224
};
218225
entry.insert(runnable.waker());
219226

@@ -285,9 +292,7 @@ impl<'a> LocalExecutor<'a> {
285292
}
286293

287294
/// Returns a function that schedules a runnable task when it gets woken up.
288-
fn schedule(&self) -> impl Fn(Runnable) + 'static {
289-
let state = self.state_as_rc();
290-
295+
fn schedule(state: Rc<State>) -> impl Fn(Runnable) + 'static {
291296
move |runnable| {
292297
{
293298
// SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because

0 commit comments

Comments
 (0)