Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ wasmtime-wasi-http = "18.0.1"
wit-component = "0.200.0"

[workspace.package]
version = "3.1.0"
version = "3.1.1"
authors = ["Fermyon Engineering <[email protected]>"]
edition = "2021"
license = "Apache-2.0 WITH LLVM-exception"
Expand Down
29 changes: 21 additions & 8 deletions crates/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type Wrapped = Arc<Mutex<Option<io::poll::Pollable>>>;

static WAKERS: Mutex<Vec<(Wrapped, Waker)>> = Mutex::new(Vec::new());

/// Handle to a Pollable pushed using `push_waker` which may be used to cancel
/// and drop the Pollable.
/// Handle to a Pollable registered using `push_waker_and_get_token` which may
/// be used to cancel and drop the Pollable.
pub struct CancelToken(Wrapped);

impl CancelToken {
Expand All @@ -36,8 +36,8 @@ impl CancelToken {
}
}

/// Handle to a Pollable pushed using `push_waker` which, when dropped, will
/// cancel and drop the Pollable.
/// Handle to a Pollable registered using `push_waker_and_get_token` which, when
/// dropped, will cancel and drop the Pollable.
pub struct CancelOnDropToken(Wrapped);

impl From<CancelToken> for CancelOnDropToken {
Expand All @@ -52,16 +52,27 @@ impl Drop for CancelOnDropToken {
}
}

/// Push a Pollable and Waker to WAKERS.
pub fn push_waker(pollable: io::poll::Pollable, waker: Waker) -> CancelToken {
/// Register a `Pollable` and `Waker` to be polled as part of the [`run`] event
/// loop.
pub fn push_waker(pollable: io::poll::Pollable, waker: Waker) {
_ = push_waker_and_get_token(pollable, waker);
}

/// Register a `Pollable` and `Waker` to be polled as part of the [`run`] event
/// loop and retrieve a [`CancelToken`] to cancel the registration later, if
/// desired.
pub fn push_waker_and_get_token(pollable: io::poll::Pollable, waker: Waker) -> CancelToken {
let wrapped = Arc::new(Mutex::new(Some(pollable)));
WAKERS.lock().unwrap().push((wrapped.clone(), waker));
CancelToken(wrapped)
}

/// Run the specified future to completion blocking until it yields a result.
/// Run the specified future to completion, blocking until it yields a result.
///
/// Based on an executor using `wasi::io/poll/poll-list`,
/// This will alternate between polling the specified future and polling any
/// `Pollable`s registered using [`push_waker`] or [`push_waker_and_get_token`]
/// using `wasi::io/poll/poll-list`. It will panic if the future returns
/// `Poll::Pending` without having registered at least one `Pollable`.
pub fn run<T>(future: impl Future<Output = T>) -> T {
futures::pin_mut!(future);
struct DummyWaker;
Expand All @@ -85,6 +96,8 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
})
.collect::<Vec<_>>();

assert!(!wakers.is_empty());

let pollables = wakers
.iter()
.map(|(_, pollable, _)| pollable)
Expand Down
24 changes: 14 additions & 10 deletions src/http/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ pub(crate) fn outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = St
loop {
match stream.check_write() {
Ok(0) => {
outgoing.cancel_token =
Some(CancelOnDropToken::from(spin_executor::push_waker(
outgoing.cancel_token = Some(CancelOnDropToken::from(
spin_executor::push_waker_and_get_token(
stream.subscribe(),
context.waker().clone(),
)));
),
));
break Poll::Pending;
}
Ok(count) => {
Expand Down Expand Up @@ -126,10 +127,12 @@ pub(crate) fn outgoing_request_send(
if let Some(response) = response.get() {
Poll::Ready(response.unwrap())
} else {
state.cancel_token = Some(CancelOnDropToken::from(spin_executor::push_waker(
response.subscribe(),
context.waker().clone(),
)));
state.cancel_token = Some(CancelOnDropToken::from(
spin_executor::push_waker_and_get_token(
response.subscribe(),
context.waker().clone(),
),
));
Poll::Pending
}
}
Expand Down Expand Up @@ -170,11 +173,12 @@ pub fn incoming_body(
match stream.read(READ_SIZE) {
Ok(buffer) => {
if buffer.is_empty() {
incoming.cancel_token =
Some(CancelOnDropToken::from(spin_executor::push_waker(
incoming.cancel_token = Some(CancelOnDropToken::from(
spin_executor::push_waker_and_get_token(
stream.subscribe(),
context.waker().clone(),
)));
),
));
Poll::Pending
} else {
Poll::Ready(Some(Ok(buffer)))
Expand Down
4 changes: 2 additions & 2 deletions src/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub enum InferencingModel<'a> {
Other(&'a str),
}

impl<'a> std::fmt::Display for InferencingModel<'a> {
impl std::fmt::Display for InferencingModel<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let str = match self {
InferencingModel::Llama2Chat => "llama2-chat",
Expand Down Expand Up @@ -100,7 +100,7 @@ pub enum EmbeddingModel<'a> {
Other(&'a str),
}

impl<'a> std::fmt::Display for EmbeddingModel<'a> {
impl std::fmt::Display for EmbeddingModel<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let str = match self {
EmbeddingModel::AllMiniLmL6V2 => "all-minilm-l6-v2",
Expand Down
Loading