Skip to content

nonblocking check of pollables when reactor is busy #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ serde_json = { workspace = true, optional = true }
anyhow.workspace = true
clap.workspace = true
futures-lite.workspace = true
futures-concurrency.workspace = true
humantime.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
Expand Down Expand Up @@ -63,6 +64,7 @@ cargo_metadata = "0.18.1"
clap = { version = "4.5.26", features = ["derive"] }
futures-core = "0.3.19"
futures-lite = "1.12.0"
futures-concurrency = "7.6"
humantime = "2.1.0"
heck = "0.5"
http = "1.1"
Expand Down
1 change: 1 addition & 0 deletions src/runtime/block_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ where
// as awake, reset and poll again. otherwise, block until a
// pollable wakes a future.
if root.is_awake() {
reactor.nonblock_check_pollables();
root.reset()
} else {
reactor.block_on_pollables()
Expand Down
89 changes: 80 additions & 9 deletions src/runtime/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,58 @@ impl Reactor {

/// Block until at least one pending pollable is ready, waking a pending future.
pub(crate) fn block_on_pollables(&self) {
self.check_pollables(|targets| {
debug_assert_ne!(
targets.len(),
0,
"Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
);
wasi::io::poll::poll(targets)

})
}

/// Without blocking, check for any ready pollables and wake the
/// associated futures.
pub(crate) fn nonblock_check_pollables(&self) {
// Lazily create a pollable which always resolves to ready.
use std::sync::LazyLock;
static READY_POLLABLE: LazyLock<Pollable> =
LazyLock::new(|| wasi::clocks::monotonic_clock::subscribe_duration(0));

self.check_pollables(|targets| {
// Create a new set of targets, with the addition of the ready
// pollable:
let ready_index = targets.len();
let mut new_targets = Vec::with_capacity(ready_index + 1);
new_targets.extend_from_slice(targets);
new_targets.push(&*READY_POLLABLE);

// Poll is now guaranteed to return immediately, because at least
// one member is ready:
let mut ready_list = wasi::io::poll::poll(&new_targets);

// Erase our extra ready pollable from the ready list:
ready_list.retain(|e| *e != ready_index as u32);
ready_list
})
}

/// Common core of blocking and nonblocking pollable checks. Wakes any
/// futures which are pending on the pollables, according to the result of
/// the check_ready function.
fn check_pollables<F>(&self, check_ready: F)
where
F: FnOnce(&[&Pollable]) -> Vec<u32>,
{
let reactor = self.inner.borrow();

// If no wakers are pending on pollables, there is no work to be done
// here:
if reactor.wakers.is_empty() {
return;
}

// We're about to wait for a number of pollables. When they wake we get
// the *indexes* back for the pollables whose events were available - so
// we need to be able to associate the index with the right waker.
Expand All @@ -144,15 +194,9 @@ impl Reactor {
targets.push(&reactor.pollables[pollable_index.0]);
}

debug_assert_ne!(
targets.len(),
0,
"Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
);

// Now that we have that association, we're ready to poll our targets.
// This will block until an event has completed.
let ready_indexes = wasi::io::poll::poll(&targets);
// Now that we have that association, we're ready to check our targets for readiness.
// (This is either a wasi poll, or the nonblocking variant.)
let ready_indexes = check_ready(&targets);

// Once we have the indexes for which pollables are available, we need
// to convert it back to the right keys for the wakers. Earlier we
Expand Down Expand Up @@ -321,4 +365,31 @@ mod test {
);
})
}

#[test]
fn cooperative_concurrency() {
crate::runtime::block_on(async {
let cpu_heavy = async move {
// Simulating a CPU-heavy task that runs for 1 second and yields occasionally
for _ in 0..10 {
std::thread::sleep(std::time::Duration::from_millis(100));
futures_lite::future::yield_now().await;
}
true
};
let timeout = async move {
crate::time::Timer::after(crate::time::Duration::from_millis(200))
.wait()
.await;
false
};
let mut future_group = futures_concurrency::future::FutureGroup::<
Pin<Box<dyn std::future::Future<Output = bool>>>,
>::new();
future_group.insert(Box::pin(cpu_heavy));
future_group.insert(Box::pin(timeout));
let result = futures_lite::StreamExt::next(&mut future_group).await;
assert_eq!(result, Some(false), "cpu_heavy task should have timed out");
});
}
}
Loading