Skip to content

Commit f9e8a6a

Browse files
committed
fix: introduce termination token for all worker control hierarchy
1 parent f74431b commit f9e8a6a

File tree

7 files changed

+106
-56
lines changed

7 files changed

+106
-56
lines changed

crates/base/src/commands.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
rt_worker::worker_pool::WorkerPoolPolicy,
2+
rt_worker::{worker_ctx::TerminationToken, worker_pool::WorkerPoolPolicy},
33
server::{Server, ServerCodes, WorkerEntrypoints},
44
};
55
use anyhow::Error;
@@ -16,6 +16,7 @@ pub async fn start_server(
1616
no_module_cache: bool,
1717
callback_tx: Option<Sender<ServerCodes>>,
1818
entrypoints: WorkerEntrypoints,
19+
termination_token: Option<TerminationToken>,
1920
) -> Result<(), Error> {
2021
let mut server = Server::new(
2122
ip,
@@ -27,7 +28,9 @@ pub async fn start_server(
2728
no_module_cache,
2829
callback_tx,
2930
entrypoints,
31+
termination_token,
3032
)
3133
.await?;
34+
3235
server.listen().await
3336
}
Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
11
#[macro_export]
22
macro_rules! integration_test {
3-
($main_file:expr, $port:expr, $url:expr, $function: expr) => {
4-
let (tx, mut rx) = mpsc::channel::<ServerCodes>(1);
3+
($main_file:expr, $port:expr, $url:expr, ($($function:tt)+) $(, $termination_token: expr)?) => {
4+
let (tx, mut rx) = tokio::sync::mpsc::channel::<base::server::ServerCodes>(1);
55

66
let signal = tokio::spawn(async move {
7-
while let Some(ServerCodes::Listening) = rx.recv().await {
8-
let req = reqwest::get(format!("http://localhost:{}/{}", $port, $url)).await;
9-
return Some(req);
7+
while let Some(base::server::ServerCodes::Listening) = rx.recv().await {
8+
integration_test!(@req $port, $url, ($($function)+));
109
}
1110
None
1211
});
1312

14-
select! {
13+
tokio::select! {
1514
resp = signal => {
1615
if let Ok(maybe_response_from_server) = resp {
17-
$function(maybe_response_from_server.unwrap()).await;
16+
let resp = maybe_response_from_server.unwrap();
17+
integration_test!(@resp resp, ($($function)+)).await;
1818
} else {
1919
panic!("Request thread had a heart attack");
2020
}
2121
}
22-
_ = start_server(
22+
_ = base::commands::start_server(
2323
"0.0.0.0",
2424
$port,
2525
String::from($main_file),
@@ -31,10 +31,36 @@ macro_rules! integration_test {
3131
$crate::server::WorkerEntrypoints {
3232
main: None,
3333
events: None,
34-
}
34+
},
35+
integration_test!(@term $(, $termination_token)?)
3536
) => {
3637
panic!("This one should not end first");
3738
}
3839
}
3940
};
41+
42+
(@term , $termination_token:expr) => {
43+
Some($termination_token)
44+
};
45+
46+
(@term) => {
47+
None
48+
};
49+
50+
(@req $port:expr, $url:expr, ($req:expr, $_:expr)) => {
51+
return $req($port, $url).await;
52+
};
53+
54+
(@req $port:expr, $url:expr, $_:expr) => {
55+
let req = reqwest::get(format!("http://localhost:{}/{}", $port, $url)).await;
56+
return Some(req);
57+
};
58+
59+
(@resp $var:ident, ($_:expr, $resp:expr)) => {
60+
$resp($var)
61+
};
62+
63+
(@resp $var:ident, $resp:expr) => {
64+
$resp($var)
65+
};
4066
}

crates/base/src/rt_worker/worker_ctx.rs

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -267,9 +267,9 @@ impl From<(WorkerContextInitOpts, SupervisorPolicy)> for CreateWorkerArgs {
267267
}
268268
}
269269

270-
impl From<(WorkerContextInitOpts, TerminationToken)> for CreateWorkerArgs {
271-
fn from(val: (WorkerContextInitOpts, TerminationToken)) -> Self {
272-
CreateWorkerArgs(val.0, None, Some(val.1))
270+
impl<T: Into<Option<TerminationToken>>> From<(WorkerContextInitOpts, T)> for CreateWorkerArgs {
271+
fn from(val: (WorkerContextInitOpts, T)) -> Self {
272+
CreateWorkerArgs(val.0, None, val.1.into())
273273
}
274274
}
275275

@@ -417,6 +417,7 @@ pub async fn create_main_worker(
417417
no_module_cache: bool,
418418
user_worker_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
419419
maybe_entrypoint: Option<String>,
420+
termination_token: Option<TerminationToken>,
420421
) -> Result<mpsc::UnboundedSender<WorkerRequestMsg>, Error> {
421422
let mut service_path = main_worker_path.clone();
422423
let mut maybe_eszip = None;
@@ -427,20 +428,23 @@ pub async fn create_main_worker(
427428
}
428429
}
429430

430-
let main_worker_req_tx = create_worker(WorkerContextInitOpts {
431-
service_path,
432-
import_map_path,
433-
no_module_cache,
434-
events_rx: None,
435-
timing: None,
436-
maybe_eszip,
437-
maybe_entrypoint,
438-
maybe_module_code: None,
439-
conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
440-
worker_pool_tx: user_worker_msgs_tx,
441-
}),
442-
env_vars: std::env::vars().collect(),
443-
})
431+
let main_worker_req_tx = create_worker((
432+
WorkerContextInitOpts {
433+
service_path,
434+
import_map_path,
435+
no_module_cache,
436+
events_rx: None,
437+
timing: None,
438+
maybe_eszip,
439+
maybe_entrypoint,
440+
maybe_module_code: None,
441+
conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
442+
worker_pool_tx: user_worker_msgs_tx,
443+
}),
444+
env_vars: std::env::vars().collect(),
445+
},
446+
termination_token,
447+
))
444448
.await
445449
.map_err(|err| anyhow!("main worker boot error: {}", err))?;
446450

@@ -452,6 +456,7 @@ pub async fn create_events_worker(
452456
import_map_path: Option<String>,
453457
no_module_cache: bool,
454458
maybe_entrypoint: Option<String>,
459+
termination_token: Option<TerminationToken>,
455460
) -> Result<mpsc::UnboundedSender<WorkerEventWithMetadata>, Error> {
456461
let (events_tx, events_rx) = mpsc::unbounded_channel::<WorkerEventWithMetadata>();
457462

@@ -466,18 +471,21 @@ pub async fn create_events_worker(
466471
}
467472
}
468473

469-
let _ = create_worker(WorkerContextInitOpts {
470-
service_path,
471-
no_module_cache,
472-
import_map_path,
473-
env_vars: std::env::vars().collect(),
474-
events_rx: Some(events_rx),
475-
timing: None,
476-
maybe_eszip,
477-
maybe_entrypoint,
478-
maybe_module_code: None,
479-
conf: WorkerRuntimeOpts::EventsWorker(EventWorkerRuntimeOpts {}),
480-
})
474+
let _ = create_worker((
475+
WorkerContextInitOpts {
476+
service_path,
477+
no_module_cache,
478+
import_map_path,
479+
env_vars: std::env::vars().collect(),
480+
events_rx: Some(events_rx),
481+
timing: None,
482+
maybe_eszip,
483+
maybe_entrypoint,
484+
maybe_module_code: None,
485+
conf: WorkerRuntimeOpts::EventsWorker(EventWorkerRuntimeOpts {}),
486+
},
487+
termination_token,
488+
))
481489
.await
482490
.map_err(|err| anyhow!("events worker boot error: {}", err))?;
483491

crates/base/src/server.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::rt_worker::worker_ctx::{
2-
create_events_worker, create_main_worker, create_user_worker_pool,
2+
create_events_worker, create_main_worker, create_user_worker_pool, TerminationToken,
33
};
44
use crate::rt_worker::worker_pool::WorkerPoolPolicy;
55
use anyhow::Error;
@@ -159,6 +159,7 @@ pub struct Server {
159159
port: u16,
160160
main_worker_req_tx: mpsc::UnboundedSender<WorkerRequestMsg>,
161161
callback_tx: Option<Sender<ServerCodes>>,
162+
termination_token: TerminationToken,
162163
}
163164

164165
impl Server {
@@ -173,10 +174,12 @@ impl Server {
173174
no_module_cache: bool,
174175
callback_tx: Option<Sender<ServerCodes>>,
175176
entrypoints: WorkerEntrypoints,
177+
termination_token: Option<TerminationToken>,
176178
) -> Result<Self, Error> {
177179
let mut worker_events_sender: Option<mpsc::UnboundedSender<WorkerEventWithMetadata>> = None;
178180
let maybe_events_entrypoint = entrypoints.events;
179181
let maybe_main_entrypoint = entrypoints.main;
182+
let termination_token = termination_token.unwrap_or_default();
180183

181184
// Create Event Worker
182185
if let Some(events_service_path) = maybe_events_service_path {
@@ -188,6 +191,7 @@ impl Server {
188191
import_map_path.clone(),
189192
no_module_cache,
190193
maybe_events_entrypoint,
194+
Some(termination_token.child_token()),
191195
)
192196
.await?;
193197

@@ -210,6 +214,7 @@ impl Server {
210214
no_module_cache,
211215
user_worker_msgs_tx,
212216
maybe_main_entrypoint,
217+
Some(termination_token.child_token()),
213218
)
214219
.await?;
215220

@@ -219,12 +224,19 @@ impl Server {
219224
port,
220225
main_worker_req_tx,
221226
callback_tx,
227+
termination_token,
222228
})
223229
}
224230

231+
pub async fn terminate(&self) {
232+
self.termination_token.cancel_and_wait().await;
233+
}
234+
225235
pub async fn listen(&mut self) -> Result<(), Error> {
226236
let addr = SocketAddr::new(IpAddr::V4(self.ip), self.port);
227237
let listener = TcpListener::bind(&addr).await?;
238+
let termination_token = self.termination_token.clone();
239+
228240
debug!("edge-runtime is listening on {:?}", listener.local_addr()?);
229241

230242
if let Some(callback) = self.callback_tx.clone() {
@@ -260,6 +272,12 @@ impl Server {
260272
Err(e) => error!("socket error: {}", e)
261273
}
262274
}
275+
276+
_ = termination_token.outbound.cancelled() => {
277+
info!("termination token resolved");
278+
break;
279+
}
280+
263281
// wait for shutdown signal...
264282
_ = tokio::signal::ctrl_c() => {
265283
info!("shutdown signal received");
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
11
extern crate core;
22

3-
use base::commands::start_server;
43
use base::integration_test;
5-
use base::server::ServerCodes;
6-
use tokio::select;
7-
use tokio::sync::mpsc;
84

95
#[tokio::test]
106
async fn test_custom_readable_stream_response() {
117
integration_test!(
128
"./test_cases/main",
139
8999,
1410
"readable-stream-resp",
15-
|resp: Result<reqwest::Response, reqwest::Error>| async {
11+
(|resp: Result<reqwest::Response, reqwest::Error>| async {
1612
assert_eq!(
1713
resp.unwrap().text().await.unwrap(),
1814
"Hello world from streams"
1915
);
20-
}
16+
})
2117
);
2218
}
Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
1-
use base::commands::start_server;
21
use base::integration_test;
3-
use base::server::ServerCodes;
4-
use tokio::select;
5-
use tokio::sync::mpsc;
62

73
#[cfg(target_os = "linux")]
84
#[tokio::test]
95
async fn test_not_trigger_pku_sigsegv_due_to_jit_compilation_cli() {
10-
integration_test!("./test_cases/main", 8999, "slow_resp", |resp: Result<
11-
reqwest::Response,
12-
reqwest::Error,
13-
>| async {
14-
assert!(resp.unwrap().text().await.unwrap().starts_with("meow: "));
15-
});
6+
integration_test!(
7+
"./test_cases/main",
8+
8999,
9+
"slow_resp",
10+
(|resp: Result<reqwest::Response, reqwest::Error>| async {
11+
assert!(resp.unwrap().text().await.unwrap().starts_with("meow: "));
12+
})
13+
);
1614
}

crates/cli/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ fn main() -> Result<(), anyhow::Error> {
172172
main: maybe_main_entrypoint,
173173
events: maybe_events_entrypoint,
174174
},
175+
None,
175176
)
176177
.await?;
177178
}

0 commit comments

Comments
 (0)