Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions actix-http/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Properly wake Payload receivers when feeding errors or EOF

## 3.11.1

- Prevent more hangs after client disconnects.
Expand Down
2 changes: 1 addition & 1 deletion actix-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ serde_json = "1.0"
static_assertions = "1"
tls-openssl = { package = "openssl", version = "0.10.55" }
tls-rustls_023 = { package = "rustls", version = "0.23" }
tokio = { version = "1.38.2", features = ["net", "rt", "macros"] }
tokio = { version = "1.38.2", features = ["net", "rt", "macros", "sync"] }

[lints]
workspace = true
Expand Down
68 changes: 68 additions & 0 deletions actix-http/src/h1/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,13 @@ impl Inner {
#[inline]
fn set_error(&mut self, err: PayloadError) {
self.err = Some(err);
self.wake();
}

#[inline]
fn feed_eof(&mut self) {
self.eof = true;
self.wake();
}

#[inline]
Expand Down Expand Up @@ -253,8 +255,13 @@ impl Inner {

#[cfg(test)]
mod tests {
use std::{task::Poll, time::Duration};

use actix_rt::time::timeout;
use actix_utils::future::poll_fn;
use futures_util::{FutureExt, StreamExt};
use static_assertions::{assert_impl_all, assert_not_impl_any};
use tokio::sync::oneshot;

use super::*;

Expand All @@ -263,6 +270,67 @@ mod tests {

assert_impl_all!(Inner: Unpin, Send, Sync);

const WAKE_TIMEOUT: Duration = Duration::from_secs(2);

fn prepare_waking_test(
mut payload: Payload,
expected: Option<Result<(), ()>>,
) -> (oneshot::Receiver<()>, actix_rt::task::JoinHandle<()>) {
let (tx, rx) = oneshot::channel();

let handle = actix_rt::spawn(async move {
// Make sure to poll once to set the waker
poll_fn(|cx| {
assert!(payload.poll_next_unpin(cx).is_pending());
Poll::Ready(())
})
.await;
tx.send(()).unwrap();

// actix-rt is single-threaded, so this won't race with `rx.await`
let mut pend_once = false;
poll_fn(|_| {
if pend_once {
Poll::Ready(())
} else {
// Return pending without storing wakers, we already did on the previous
// `poll_fn`, now this task will only continue if the `sender` wakes us
pend_once = true;
Poll::Pending
}
})
.await;

let got = payload.next().now_or_never().unwrap();
match expected {
Some(Ok(_)) => assert!(got.unwrap().is_ok()),
Some(Err(_)) => assert!(got.unwrap().is_err()),
None => assert!(got.is_none()),
}
});
(rx, handle)
}

#[actix_rt::test]
async fn wake_on_error() {
let (mut sender, payload) = Payload::create(false);
let (rx, handle) = prepare_waking_test(payload, Some(Err(())));

rx.await.unwrap();
sender.set_error(PayloadError::Incomplete(None));
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}

#[actix_rt::test]
async fn wake_on_eof() {
let (mut sender, payload) = Payload::create(false);
let (rx, handle) = prepare_waking_test(payload, None);

rx.await.unwrap();
sender.feed_eof();
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}

#[actix_rt::test]
async fn test_unread_data() {
let (_, mut payload) = Payload::create(false);
Expand Down
Loading