Skip to content

Commit bbcdbf0

Browse files
committed
First attempt at a more optimized LocalExecutor
1 parent b3269e1 commit bbcdbf0

File tree

7 files changed

+1002
-247
lines changed

7 files changed

+1002
-247
lines changed

src/lib.rs

Lines changed: 2 additions & 230 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ use std::fmt;
4444
use std::marker::PhantomData;
4545
use std::panic::{RefUnwindSafe, UnwindSafe};
4646
use std::pin::Pin;
47-
use std::rc::Rc;
4847
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
4948
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
5049
use std::task::{Context, Poll, Waker};
@@ -55,11 +54,13 @@ use futures_lite::{future, prelude::*};
5554
use pin_project_lite::pin_project;
5655
use slab::Slab;
5756

57+
mod local_executor;
5858
#[cfg(feature = "static")]
5959
mod static_executors;
6060

6161
#[doc(no_inline)]
6262
pub use async_task::{FallibleTask, Task};
63+
pub use local_executor::*;
6364
#[cfg(feature = "static")]
6465
#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
6566
pub use static_executors::*;
@@ -431,235 +432,6 @@ impl<'a> Default for Executor<'a> {
431432
}
432433
}
433434

434-
/// A thread-local executor.
435-
///
436-
/// The executor can only be run on the thread that created it.
437-
///
438-
/// # Examples
439-
///
440-
/// ```
441-
/// use async_executor::LocalExecutor;
442-
/// use futures_lite::future;
443-
///
444-
/// let local_ex = LocalExecutor::new();
445-
///
446-
/// future::block_on(local_ex.run(async {
447-
/// println!("Hello world!");
448-
/// }));
449-
/// ```
450-
pub struct LocalExecutor<'a> {
451-
/// The inner executor.
452-
inner: Executor<'a>,
453-
454-
/// Makes the type `!Send` and `!Sync`.
455-
_marker: PhantomData<Rc<()>>,
456-
}
457-
458-
impl UnwindSafe for LocalExecutor<'_> {}
459-
impl RefUnwindSafe for LocalExecutor<'_> {}
460-
461-
impl fmt::Debug for LocalExecutor<'_> {
462-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463-
debug_executor(&self.inner, "LocalExecutor", f)
464-
}
465-
}
466-
467-
impl<'a> LocalExecutor<'a> {
468-
/// Creates a single-threaded executor.
469-
///
470-
/// # Examples
471-
///
472-
/// ```
473-
/// use async_executor::LocalExecutor;
474-
///
475-
/// let local_ex = LocalExecutor::new();
476-
/// ```
477-
pub const fn new() -> LocalExecutor<'a> {
478-
LocalExecutor {
479-
inner: Executor::new(),
480-
_marker: PhantomData,
481-
}
482-
}
483-
484-
/// Returns `true` if there are no unfinished tasks.
485-
///
486-
/// # Examples
487-
///
488-
/// ```
489-
/// use async_executor::LocalExecutor;
490-
///
491-
/// let local_ex = LocalExecutor::new();
492-
/// assert!(local_ex.is_empty());
493-
///
494-
/// let task = local_ex.spawn(async {
495-
/// println!("Hello world");
496-
/// });
497-
/// assert!(!local_ex.is_empty());
498-
///
499-
/// assert!(local_ex.try_tick());
500-
/// assert!(local_ex.is_empty());
501-
/// ```
502-
pub fn is_empty(&self) -> bool {
503-
self.inner().is_empty()
504-
}
505-
506-
/// Spawns a task onto the executor.
507-
///
508-
/// # Examples
509-
///
510-
/// ```
511-
/// use async_executor::LocalExecutor;
512-
///
513-
/// let local_ex = LocalExecutor::new();
514-
///
515-
/// let task = local_ex.spawn(async {
516-
/// println!("Hello world");
517-
/// });
518-
/// ```
519-
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
520-
let mut active = self.inner().state().active();
521-
522-
// SAFETY: This executor is not thread safe, so the future and its result
523-
// cannot be sent to another thread.
524-
unsafe { self.inner().spawn_inner(future, &mut active) }
525-
}
526-
527-
/// Spawns many tasks onto the executor.
528-
///
529-
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
530-
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
531-
/// contention.
532-
///
533-
/// It is assumed that the iterator provided does not block; blocking iterators can lock up
534-
/// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
535-
/// mutex is not released, as there are no other threads that can poll this executor.
536-
///
537-
/// ## Example
538-
///
539-
/// ```
540-
/// use async_executor::LocalExecutor;
541-
/// use futures_lite::{stream, prelude::*};
542-
/// use std::future::ready;
543-
///
544-
/// # futures_lite::future::block_on(async {
545-
/// let mut ex = LocalExecutor::new();
546-
///
547-
/// let futures = [
548-
/// ready(1),
549-
/// ready(2),
550-
/// ready(3)
551-
/// ];
552-
///
553-
/// // Spawn all of the futures onto the executor at once.
554-
/// let mut tasks = vec![];
555-
/// ex.spawn_many(futures, &mut tasks);
556-
///
557-
/// // Await all of them.
558-
/// let results = ex.run(async move {
559-
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
560-
/// }).await;
561-
/// assert_eq!(results, [1, 2, 3]);
562-
/// # });
563-
/// ```
564-
///
565-
/// [`spawn`]: LocalExecutor::spawn
566-
/// [`Executor::spawn_many`]: Executor::spawn_many
567-
pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
568-
&self,
569-
futures: impl IntoIterator<Item = F>,
570-
handles: &mut impl Extend<Task<F::Output>>,
571-
) {
572-
let mut active = self.inner().state().active();
573-
574-
// Convert all of the futures to tasks.
575-
let tasks = futures.into_iter().map(|future| {
576-
// SAFETY: This executor is not thread safe, so the future and its result
577-
// cannot be sent to another thread.
578-
unsafe { self.inner().spawn_inner(future, &mut active) }
579-
580-
// As only one thread can spawn or poll tasks at a time, there is no need
581-
// to release lock contention here.
582-
});
583-
584-
// Push them to the user's collection.
585-
handles.extend(tasks);
586-
}
587-
588-
/// Attempts to run a task if at least one is scheduled.
589-
///
590-
/// Running a scheduled task means simply polling its future once.
591-
///
592-
/// # Examples
593-
///
594-
/// ```
595-
/// use async_executor::LocalExecutor;
596-
///
597-
/// let ex = LocalExecutor::new();
598-
/// assert!(!ex.try_tick()); // no tasks to run
599-
///
600-
/// let task = ex.spawn(async {
601-
/// println!("Hello world");
602-
/// });
603-
/// assert!(ex.try_tick()); // a task was found
604-
/// ```
605-
pub fn try_tick(&self) -> bool {
606-
self.inner().try_tick()
607-
}
608-
609-
/// Runs a single task.
610-
///
611-
/// Running a task means simply polling its future once.
612-
///
613-
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
614-
///
615-
/// # Examples
616-
///
617-
/// ```
618-
/// use async_executor::LocalExecutor;
619-
/// use futures_lite::future;
620-
///
621-
/// let ex = LocalExecutor::new();
622-
///
623-
/// let task = ex.spawn(async {
624-
/// println!("Hello world");
625-
/// });
626-
/// future::block_on(ex.tick()); // runs the task
627-
/// ```
628-
pub async fn tick(&self) {
629-
self.inner().tick().await
630-
}
631-
632-
/// Runs the executor until the given future completes.
633-
///
634-
/// # Examples
635-
///
636-
/// ```
637-
/// use async_executor::LocalExecutor;
638-
/// use futures_lite::future;
639-
///
640-
/// let local_ex = LocalExecutor::new();
641-
///
642-
/// let task = local_ex.spawn(async { 1 + 2 });
643-
/// let res = future::block_on(local_ex.run(async { task.await * 2 }));
644-
///
645-
/// assert_eq!(res, 6);
646-
/// ```
647-
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
648-
self.inner().run(future).await
649-
}
650-
651-
/// Returns a reference to the inner executor.
652-
fn inner(&self) -> &Executor<'a> {
653-
&self.inner
654-
}
655-
}
656-
657-
impl<'a> Default for LocalExecutor<'a> {
658-
fn default() -> LocalExecutor<'a> {
659-
LocalExecutor::new()
660-
}
661-
}
662-
663435
/// The state of a executor.
664436
struct State {
665437
/// The global queue.

0 commit comments

Comments
 (0)