Skip to content

Commit 73d9a6b

Browse files
authored
fix "resource has children" trap due to undropped pollables (#63)
Previously, it was possible to drop an `input-stream`, `output-stream`, or `future-incoming-response` while it still had an undropped child `pollable` given that `spin-executor` provided no way to cancel a `pollable` previously registered using `push_waker`. In addition, it was possible to unintentionally register more than one pollable for such a resource concurrently. This commit adds a `CancelToken` return value to `push_waker`, which may be used to explicitly cancel the registration and immediately drop the `pollable`. We now use that, along with `CancelOnDropToken`, to ensure that we cancel any previous registration before creating a new one, as well as guarantee that the registration is cancelled before attempting to drop the parent resource. Signed-off-by: Joel Dice <[email protected]>
1 parent 2b0dbd7 commit 73d9a6b

File tree

2 files changed

+106
-28
lines changed

2 files changed

+106
-28
lines changed

crates/executor/src/lib.rs

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use bindings::wasi::io;
22
use std::future::Future;
33
use std::mem;
4+
use std::ops::DerefMut;
45
use std::sync::{Arc, Mutex};
56
use std::task::{Context, Poll, Wake, Waker};
67

@@ -20,11 +21,42 @@ impl std::fmt::Display for io::streams::Error {
2021

2122
impl std::error::Error for io::streams::Error {}
2223

23-
static WAKERS: Mutex<Vec<(io::poll::Pollable, Waker)>> = Mutex::new(Vec::new());
24+
type Wrapped = Arc<Mutex<Option<io::poll::Pollable>>>;
25+
26+
static WAKERS: Mutex<Vec<(Wrapped, Waker)>> = Mutex::new(Vec::new());
27+
28+
/// Handle to a Pollable pushed using `push_waker` which may be used to cancel
29+
/// and drop the Pollable.
30+
pub struct CancelToken(Wrapped);
31+
32+
impl CancelToken {
33+
/// Cancel and drop the Pollable.
34+
pub fn cancel(self) {
35+
drop(self.0.lock().unwrap().take())
36+
}
37+
}
38+
39+
/// Handle to a Pollable pushed using `push_waker` which, when dropped, will
40+
/// cancel and drop the Pollable.
41+
pub struct CancelOnDropToken(Wrapped);
42+
43+
impl From<CancelToken> for CancelOnDropToken {
44+
fn from(token: CancelToken) -> Self {
45+
Self(token.0)
46+
}
47+
}
48+
49+
impl Drop for CancelOnDropToken {
50+
fn drop(&mut self) {
51+
drop(self.0.lock().unwrap().take())
52+
}
53+
}
2454

2555
/// Push a Pollable and Waker to WAKERS.
26-
pub fn push_waker(pollable: io::poll::Pollable, waker: Waker) {
27-
WAKERS.lock().unwrap().push((pollable, waker));
56+
pub fn push_waker(pollable: io::poll::Pollable, waker: Waker) -> CancelToken {
57+
let wrapped = Arc::new(Mutex::new(Some(pollable)));
58+
WAKERS.lock().unwrap().push((wrapped.clone(), waker));
59+
CancelToken(wrapped)
2860
}
2961

3062
/// Run the specified future to completion blocking until it yields a result.
@@ -45,13 +77,17 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
4577
Poll::Pending => {
4678
let mut new_wakers = Vec::new();
4779

48-
let wakers = mem::take::<Vec<_>>(&mut WAKERS.lock().unwrap());
49-
50-
assert!(!wakers.is_empty());
80+
let wakers = mem::take(WAKERS.lock().unwrap().deref_mut())
81+
.into_iter()
82+
.filter_map(|(wrapped, waker)| {
83+
let pollable = wrapped.lock().unwrap().take();
84+
pollable.map(|pollable| (wrapped, pollable, waker))
85+
})
86+
.collect::<Vec<_>>();
5187

5288
let pollables = wakers
5389
.iter()
54-
.map(|(pollable, _)| pollable)
90+
.map(|(_, pollable, _)| pollable)
5591
.collect::<Vec<_>>();
5692

5793
let mut ready = vec![false; wakers.len()];
@@ -60,11 +96,12 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
6096
ready[usize::try_from(index).unwrap()] = true;
6197
}
6298

63-
for (ready, (pollable, waker)) in ready.into_iter().zip(wakers) {
99+
for (ready, (wrapped, pollable, waker)) in ready.into_iter().zip(wakers) {
64100
if ready {
65101
waker.wake()
66102
} else {
67-
new_wakers.push((pollable, waker));
103+
*wrapped.lock().unwrap() = Some(pollable);
104+
new_wakers.push((wrapped, waker));
68105
}
69106
}
70107

src/http/executor.rs

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
use crate::wit::wasi::http0_2_0::outgoing_handler;
22
use crate::wit::wasi::http0_2_0::types::{
3-
ErrorCode, IncomingBody, IncomingResponse, OutgoingBody, OutgoingRequest,
3+
ErrorCode, FutureIncomingResponse, IncomingBody, IncomingResponse, OutgoingBody,
4+
OutgoingRequest,
45
};
56

67
use spin_executor::bindings::wasi::io;
78
use spin_executor::bindings::wasi::io::streams::{InputStream, OutputStream, StreamError};
89

910
use futures::{future, sink, stream, Sink, Stream};
1011

11-
pub use spin_executor::run;
12+
pub use spin_executor::{run, CancelOnDropToken};
1213

1314
use std::cell::RefCell;
1415
use std::future::Future;
@@ -18,37 +19,46 @@ use std::task::Poll;
1819
const READ_SIZE: u64 = 16 * 1024;
1920

2021
pub(crate) fn outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = StreamError> {
21-
struct Outgoing(Option<(OutputStream, OutgoingBody)>);
22+
struct Outgoing {
23+
stream_and_body: Option<(OutputStream, OutgoingBody)>,
24+
cancel_token: Option<CancelOnDropToken>,
25+
}
2226

2327
impl Drop for Outgoing {
2428
fn drop(&mut self) {
25-
if let Some((stream, body)) = self.0.take() {
29+
drop(self.cancel_token.take());
30+
31+
if let Some((stream, body)) = self.stream_and_body.take() {
2632
drop(stream);
2733
_ = OutgoingBody::finish(body, None);
2834
}
2935
}
3036
}
3137

3238
let stream = body.write().expect("response body should be writable");
33-
let pair = Rc::new(RefCell::new(Outgoing(Some((stream, body)))));
39+
let outgoing = Rc::new(RefCell::new(Outgoing {
40+
stream_and_body: Some((stream, body)),
41+
cancel_token: None,
42+
}));
3443

3544
sink::unfold((), {
3645
move |(), chunk: Vec<u8>| {
3746
future::poll_fn({
3847
let mut offset = 0;
3948
let mut flushing = false;
40-
let pair = pair.clone();
49+
let outgoing = outgoing.clone();
4150

4251
move |context| {
43-
let pair = pair.borrow();
44-
let (stream, _) = &pair.0.as_ref().unwrap();
52+
let mut outgoing = outgoing.borrow_mut();
53+
let (stream, _) = &outgoing.stream_and_body.as_ref().unwrap();
4554
loop {
4655
match stream.check_write() {
4756
Ok(0) => {
48-
spin_executor::push_waker(
49-
stream.subscribe(),
50-
context.waker().clone(),
51-
);
57+
outgoing.cancel_token =
58+
Some(CancelOnDropToken::from(spin_executor::push_waker(
59+
stream.subscribe(),
60+
context.waker().clone(),
61+
)));
5262
break Poll::Pending;
5363
}
5464
Ok(count) => {
@@ -93,14 +103,33 @@ pub(crate) fn outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = St
93103
pub(crate) fn outgoing_request_send(
94104
request: OutgoingRequest,
95105
) -> impl Future<Output = Result<IncomingResponse, ErrorCode>> {
106+
struct State {
107+
response: Option<Result<FutureIncomingResponse, ErrorCode>>,
108+
cancel_token: Option<CancelOnDropToken>,
109+
}
110+
111+
impl Drop for State {
112+
fn drop(&mut self) {
113+
drop(self.cancel_token.take());
114+
drop(self.response.take());
115+
}
116+
}
117+
96118
let response = outgoing_handler::handle(request, None);
119+
let mut state = State {
120+
response: Some(response),
121+
cancel_token: None,
122+
};
97123
future::poll_fn({
98-
move |context| match &response {
124+
move |context| match &state.response.as_ref().unwrap() {
99125
Ok(response) => {
100126
if let Some(response) = response.get() {
101127
Poll::Ready(response.unwrap())
102128
} else {
103-
spin_executor::push_waker(response.subscribe(), context.waker().clone());
129+
state.cancel_token = Some(CancelOnDropToken::from(spin_executor::push_waker(
130+
response.subscribe(),
131+
context.waker().clone(),
132+
)));
104133
Poll::Pending
105134
}
106135
}
@@ -113,11 +142,16 @@ pub(crate) fn outgoing_request_send(
113142
pub fn incoming_body(
114143
body: IncomingBody,
115144
) -> impl Stream<Item = Result<Vec<u8>, io::streams::Error>> {
116-
struct Incoming(Option<(InputStream, IncomingBody)>);
145+
struct Incoming {
146+
stream_and_body: Option<(InputStream, IncomingBody)>,
147+
cancel_token: Option<CancelOnDropToken>,
148+
}
117149

118150
impl Drop for Incoming {
119151
fn drop(&mut self) {
120-
if let Some((stream, body)) = self.0.take() {
152+
drop(self.cancel_token.take());
153+
154+
if let Some((stream, body)) = self.stream_and_body.take() {
121155
drop(stream);
122156
IncomingBody::finish(body);
123157
}
@@ -126,14 +160,21 @@ pub fn incoming_body(
126160

127161
stream::poll_fn({
128162
let stream = body.stream().expect("response body should be readable");
129-
let pair = Incoming(Some((stream, body)));
163+
let mut incoming = Incoming {
164+
stream_and_body: Some((stream, body)),
165+
cancel_token: None,
166+
};
130167

131168
move |context| {
132-
if let Some((stream, _)) = &pair.0 {
169+
if let Some((stream, _)) = &incoming.stream_and_body {
133170
match stream.read(READ_SIZE) {
134171
Ok(buffer) => {
135172
if buffer.is_empty() {
136-
spin_executor::push_waker(stream.subscribe(), context.waker().clone());
173+
incoming.cancel_token =
174+
Some(CancelOnDropToken::from(spin_executor::push_waker(
175+
stream.subscribe(),
176+
context.waker().clone(),
177+
)));
137178
Poll::Pending
138179
} else {
139180
Poll::Ready(Some(Ok(buffer)))

0 commit comments

Comments
 (0)