|
| 1 | +use std::time::Duration; |
| 2 | + |
1 | 3 | use anyhow::Context;
|
| 4 | +use base::{integration_test, rt_worker::worker_ctx::TerminationToken, server::ServerEvent}; |
2 | 5 | use http::{Request, StatusCode};
|
3 | 6 | use hyper::{body::to_bytes, Body};
|
4 |
| -use tokio::join; |
| 7 | +use serial_test::serial; |
| 8 | +use tokio::{join, sync::mpsc}; |
5 | 9 |
|
6 | 10 | use crate::integration_test_helper::TestBedBuilder;
|
7 | 11 |
|
8 | 12 | #[path = "../src/utils/integration_test_helper.rs"]
|
9 | 13 | mod integration_test_helper;
|
10 | 14 |
|
11 | 15 | #[tokio::test]
|
| 16 | +#[serial] |
12 | 17 | async fn req_failure_case_timeout() {
|
13 | 18 | let tb = TestBedBuilder::new("./test_cases/main")
|
14 | 19 | // NOTE: It should be small enough that the worker pool rejects the
|
@@ -46,3 +51,53 @@ async fn req_failure_case_timeout() {
|
46 | 51 | tb.exit().await;
|
47 | 52 | assert!(found_timeout);
|
48 | 53 | }
|
| 54 | + |
| 55 | +#[tokio::test] |
| 56 | +#[serial] |
| 57 | +async fn req_failure_case_intentional_peer_reset() { |
| 58 | + let termination_token = TerminationToken::new(); |
| 59 | + let (server_ev_tx, mut server_ev_rx) = mpsc::unbounded_channel(); |
| 60 | + |
| 61 | + integration_test!( |
| 62 | + "./test_cases/main", |
| 63 | + 8999, |
| 64 | + "slow_resp", |
| 65 | + ( |
| 66 | + |port: usize, url: &'static str, mut ev: mpsc::UnboundedReceiver<ServerEvent>| async move { |
| 67 | + tokio::spawn(async move { |
| 68 | + loop { |
| 69 | + tokio::select! { |
| 70 | + Some(ev) = ev.recv() => { |
| 71 | + let _ = server_ev_tx.send(ev); |
| 72 | + break; |
| 73 | + } |
| 74 | + |
| 75 | + else => continue |
| 76 | + } |
| 77 | + } |
| 78 | + }); |
| 79 | + |
| 80 | + Some( |
| 81 | + reqwest::Client::new() |
| 82 | + .get(format!("http://localhost:{}/{}", port, url)) |
| 83 | + .timeout(Duration::from_millis(100)) |
| 84 | + .send() |
| 85 | + .await, |
| 86 | + ) |
| 87 | + }, |
| 88 | + |_resp: Result<reqwest::Response, reqwest::Error>| async {} |
| 89 | + ), |
| 90 | + termination_token.clone() |
| 91 | + ); |
| 92 | + |
| 93 | + let ev = loop { |
| 94 | + tokio::select! { |
| 95 | + Some(ev) = server_ev_rx.recv() => break ev, |
| 96 | + else => continue, |
| 97 | + } |
| 98 | + }; |
| 99 | + |
| 100 | + assert!(matches!(ev, ServerEvent::ConnectionError(e) if e.is_incomplete_message())); |
| 101 | + |
| 102 | + termination_token.cancel_and_wait().await; |
| 103 | +} |
0 commit comments