Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ serde_json = { workspace = true, optional = true }
anyhow.workspace = true
clap.workspace = true
futures-lite.workspace = true
futures-concurrency.workspace = true
humantime.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
Expand Down Expand Up @@ -63,6 +64,7 @@ cargo_metadata = "0.18.1"
clap = { version = "4.5.26", features = ["derive"] }
futures-core = "0.3.19"
futures-lite = "1.12.0"
futures-concurrency = "7.6"
humantime = "2.1.0"
heck = "0.5"
http = "1.1"
Expand Down
9 changes: 9 additions & 0 deletions src/runtime/block_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,17 @@ where
// as awake, reset and poll again. otherwise, block until a
// pollable wakes a future.
if root.is_awake() {
reactor.nonblock_check_pollables();
root.reset()
} else {
// If there are no futures awake or waiting on a WASI
// pollable, its impossible for the reactor to make
// progress, and the only valid behaviors are to sleep
// forever or panic. This should only be reachable if the
// user's Futures are implemented incorrectly.
if !reactor.nonempty_pending_pollables() {
panic!("reactor has no futures which are awake, or are waiting on a WASI pollable to be ready")
}
reactor.block_on_pollables()
}
}
Expand Down
97 changes: 88 additions & 9 deletions src/runtime/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,66 @@ impl Reactor {
}
}

/// The reactor tracks the set of WASI pollables which have an associated
/// Future pending on their readiness. This function returns indicating
/// that set of pollables is not empty.
pub(crate) fn nonempty_pending_pollables(&self) -> bool {
!self.inner.borrow().wakers.is_empty()
}

/// Block until at least one pending pollable is ready, waking a pending future.
/// Precondition: self.nonempty_pending_pollables() is true.
pub(crate) fn block_on_pollables(&self) {
self.check_pollables(|targets| {
debug_assert_ne!(
targets.len(),
0,
"Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
);
wasi::io::poll::poll(targets)

})
}

/// Without blocking, check for any ready pollables and wake the
/// associated futures.
pub(crate) fn nonblock_check_pollables(&self) {
// If there are no pollables with associated pending futures, there is
// no work to do here, so return immediately.
if !self.nonempty_pending_pollables() {
return;
}
// Lazily create a pollable which always resolves to ready.
use std::sync::LazyLock;
static READY_POLLABLE: LazyLock<Pollable> =
LazyLock::new(|| wasi::clocks::monotonic_clock::subscribe_duration(0));

self.check_pollables(|targets| {
// Create a new set of targets, with the addition of the ready
// pollable:
let ready_index = targets.len();
let mut new_targets = Vec::with_capacity(ready_index + 1);
new_targets.extend_from_slice(targets);
new_targets.push(&*READY_POLLABLE);

// Poll is now guaranteed to return immediately, because at least
// one member is ready:
let mut ready_list = wasi::io::poll::poll(&new_targets);

// Erase our extra ready pollable from the ready list:
ready_list.retain(|e| *e != ready_index as u32);
ready_list
})
}

/// Common core of blocking and nonblocking pollable checks. Wakes any
/// futures which are pending on the pollables, according to the result of
/// the check_ready function.
/// Precondition: self.nonempty_pending_pollables() is true.
fn check_pollables<F>(&self, check_ready: F)
where
F: FnOnce(&[&Pollable]) -> Vec<u32>,
{
let reactor = self.inner.borrow();

// We're about to wait for a number of pollables. When they wake we get
Expand All @@ -144,15 +202,9 @@ impl Reactor {
targets.push(&reactor.pollables[pollable_index.0]);
}

debug_assert_ne!(
targets.len(),
0,
"Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
);

// Now that we have that association, we're ready to poll our targets.
// This will block until an event has completed.
let ready_indexes = wasi::io::poll::poll(&targets);
// Now that we have that association, we're ready to check our targets for readiness.
// (This is either a wasi poll, or the nonblocking variant.)
let ready_indexes = check_ready(&targets);

// Once we have the indexes for which pollables are available, we need
// to convert it back to the right keys for the wakers. Earlier we
Expand Down Expand Up @@ -321,4 +373,31 @@ mod test {
);
})
}

#[test]
fn cooperative_concurrency() {
crate::runtime::block_on(async {
let cpu_heavy = async move {
// Simulating a CPU-heavy task that runs for 1 second and yields occasionally
for _ in 0..10 {
std::thread::sleep(std::time::Duration::from_millis(100));
futures_lite::future::yield_now().await;
}
true
};
let timeout = async move {
crate::time::Timer::after(crate::time::Duration::from_millis(200))
.wait()
.await;
false
};
let mut future_group = futures_concurrency::future::FutureGroup::<
Pin<Box<dyn std::future::Future<Output = bool>>>,
>::new();
future_group.insert(Box::pin(cpu_heavy));
future_group.insert(Box::pin(timeout));
let result = futures_lite::StreamExt::next(&mut future_group).await;
Comment on lines +394 to +399
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised this doesn't work here? Afaict this is just race-semantics?

Suggested change
let mut future_group = futures_concurrency::future::FutureGroup::<
Pin<Box<dyn std::future::Future<Output = bool>>>,
>::new();
future_group.insert(Box::pin(cpu_heavy));
future_group.insert(Box::pin(timeout));
let result = futures_lite::StreamExt::next(&mut future_group).await;
let result = (cpu_heavy, timeout).race().await;

Assuming futures_concurrency::prelude::*; is imported of course.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried this change, and it doesn't actually trigger the broken behavior that the existing test case does. I don't actually understand why, though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's really weird; they should both work very similarly. The only real diffence I can think of is perhaps some amount of randomness in the execution order.

@SilverMira do you remember why you chose to use FutureGroup rather than Future::race?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yoshuawuyts

do you remember why you chose to use FutureGroup rather than Future::race?

I only used it for as an repro, because in my actual project, the issue comes in the form of a deadlock, like how @teohhanhui experienced in #73 (comment).

The difference in between FutureGroup and Future::race lies in how it polls its inner futures, if I recall correctly, Future::race polls its inner futures in a random order whenever any one of its inner futures wake. Which means the wasi future could potentially be randomly chosen to be polled before the other task (even when it's not woken), and Pollable::ready would complete the future, thus winning the future.

Meanwhile, FutureGroup and others like StreamExt will explicitly skip un-woken inner futures when it itself is polled. Since the runtime isn't doing poll::poll while the main task is busy, it never wakes the wasi pollable, hence the FutureGroup never tries to poll them, causing it to lose the race.

assert_eq!(result, Some(false), "cpu_heavy task should have timed out");
});
}
}
Loading