Skip to content

Commit 9cd6f58

Browse files
authored
docs(examples): Add example for capturing metadata
* docs(examples): Add example for capturing metadata Adds an example of capturing metadata within the future creation function. * refactor(examples): Added ByDuration::duration helper * style(examples): Deref the &mut &Cell to &Cell immediately One less deref involved after this. * style(examples): Fix clippy
1 parent 182c4dd commit 9cd6f58

File tree

2 files changed

+145
-0
lines changed

2 files changed

+145
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ flaky_test = "0.1"
2525
flume = { version = "0.10", default-features = false }
2626
futures-lite = "1.12.0"
2727
once_cell = "1"
28+
pin-project-lite = "0.2.10"
2829
smol = "1"
2930

3031
# rewrite dependencies to use the this version of async-task when running tests

examples/with-metadata.rs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
//! A single threaded executor that uses shortest-job-first scheduling.
2+
3+
use std::cell::RefCell;
4+
use std::collections::BinaryHeap;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
use std::thread;
8+
use std::time::{Duration, Instant};
9+
use std::{cell::Cell, future::Future};
10+
11+
use async_task::{Builder, Runnable, Task};
12+
use pin_project_lite::pin_project;
13+
use smol::{channel, future};
14+
15+
struct ByDuration(Runnable<DurationMetadata>);
16+
17+
impl ByDuration {
18+
fn duration(&self) -> Duration {
19+
self.0.metadata().inner.get()
20+
}
21+
}
22+
23+
impl PartialEq for ByDuration {
24+
fn eq(&self, other: &Self) -> bool {
25+
self.duration() == other.duration()
26+
}
27+
}
28+
29+
impl Eq for ByDuration {}
30+
31+
impl PartialOrd for ByDuration {
32+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
33+
self.duration()
34+
.partial_cmp(&other.duration())
35+
.map(|ord| ord.reverse())
36+
}
37+
}
38+
39+
impl Ord for ByDuration {
40+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
41+
self.duration().cmp(&other.duration()).reverse()
42+
}
43+
}
44+
45+
pin_project! {
46+
#[must_use = "futures do nothing unless you `.await` or poll them"]
47+
struct MeasureRuntime<'a, F> {
48+
#[pin]
49+
f: F,
50+
duration: &'a Cell<Duration>
51+
}
52+
}
53+
54+
impl<'a, F: Future> Future for MeasureRuntime<'a, F> {
55+
type Output = F::Output;
56+
57+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
58+
let this = self.project();
59+
let duration_cell: &Cell<Duration> = this.duration;
60+
let start = Instant::now();
61+
let res = F::poll(this.f, cx);
62+
let new_duration = Instant::now() - start;
63+
duration_cell.set(duration_cell.get() / 2 + new_duration / 2);
64+
res
65+
}
66+
}
67+
68+
pub struct DurationMetadata {
69+
inner: Cell<Duration>,
70+
}
71+
72+
thread_local! {
73+
// A queue that holds scheduled tasks.
74+
static QUEUE: RefCell<BinaryHeap<ByDuration>> = RefCell::new(BinaryHeap::new());
75+
}
76+
77+
fn make_future_fn<'a, F>(
78+
future: F,
79+
) -> impl (FnOnce(&'a DurationMetadata) -> MeasureRuntime<'a, F>) {
80+
move |duration_meta| MeasureRuntime {
81+
f: future,
82+
duration: &duration_meta.inner,
83+
}
84+
}
85+
86+
fn ensure_safe_schedule<F: Send + Sync + 'static>(f: F) -> F {
87+
f
88+
}
89+
90+
/// Spawns a future on the executor.
91+
pub fn spawn<F, T>(future: F) -> Task<T, DurationMetadata>
92+
where
93+
F: Future<Output = T> + 'static,
94+
T: 'static,
95+
{
96+
let spawn_thread_id = thread::current().id();
97+
// Create a task that is scheduled by pushing it into the queue.
98+
let schedule = ensure_safe_schedule(move |runnable| {
99+
if thread::current().id() != spawn_thread_id {
100+
panic!("Task would be run on a different thread than spawned on.");
101+
}
102+
QUEUE.with(move |queue| queue.borrow_mut().push(ByDuration(runnable)));
103+
});
104+
let future_fn = make_future_fn(future);
105+
let (runnable, task) = unsafe {
106+
Builder::new()
107+
.metadata(DurationMetadata {
108+
inner: Cell::new(Duration::default()),
109+
})
110+
.spawn_unchecked(future_fn, schedule)
111+
};
112+
113+
// Schedule the task by pushing it into the queue.
114+
runnable.schedule();
115+
116+
task
117+
}
118+
119+
pub fn block_on<F>(future: F)
120+
where
121+
F: Future<Output = ()> + 'static,
122+
{
123+
let task = spawn(future);
124+
while !task.is_finished() {
125+
let Some(runnable) = QUEUE.with(|queue| queue.borrow_mut().pop()) else { thread::yield_now(); continue };
126+
runnable.0.run();
127+
}
128+
}
129+
130+
fn main() {
131+
// Spawn a future and await its result.
132+
block_on(async {
133+
let (sender, receiver) = channel::bounded(1);
134+
let world = spawn(async move {
135+
receiver.recv().await.unwrap();
136+
println!("world.")
137+
});
138+
let hello = spawn(async move {
139+
sender.send(()).await.unwrap();
140+
print!("Hello, ")
141+
});
142+
future::zip(hello, world).await;
143+
});
144+
}

0 commit comments

Comments
 (0)