Skip to content

Commit f74431b

Browse files
committed
fix: add request timeout test
1 parent 758648a commit f74431b

File tree

2 files changed

+245
-43
lines changed

2 files changed

+245
-43
lines changed

crates/base/src/utils/integration_test_helper.rs

Lines changed: 197 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,29 @@
11
#![allow(dead_code)]
22

33
use std::{
4+
collections::HashMap,
45
marker::PhantomPinned,
6+
path::PathBuf,
57
sync::Arc,
68
task::{ready, Poll},
79
};
810

9-
use anyhow::Error;
11+
use anyhow::{bail, Context, Error};
1012
use base::rt_worker::{
11-
worker_ctx::{create_worker, CreateWorkerArgs, TerminationToken},
13+
worker_ctx::{create_user_worker_pool, create_worker, CreateWorkerArgs, TerminationToken},
1214
worker_pool::{SupervisorPolicy, WorkerPoolPolicy},
1315
};
1416
use futures_util::{future::BoxFuture, Future, FutureExt};
17+
use http::{Request, Response};
18+
use hyper::Body;
1519
use pin_project::pin_project;
1620
use sb_core::conn_sync::ConnSync;
17-
use sb_workers::context::{Timing, UserWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRequestMsg};
21+
use sb_workers::context::{
22+
MainWorkerRuntimeOpts, Timing, UserWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRequestMsg,
23+
WorkerRuntimeOpts,
24+
};
1825
use scopeguard::ScopeGuard;
19-
use tokio::sync::{mpsc, watch, Notify};
20-
21-
pub fn create_conn_watch() -> (
22-
ScopeGuard<watch::Sender<ConnSync>, impl FnOnce(watch::Sender<ConnSync>)>,
23-
watch::Receiver<ConnSync>,
24-
) {
25-
let (conn_watch_tx, conn_watch_rx) = watch::channel(ConnSync::Want);
26-
let conn_watch_tx = scopeguard::guard(conn_watch_tx, |tx| tx.send(ConnSync::Recv).unwrap());
27-
28-
(conn_watch_tx, conn_watch_rx)
29-
}
26+
use tokio::sync::{mpsc, oneshot, watch, Notify};
3027

3128
pub struct CreateTestUserWorkerArgs(WorkerContextInitOpts, Option<SupervisorPolicy>);
3229

@@ -42,36 +39,14 @@ impl From<(WorkerContextInitOpts, SupervisorPolicy)> for CreateTestUserWorkerArg
4239
}
4340
}
4441

45-
pub async fn create_test_user_worker<Opt: Into<CreateTestUserWorkerArgs>>(
46-
opts: Opt,
47-
) -> Result<(mpsc::UnboundedSender<WorkerRequestMsg>, RequestScope), Error> {
48-
let CreateTestUserWorkerArgs(mut opts, maybe_policy) = opts.into();
49-
let (req_start_tx, req_start_rx) = mpsc::unbounded_channel();
50-
let (req_end_tx, req_end_rx) = mpsc::unbounded_channel();
51-
let (conn_tx, conn_rx) = watch::channel(ConnSync::Want);
52-
53-
let policy = maybe_policy.unwrap_or_else(SupervisorPolicy::oneshot);
54-
let termination_token = TerminationToken::new();
55-
56-
opts.timing = Some(Timing {
57-
req: (req_start_rx, req_end_rx),
58-
..Default::default()
59-
});
42+
pub fn create_conn_watch() -> (
43+
ScopeGuard<watch::Sender<ConnSync>, impl FnOnce(watch::Sender<ConnSync>)>,
44+
watch::Receiver<ConnSync>,
45+
) {
46+
let (conn_watch_tx, conn_watch_rx) = watch::channel(ConnSync::Want);
47+
let conn_watch_tx = scopeguard::guard(conn_watch_tx, |tx| tx.send(ConnSync::Recv).unwrap());
6048

61-
Ok((
62-
create_worker(
63-
opts.with_policy(policy)
64-
.with_termination_token(termination_token.clone()),
65-
)
66-
.await?,
67-
RequestScope {
68-
policy,
69-
req_start_tx,
70-
req_end_tx,
71-
termination_token,
72-
conn: (Some(conn_tx), conn_rx),
73-
},
74-
))
49+
(conn_watch_tx, conn_watch_rx)
7550
}
7651

7752
#[derive(Debug)]
@@ -155,6 +130,185 @@ impl WorkerContextInitOptsForTesting for WorkerContextInitOpts {
155130
}
156131
}
157132

133+
pub struct TestBedBuilder {
134+
main_service_path: PathBuf,
135+
worker_pool_policy: Option<WorkerPoolPolicy>,
136+
main_worker_init_opts: Option<WorkerContextInitOpts>,
137+
}
138+
139+
impl TestBedBuilder {
140+
pub fn new<T>(main_service_path: T) -> Self
141+
where
142+
T: Into<PathBuf>,
143+
{
144+
Self {
145+
main_service_path: main_service_path.into(),
146+
worker_pool_policy: None,
147+
main_worker_init_opts: None,
148+
}
149+
}
150+
151+
pub fn with_worker_pool_policy(mut self, worker_pool_policy: WorkerPoolPolicy) -> Self {
152+
self.worker_pool_policy = Some(worker_pool_policy);
153+
self
154+
}
155+
156+
pub fn with_oneshot_policy(mut self, request_wait_timeout_ms: u64) -> Self {
157+
self.worker_pool_policy = Some(WorkerPoolPolicy::new(
158+
SupervisorPolicy::oneshot(),
159+
1,
160+
Some(request_wait_timeout_ms),
161+
));
162+
163+
self
164+
}
165+
166+
pub fn with_per_worker_policy(mut self, request_wait_timeout_ms: u64) -> Self {
167+
self.worker_pool_policy = Some(WorkerPoolPolicy::new(
168+
SupervisorPolicy::PerWorker,
169+
1,
170+
Some(request_wait_timeout_ms),
171+
));
172+
173+
self
174+
}
175+
176+
pub fn with_per_request_policy(mut self, request_wait_timeout_ms: u64) -> Self {
177+
self.worker_pool_policy = Some(WorkerPoolPolicy::new(
178+
SupervisorPolicy::PerRequest { oneshot: false },
179+
1,
180+
Some(request_wait_timeout_ms),
181+
));
182+
183+
self
184+
}
185+
186+
pub fn with_main_worker_init_opts(
187+
mut self,
188+
main_worker_init_opts: WorkerContextInitOpts,
189+
) -> Self {
190+
self.main_worker_init_opts = Some(main_worker_init_opts);
191+
self
192+
}
193+
194+
pub async fn build(self) -> TestBed {
195+
let (worker_pool_msg_tx, pool_termination_token) = {
196+
let token = TerminationToken::new();
197+
(
198+
create_user_worker_pool(
199+
self.worker_pool_policy
200+
.unwrap_or_else(test_user_worker_pool_policy),
201+
None,
202+
Some(token.clone()),
203+
)
204+
.await
205+
.unwrap(),
206+
token,
207+
)
208+
};
209+
210+
let main_worker_init_opts = WorkerContextInitOpts {
211+
service_path: self.main_service_path,
212+
no_module_cache: false,
213+
import_map_path: None,
214+
env_vars: HashMap::new(),
215+
events_rx: None,
216+
timing: None,
217+
maybe_eszip: None,
218+
maybe_entrypoint: None,
219+
maybe_module_code: None,
220+
conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
221+
worker_pool_tx: worker_pool_msg_tx,
222+
}),
223+
};
224+
225+
let main_termination_token = TerminationToken::new();
226+
let main_worker_msg_tx =
227+
create_worker((main_worker_init_opts, main_termination_token.clone()))
228+
.await
229+
.unwrap();
230+
231+
TestBed {
232+
pool_termination_token,
233+
main_termination_token,
234+
main_worker_msg_tx,
235+
}
236+
}
237+
}
238+
239+
pub struct TestBed {
240+
pool_termination_token: TerminationToken,
241+
main_termination_token: TerminationToken,
242+
main_worker_msg_tx: mpsc::UnboundedSender<WorkerRequestMsg>,
243+
}
244+
245+
impl TestBed {
246+
pub async fn request<F>(
247+
&self,
248+
request_factory_fn: F,
249+
) -> Result<ScopeGuard<Response<Body>, impl FnOnce(Response<Body>)>, Error>
250+
where
251+
F: FnOnce() -> Result<Request<Body>, Error>,
252+
{
253+
let (conn_tx, conn_rx) = create_conn_watch();
254+
let (res_tx, res_rx) = oneshot::channel();
255+
256+
let req: Request<Body> = request_factory_fn()?;
257+
258+
let _ = self.main_worker_msg_tx.send(WorkerRequestMsg {
259+
req,
260+
res_tx,
261+
conn_watch: Some(conn_rx),
262+
});
263+
264+
let Ok(res) = res_rx.await else {
265+
bail!("can't send request to the main worker");
266+
};
267+
268+
Ok(scopeguard::guard(
269+
res.context("request failure")?,
270+
move |_| drop(conn_tx),
271+
))
272+
}
273+
274+
pub async fn exit(self) {
275+
self.pool_termination_token.cancel_and_wait().await;
276+
self.main_termination_token.cancel_and_wait().await;
277+
}
278+
}
279+
280+
pub async fn create_test_user_worker<Opt: Into<CreateTestUserWorkerArgs>>(
281+
opts: Opt,
282+
) -> Result<(mpsc::UnboundedSender<WorkerRequestMsg>, RequestScope), Error> {
283+
let CreateTestUserWorkerArgs(mut opts, maybe_policy) = opts.into();
284+
let (req_start_tx, req_start_rx) = mpsc::unbounded_channel();
285+
let (req_end_tx, req_end_rx) = mpsc::unbounded_channel();
286+
let (conn_tx, conn_rx) = watch::channel(ConnSync::Want);
287+
288+
let policy = maybe_policy.unwrap_or_else(SupervisorPolicy::oneshot);
289+
let termination_token = TerminationToken::new();
290+
291+
opts.timing = Some(Timing {
292+
req: (req_start_rx, req_end_rx),
293+
..Default::default()
294+
});
295+
296+
Ok((
297+
create_worker(
298+
opts.with_policy(policy)
299+
.with_termination_token(termination_token.clone()),
300+
)
301+
.await?,
302+
RequestScope {
303+
policy,
304+
req_start_tx,
305+
req_end_tx,
306+
termination_token,
307+
conn: (Some(conn_tx), conn_rx),
308+
},
309+
))
310+
}
311+
158312
pub fn test_user_worker_pool_policy() -> WorkerPoolPolicy {
159313
WorkerPoolPolicy::new(SupervisorPolicy::oneshot(), 1, 4 * 1000 * 3600)
160314
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use anyhow::Context;
2+
use http::{Request, StatusCode};
3+
use hyper::{body::to_bytes, Body};
4+
use tokio::join;
5+
6+
use crate::integration_test_helper::TestBedBuilder;
7+
8+
#[path = "../src/utils/integration_test_helper.rs"]
9+
mod integration_test_helper;
10+
11+
#[tokio::test]
12+
async fn req_failure_case_timeout() {
13+
let tb = TestBedBuilder::new("./test_cases/main")
14+
// NOTE: It should be small enough that the worker pool rejects the
15+
// request.
16+
.with_oneshot_policy(10)
17+
.build()
18+
.await;
19+
20+
let req_body_fn = || {
21+
Request::builder()
22+
.uri("/slow_resp")
23+
.method("GET")
24+
.body(Body::empty())
25+
.context("can't make request")
26+
};
27+
28+
let (res1, res2) = join!(tb.request(req_body_fn), tb.request(req_body_fn));
29+
30+
let res_iter = vec![res1, res2].into_iter();
31+
let mut found_timeout = false;
32+
33+
for res in res_iter {
34+
let mut res = res.unwrap();
35+
36+
if !found_timeout {
37+
let buf = to_bytes(res.body_mut()).await.unwrap();
38+
let status_500 = res.status() == StatusCode::INTERNAL_SERVER_ERROR;
39+
let valid_output =
40+
buf == "{\"msg\":\"InvalidWorkerCreation: worker did not respond in time\"}";
41+
42+
found_timeout = status_500 && valid_output;
43+
}
44+
}
45+
46+
tb.exit().await;
47+
assert!(found_timeout);
48+
}

0 commit comments

Comments
 (0)