Skip to content

Commit d3c4653

Browse files
fix(http): Wake Payload when feeding error or EOF (#3749)
* fix(http): Add failing tests to demonstrate the payload problem Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com> * fix(http): Wake Payload when feeding error or eof Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com> --------- Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>
1 parent 8996198 commit d3c4653

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

actix-http/CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Unreleased
44

5+
- Properly wake Payload receivers when feeding errors or EOF
6+
57
## 3.11.1
68

79
- Prevent more hangs after client disconnects.

actix-http/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ serde_json = "1.0"
156156
static_assertions = "1"
157157
tls-openssl = { package = "openssl", version = "0.10.55" }
158158
tls-rustls_023 = { package = "rustls", version = "0.23" }
159-
tokio = { version = "1.38.2", features = ["net", "rt", "macros"] }
159+
tokio = { version = "1.38.2", features = ["net", "rt", "macros", "sync"] }
160160

161161
[lints]
162162
workspace = true

actix-http/src/h1/payload.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,13 @@ impl Inner {
200200
#[inline]
201201
fn set_error(&mut self, err: PayloadError) {
202202
self.err = Some(err);
203+
self.wake();
203204
}
204205

205206
#[inline]
206207
fn feed_eof(&mut self) {
207208
self.eof = true;
209+
self.wake();
208210
}
209211

210212
#[inline]
@@ -253,8 +255,13 @@ impl Inner {
253255

254256
#[cfg(test)]
255257
mod tests {
258+
use std::{task::Poll, time::Duration};
259+
260+
use actix_rt::time::timeout;
256261
use actix_utils::future::poll_fn;
262+
use futures_util::{FutureExt, StreamExt};
257263
use static_assertions::{assert_impl_all, assert_not_impl_any};
264+
use tokio::sync::oneshot;
258265

259266
use super::*;
260267

@@ -263,6 +270,67 @@ mod tests {
263270

264271
assert_impl_all!(Inner: Unpin, Send, Sync);
265272

273+
const WAKE_TIMEOUT: Duration = Duration::from_secs(2);
274+
275+
fn prepare_waking_test(
276+
mut payload: Payload,
277+
expected: Option<Result<(), ()>>,
278+
) -> (oneshot::Receiver<()>, actix_rt::task::JoinHandle<()>) {
279+
let (tx, rx) = oneshot::channel();
280+
281+
let handle = actix_rt::spawn(async move {
282+
// Make sure to poll once to set the waker
283+
poll_fn(|cx| {
284+
assert!(payload.poll_next_unpin(cx).is_pending());
285+
Poll::Ready(())
286+
})
287+
.await;
288+
tx.send(()).unwrap();
289+
290+
// actix-rt is single-threaded, so this won't race with `rx.await`
291+
let mut pend_once = false;
292+
poll_fn(|_| {
293+
if pend_once {
294+
Poll::Ready(())
295+
} else {
296+
// Return pending without storing wakers, we already did on the previous
297+
// `poll_fn`, now this task will only continue if the `sender` wakes us
298+
pend_once = true;
299+
Poll::Pending
300+
}
301+
})
302+
.await;
303+
304+
let got = payload.next().now_or_never().unwrap();
305+
match expected {
306+
Some(Ok(_)) => assert!(got.unwrap().is_ok()),
307+
Some(Err(_)) => assert!(got.unwrap().is_err()),
308+
None => assert!(got.is_none()),
309+
}
310+
});
311+
(rx, handle)
312+
}
313+
314+
#[actix_rt::test]
315+
async fn wake_on_error() {
316+
let (mut sender, payload) = Payload::create(false);
317+
let (rx, handle) = prepare_waking_test(payload, Some(Err(())));
318+
319+
rx.await.unwrap();
320+
sender.set_error(PayloadError::Incomplete(None));
321+
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
322+
}
323+
324+
#[actix_rt::test]
325+
async fn wake_on_eof() {
326+
let (mut sender, payload) = Payload::create(false);
327+
let (rx, handle) = prepare_waking_test(payload, None);
328+
329+
rx.await.unwrap();
330+
sender.feed_eof();
331+
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
332+
}
333+
266334
#[actix_rt::test]
267335
async fn test_unread_data() {
268336
let (_, mut payload) = Payload::create(false);

0 commit comments

Comments
 (0)