Skip to content

Commit 2fbf0e4

Browse files
committed
stamp: revive the request failure tests
1 parent bb04850 commit 2fbf0e4

File tree

1 file changed

+198
-181
lines changed

1 file changed

+198
-181
lines changed
Lines changed: 198 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -1,181 +1,198 @@
1-
// /*use std::time::Duration;
2-
//
3-
// use anyhow::Context;
4-
// use base::rt_worker::worker_pool::{SupervisorPolicy, WorkerPoolPolicy};
5-
// use base::{integration_test, rt_worker::worker_ctx::TerminationToken, server::ServerEvent};
6-
// use http::{Request, StatusCode};
7-
// use hyper::{body::to_bytes, Body};
8-
// use serial_test::serial;
9-
// use tokio::{join, sync::mpsc};
10-
//
11-
// use crate::integration_test_helper::TestBedBuilder;
12-
//
13-
// #[path = "../src/utils/integration_test_helper.rs"]
14-
// mod integration_test_helper;
15-
//
16-
// #[tokio::test]
17-
// #[serial]
18-
// async fn req_failure_case_timeout() {
19-
// let policy = WorkerPoolPolicy::new(SupervisorPolicy::oneshot(), 1, 10);
20-
// let port = 7998;
21-
// integration_test!(
22-
// "./test_cases/main",
23-
// port,
24-
// "slow_resp",
25-
// Some(policy),
26-
// None,
27-
// None::<reqwest::RequestBuilder>,
28-
// (|resp: Result<reqwest::Response, reqwest::Error>| async {
29-
// let mut found_timeout = false;
30-
// let res = resp.unwrap();
31-
// let status_500 = res.status().as_u16() == 500;
32-
// let body_bytes = res.bytes().await.unwrap();
33-
//
34-
// let valid_output =
35-
// body_bytes == "{\"msg\":\"InvalidWorkerCreation: worker did not respond in time\"}";
36-
//
37-
// found_timeout = status_500 && valid_output;
38-
// assert!(found_timeout);
39-
// })
40-
// );
41-
// }
42-
//
43-
// #[tokio::test]
44-
// #[serial]
45-
// async fn req_failure_case_cpu_time_exhausted() {
46-
// let policy = WorkerPoolPolicy::new(SupervisorPolicy::oneshot(), 1, 100000);
47-
// let port = 7998;
48-
// integration_test!(
49-
// "./test_cases/main_small_cpu_time",
50-
// port,
51-
// "slow_resp",
52-
// Some(policy),
53-
// None,
54-
// None::<reqwest::RequestBuilder>,
55-
// (|resp: Result<reqwest::Response, reqwest::Error>| async {
56-
// let res = resp.unwrap();
57-
// let body_bytes = res.bytes().await.unwrap();
58-
// assert_eq!(
59-
// body_bytes,
60-
// "{\"msg\":\"WorkerRequestCancelled: request has been cancelled by supervisor\"}"
61-
// );
62-
// })
63-
// );
64-
// }
65-
//
66-
// #[tokio::test]
67-
// #[serial]
68-
// async fn req_failure_case_wall_clock_reached() {
69-
// let tb = TestBedBuilder::new("./test_cases/main_small_wall_clock")
70-
// .with_oneshot_policy(100000)
71-
// .build()
72-
// .await;
73-
//
74-
// let mut res = tb
75-
// .request(|| {
76-
// Request::builder()
77-
// .uri("/slow_resp")
78-
// .method("GET")
79-
// .body(Body::empty())
80-
// .context("can't make request")
81-
// })
82-
// .await
83-
// .unwrap();
84-
//
85-
// let buf = to_bytes(res.body_mut()).await.unwrap();
86-
//
87-
// assert_eq!(
88-
// buf,
89-
// "{\"msg\":\"WorkerRequestCancelled: request has been cancelled by supervisor\"}"
90-
// );
91-
//
92-
// tb.exit().await;
93-
// }
94-
//
95-
// #[tokio::test]
96-
// #[serial]
97-
// async fn req_failure_case_wall_clock_reached_less_than_100ms() {
98-
// let tb = TestBedBuilder::new("./test_cases/main_small_wall_clock_less_than_100ms")
99-
// .with_oneshot_policy(100000)
100-
// .build()
101-
// .await;
102-
//
103-
// let mut res = tb
104-
// .request(|| {
105-
// Request::builder()
106-
// .uri("/slow_resp")
107-
// .method("GET")
108-
// .body(Body::empty())
109-
// .context("can't make request")
110-
// })
111-
// .await
112-
// .unwrap();
113-
//
114-
// let buf = to_bytes(res.body_mut()).await.unwrap();
115-
//
116-
// assert!(
117-
// buf == "{\"msg\":\"InvalidWorkerResponse: user worker failed to respond\"}"
118-
// || buf == "{\"msg\":\"InvalidWorkerCreation: worker did not respond in time\"}"
119-
// || buf
120-
// == "{\"msg\":\"WorkerRequestCancelled: request has been cancelled by supervisor\"}"
121-
// );
122-
//
123-
// tb.exit().await;
124-
// }
125-
//
126-
// #[tokio::test]
127-
// #[serial]
128-
// async fn req_failure_case_intentional_peer_reset() {
129-
// let termination_token = TerminationToken::new();
130-
// let (server_ev_tx, mut server_ev_rx) = mpsc::unbounded_channel();
131-
//
132-
// integration_test!(
133-
// "./test_cases/main",
134-
// 8999,
135-
// "slow_resp",
136-
// None,
137-
// None,
138-
// None::<reqwest::RequestBuilder>,
139-
// (
140-
// |port: usize,
141-
// url: &'static str,
142-
// req_builder: Option<reqwest::RequestBuilder>,
143-
// mut ev: mpsc::UnboundedReceiver<ServerEvent>| async move {
144-
// tokio::spawn(async move {
145-
// loop {
146-
// tokio::select! {
147-
// Some(ev) = ev.recv() => {
148-
// let _ = server_ev_tx.send(ev);
149-
// break;
150-
// }
151-
//
152-
// else => continue
153-
// }
154-
// }
155-
// });
156-
//
157-
// Some(
158-
// reqwest::Client::new()
159-
// .get(format!("http://localhost:{}/{}", port, url))
160-
// .timeout(Duration::from_millis(100))
161-
// .send()
162-
// .await,
163-
// )
164-
// },
165-
// |_resp: Result<reqwest::Response, reqwest::Error>| async {}
166-
// ),
167-
// termination_token.clone()
168-
// );
169-
//
170-
// let ev = loop {
171-
// tokio::select! {
172-
// Some(ev) = server_ev_rx.recv() => break ev,
173-
// else => continue,
174-
// }
175-
// };
176-
//
177-
// assert!(matches!(ev, ServerEvent::ConnectionError(e) if e.is_incomplete_message()));
178-
//
179-
// termination_token.cancel_and_wait().await;
180-
// }
181-
// */
1+
use std::time::Duration;
2+
3+
use anyhow::Context;
4+
use base::{integration_test, rt_worker::worker_ctx::TerminationToken, server::ServerEvent};
5+
use http::{Request, StatusCode};
6+
use hyper::{body::to_bytes, Body};
7+
use serial_test::serial;
8+
use tokio::{join, sync::mpsc};
9+
10+
use crate::integration_test_helper::TestBedBuilder;
11+
12+
#[path = "../src/utils/integration_test_helper.rs"]
13+
mod integration_test_helper;
14+
15+
#[tokio::test]
16+
#[serial]
17+
async fn req_failure_case_timeout() {
18+
let tb = TestBedBuilder::new("./test_cases/main")
19+
// NOTE: It should be small enough that the worker pool rejects the
20+
// request.
21+
.with_oneshot_policy(10)
22+
.build()
23+
.await;
24+
25+
let req_body_fn = || {
26+
Request::builder()
27+
.uri("/slow_resp")
28+
.method("GET")
29+
.body(Body::empty())
30+
.context("can't make request")
31+
};
32+
33+
let (res1, res2) = join!(tb.request(req_body_fn), tb.request(req_body_fn));
34+
35+
let res_iter = vec![res1, res2].into_iter();
36+
let mut found_timeout = false;
37+
38+
for res in res_iter {
39+
let mut res = res.unwrap();
40+
41+
if !found_timeout {
42+
let buf = to_bytes(res.body_mut()).await.unwrap();
43+
let status_500 = res.status() == StatusCode::INTERNAL_SERVER_ERROR;
44+
let valid_output =
45+
buf == "{\"msg\":\"InvalidWorkerCreation: worker did not respond in time\"}";
46+
47+
found_timeout = status_500 && valid_output;
48+
}
49+
}
50+
51+
tb.exit().await;
52+
assert!(found_timeout);
53+
}
54+
55+
#[tokio::test]
56+
#[serial]
57+
async fn req_failure_case_cpu_time_exhausted() {
58+
let tb = TestBedBuilder::new("./test_cases/main_small_cpu_time")
59+
.with_oneshot_policy(100000)
60+
.build()
61+
.await;
62+
63+
let mut res = tb
64+
.request(|| {
65+
Request::builder()
66+
.uri("/slow_resp")
67+
.method("GET")
68+
.body(Body::empty())
69+
.context("can't make request")
70+
})
71+
.await
72+
.unwrap();
73+
74+
let buf = to_bytes(res.body_mut()).await.unwrap();
75+
76+
assert_eq!(
77+
buf,
78+
"{\"msg\":\"WorkerRequestCancelled: request has been cancelled by supervisor\"}"
79+
);
80+
81+
tb.exit().await;
82+
}
83+
84+
#[tokio::test]
85+
#[serial]
86+
async fn req_failure_case_wall_clock_reached() {
87+
let tb = TestBedBuilder::new("./test_cases/main_small_wall_clock")
88+
.with_oneshot_policy(100000)
89+
.build()
90+
.await;
91+
92+
let mut res = tb
93+
.request(|| {
94+
Request::builder()
95+
.uri("/slow_resp")
96+
.method("GET")
97+
.body(Body::empty())
98+
.context("can't make request")
99+
})
100+
.await
101+
.unwrap();
102+
103+
let buf = to_bytes(res.body_mut()).await.unwrap();
104+
105+
assert_eq!(
106+
buf,
107+
"{\"msg\":\"WorkerRequestCancelled: request has been cancelled by supervisor\"}"
108+
);
109+
110+
tb.exit().await;
111+
}
112+
113+
#[tokio::test]
114+
#[serial]
115+
async fn req_failure_case_wall_clock_reached_less_than_100ms() {
116+
let tb = TestBedBuilder::new("./test_cases/main_small_wall_clock_less_than_100ms")
117+
.with_oneshot_policy(100000)
118+
.build()
119+
.await;
120+
121+
let mut res = tb
122+
.request(|| {
123+
Request::builder()
124+
.uri("/slow_resp")
125+
.method("GET")
126+
.body(Body::empty())
127+
.context("can't make request")
128+
})
129+
.await
130+
.unwrap();
131+
132+
let buf = to_bytes(res.body_mut()).await.unwrap();
133+
134+
assert!(
135+
buf == "{\"msg\":\"InvalidWorkerResponse: user worker failed to respond\"}"
136+
|| buf == "{\"msg\":\"InvalidWorkerCreation: worker did not respond in time\"}"
137+
|| buf
138+
== "{\"msg\":\"WorkerRequestCancelled: request has been cancelled by supervisor\"}"
139+
);
140+
141+
tb.exit().await;
142+
}
143+
144+
#[tokio::test]
145+
#[serial]
146+
async fn req_failure_case_intentional_peer_reset() {
147+
let termination_token = TerminationToken::new();
148+
let (server_ev_tx, mut server_ev_rx) = mpsc::unbounded_channel();
149+
150+
integration_test!(
151+
"./test_cases/main",
152+
8999,
153+
"slow_resp",
154+
None,
155+
None,
156+
None::<reqwest::RequestBuilder>,
157+
(
158+
|port: usize,
159+
url: &'static str,
160+
_req_builder: Option<reqwest::RequestBuilder>,
161+
mut ev: mpsc::UnboundedReceiver<ServerEvent>| async move {
162+
tokio::spawn(async move {
163+
loop {
164+
tokio::select! {
165+
Some(ev) = ev.recv() => {
166+
let _ = server_ev_tx.send(ev);
167+
break;
168+
}
169+
170+
else => continue
171+
}
172+
}
173+
});
174+
175+
Some(
176+
reqwest::Client::new()
177+
.get(format!("http://localhost:{}/{}", port, url))
178+
.timeout(Duration::from_millis(100))
179+
.send()
180+
.await,
181+
)
182+
},
183+
|_resp: Result<reqwest::Response, reqwest::Error>| async {}
184+
),
185+
termination_token.clone()
186+
);
187+
188+
let ev = loop {
189+
tokio::select! {
190+
Some(ev) = server_ev_rx.recv() => break ev,
191+
else => continue,
192+
}
193+
};
194+
195+
assert!(matches!(ev, ServerEvent::ConnectionError(e) if e.is_incomplete_message()));
196+
197+
termination_token.cancel_and_wait().await;
198+
}

0 commit comments

Comments
 (0)