-
-
Notifications
You must be signed in to change notification settings - Fork 315
Add a test to demonstrate connection logical deadlock under high concurrency #852
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
sandersaares
wants to merge
6
commits into
hyperium:master
Choose a base branch
from
sandersaares:logically-deadlocked
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
1b7e27f
Deadlock start
sandersaares 5e195dd
Demonstrate logical deadlock of HTTP2 connection when handling more r…
sandersaares 20759ba
Remove misleading comment
sandersaares 13edf15
Delete misleading comment
sandersaares 1e08581
Tidy
sandersaares 66abdc0
tokio::time::sleep
sandersaares File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
use std::{ | ||
mem, | ||
net::SocketAddr, | ||
sync::atomic::{self, AtomicUsize}, | ||
time::Instant, | ||
}; | ||
|
||
use futures::StreamExt; | ||
use h2_support::prelude::*; | ||
use tokio::{net::TcpStream, runtime::Runtime}; | ||
|
||
static REQUESTS_COMPLETED: AtomicUsize = AtomicUsize::new(0); | ||
|
||
// The magic happens when concurrency is much greater than max_concurrent_streams. | ||
const CONCURRENCY: usize = 100; | ||
const MAX_CONCURRENT_STREAMS: u32 = 20; | ||
|
||
// If we successfully complete this many requests per task, we can call the test a success. | ||
// Typically, this test seems to fail in the 10-25 request range but no doubt there are many | ||
// variables in play, as failures are somewhat random and timing-dependent. | ||
const TARGET_REQUESTS_PER_TASK: usize = 100; | ||
|
||
// The test is successful if all the expected requests have been completed. | ||
const TARGET_REQUESTS_COMPLETED: usize = CONCURRENCY * TARGET_REQUESTS_PER_TASK; | ||
|
||
// If no requests have been completed in this long, we consider the connection deadlocked. | ||
const MUST_MAKE_PROGRESS_INTERVAL: Duration = Duration::from_secs(2); | ||
const CHECK_FOR_PROGRESS_INTERVAL: Duration = Duration::from_millis(100); | ||
|
||
#[test] | ||
fn logical_deadlock() { | ||
let server_addr = serve(); | ||
|
||
let runtime = Runtime::new().unwrap(); | ||
|
||
runtime.block_on(async move { | ||
let tcp_connection = TcpStream::connect(server_addr).await.unwrap(); | ||
let (client, h2_connection) = client::handshake(tcp_connection).await.unwrap(); | ||
|
||
tokio::spawn(async move { | ||
// This just does "connection stuff" independent of any requests. | ||
h2_connection.await.unwrap(); | ||
}); | ||
|
||
let mut join_handles = Vec::new(); | ||
|
||
for _ in 0..CONCURRENCY { | ||
join_handles.push(tokio::spawn({ | ||
let mut client = client.clone(); | ||
|
||
async move { | ||
for _ in 0..TARGET_REQUESTS_PER_TASK { | ||
let request = Request::builder() | ||
.method(Method::POST) | ||
.uri("/") | ||
.body(()) | ||
.unwrap(); | ||
|
||
let (response_future, mut request_body) = | ||
client.send_request(request, false).unwrap(); | ||
|
||
request_body | ||
.send_data(Bytes::from_static(REQUEST_BODY), true) | ||
.unwrap(); | ||
|
||
// The anomaly we expect to see will result in this await never completing | ||
// for all `CONCURRENCY` concurrent requests enqueued on this connection. | ||
let mut response = match response_future.await { | ||
Ok(response) => response, | ||
Err(e) | ||
if e.is_reset() && e.reason() == Some(Reason::REFUSED_STREAM) => | ||
{ | ||
// Not relevant - it means the server sent a max_concurrent_streams | ||
// limit and the client has not yet seen it, so it is not enforced. | ||
// Not an ideal outcome in general but not relevant to this test | ||
// scenario - the anomaly we expect to see results in deadlocked | ||
// requests, not failing requests. | ||
REQUESTS_COMPLETED.fetch_add(1, atomic::Ordering::Relaxed); | ||
continue; | ||
} | ||
Err(e) => { | ||
// This failure is not expected as part of the test scenario, neither for fail nor pass results. | ||
panic!("Request failed unexpectedly: {:?}", e); | ||
} | ||
}; | ||
|
||
let response_body_stream = response.body_mut(); | ||
let mut total_length: usize = 0; | ||
|
||
while let Some(Ok(chunk)) = response_body_stream.next().await { | ||
total_length += chunk.len(); | ||
} | ||
|
||
assert_eq!(total_length, 10 * 1024); | ||
|
||
REQUESTS_COMPLETED.fetch_add(1, atomic::Ordering::Relaxed); | ||
} | ||
} | ||
})); | ||
} | ||
|
||
let mut last_progress = Instant::now(); | ||
let mut last_requests_completed = 0; | ||
|
||
while REQUESTS_COMPLETED.load(atomic::Ordering::Relaxed) < TARGET_REQUESTS_COMPLETED { | ||
let requests_completed = REQUESTS_COMPLETED.load(atomic::Ordering::Relaxed); | ||
|
||
println!("Requests completed: {requests_completed} of {TARGET_REQUESTS_COMPLETED}",); | ||
|
||
if requests_completed > last_requests_completed { | ||
last_progress = Instant::now(); | ||
} | ||
|
||
last_requests_completed = requests_completed; | ||
|
||
if Instant::now().duration_since(last_progress) > MUST_MAKE_PROGRESS_INTERVAL { | ||
panic!( | ||
"No requests completed in the last {:?}, deadlock likely occurred", | ||
MUST_MAKE_PROGRESS_INTERVAL | ||
); | ||
} | ||
|
||
thread::sleep(CHECK_FOR_PROGRESS_INTERVAL); | ||
} | ||
|
||
println!("All requests completed successfully."); | ||
|
||
for handle in join_handles { | ||
handle.await.unwrap(); | ||
} | ||
}); | ||
} | ||
|
||
static REQUEST_BODY: &[u8] = &[77; 10 * 1024]; | ||
static RESPONSE_BODY: &[u8] = &[88; 10 * 1024]; | ||
|
||
/// Starts a test server that will open a TCP socket and listen for HTTP/2 connections. | ||
/// | ||
/// For simplify, it will listen forever (until the test binary terminates). | ||
/// | ||
/// Expects to read 10 KB of request body from each request to `/`, | ||
/// to which it responds with a 200 OK with 10 KB of response body. | ||
fn serve() -> SocketAddr { | ||
let runtime = Runtime::new().unwrap(); | ||
|
||
let local_addr = runtime.block_on(async { | ||
let listener = tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) | ||
.await | ||
.unwrap(); | ||
|
||
let local_addr = listener.local_addr().unwrap(); | ||
|
||
tokio::spawn(async move { | ||
loop { | ||
let (socket, _) = listener.accept().await.unwrap(); | ||
|
||
// Fork each connection. | ||
tokio::spawn(async move { | ||
let mut connection = server::Builder::new() | ||
.max_concurrent_streams(MAX_CONCURRENT_STREAMS) | ||
.handshake(socket) | ||
.await | ||
.expect("Failed to handshake"); | ||
|
||
while let Some(incoming) = connection.next().await { | ||
let Ok((mut request, mut responder)) = incoming else { | ||
// This generally means connection terminated. | ||
break; | ||
}; | ||
|
||
// Fork each request. | ||
tokio::spawn(async move { | ||
let request_body_stream = request.body_mut(); | ||
let mut total_length: usize = 0; | ||
|
||
while let Some(Ok(chunk)) = request_body_stream.next().await { | ||
total_length += chunk.len(); | ||
} | ||
|
||
assert_eq!(total_length, 10 * 1024); | ||
|
||
let response = | ||
Response::builder().status(StatusCode::OK).body(()).unwrap(); | ||
let mut body_sender = responder.send_response(response, false).unwrap(); | ||
body_sender | ||
.send_data(Bytes::from_static(RESPONSE_BODY), true) | ||
.unwrap(); | ||
}); | ||
} | ||
}); | ||
} | ||
}); | ||
|
||
local_addr | ||
}); | ||
|
||
// Keep it running until end of process. | ||
mem::forget(runtime); | ||
|
||
local_addr | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'd also want to use something like
tokio::time::sleep(CHECK_FOR_PROGRESS_INTERVAL).await
to not block any of the other tasks on this thread.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch - updated.