Skip to content

Commit 4699815

Browse files
committed
gracefully handle sigint
1 parent 0bbdd27 commit 4699815

File tree

4 files changed

+48
-17
lines changed

4 files changed

+48
-17
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pulsebeam/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ mimalloc = { version = "0.1.48", features = ["v3"] }
3333
hyper = "1.7.0"
3434
pulsebeam-runtime = { version = "^0.1.0", path = "../pulsebeam-runtime" }
3535
thread-priority = "3.0.0"
36+
tokio-util = "0.7.16"
3637

3738
[dev-dependencies]
3839
turmoil = "0.6.6"

pulsebeam/src/main.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use mimalloc::MiMalloc;
2+
use tokio_util::sync::CancellationToken;
23

34
#[global_allocator]
45
static GLOBAL: MiMalloc = MiMalloc;
@@ -55,15 +56,17 @@ fn main() {
5556
.build()
5657
.unwrap();
5758

59+
let shutdown = CancellationToken::new();
5860
let io_rt = tokio::runtime::Builder::new_current_thread()
5961
.enable_all()
6062
.thread_name("io")
6163
.build()
6264
.unwrap();
63-
io_rt.block_on(run(cpu_rt));
65+
io_rt.block_on(run(shutdown.clone(), &cpu_rt));
66+
shutdown.cancel();
6467
}
6568

66-
pub async fn run(cpu_rt: rt::Runtime) {
69+
pub async fn run(shutdown: CancellationToken, cpu_rt: &rt::Runtime) {
6770
let external_ip = select_host_address();
6871
let external_addr: SocketAddr = format!("{}:3478", external_ip).parse().unwrap();
6972
let local_addr: SocketAddr = "0.0.0.0:3478".parse().unwrap();
@@ -73,17 +76,18 @@ pub async fn run(cpu_rt: rt::Runtime) {
7376
.expect("bind to udp socket");
7477
let http_addr: SocketAddr = "0.0.0.0:3000".parse().unwrap();
7578
tracing::info!(
76-
"✅ Signaling server listening. Clients should connect to http://{}:3000 or http://localhost:3000",
79+
"server listening at http://{}:3000 or http://localhost:3000",
7780
external_ip,
7881
);
7982

8083
// Run the main logic and signal handler concurrently
8184
tokio::select! {
82-
Err(err) = node::run(cpu_rt, external_addr, unified_socket, http_addr) => {
83-
tracing::warn!("node exited with an error: {err}");
85+
Err(err) = node::run(shutdown.clone(), cpu_rt, external_addr, unified_socket, http_addr) => {
86+
tracing::warn!("node exited with error: {err}");
8487
}
8588
_ = tokio::signal::ctrl_c() => {
86-
tracing::info!("received SIGINT, shutting down gracefully");
89+
tracing::info!("received SIGINT, shutting down gracefully...");
90+
shutdown.cancel();
8791
}
8892
}
8993
}

pulsebeam/src/node.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,30 @@ use crate::{api, controller, system};
22
use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
33
use pulsebeam_runtime::{actor, net, rt};
44
use std::{net::SocketAddr, sync::Arc, time::Duration};
5+
use tokio_util::sync::CancellationToken;
56
use tower_http::cors::{AllowOrigin, CorsLayer};
67

78
pub async fn run(
8-
cpu_rt: rt::Runtime,
9+
shutdown: CancellationToken,
10+
cpu_rt: &rt::Runtime,
911
external_addr: SocketAddr,
1012
unified_socket: net::UnifiedSocket<'static>,
1113
http_addr: SocketAddr,
1214
) -> anyhow::Result<()> {
13-
// Configure CORS
1415
let cors = CorsLayer::very_permissive()
1516
.allow_origin(AllowOrigin::mirror_request())
1617
.expose_headers([hyper::header::LOCATION])
1718
.max_age(Duration::from_secs(86400));
1819

19-
// Spawn system and controller actors
2020
let mut join_set = FuturesUnordered::new();
2121

2222
let (system_ctx, system_join) = system::SystemContext::spawn(unified_socket);
2323
join_set.push(system_join.map(|_| ()).boxed());
2424

2525
let (controller_ready_tx, controller_ready_rx) = tokio::sync::oneshot::channel();
2626

27-
// TODO: handle cleanup
27+
// Spawn controller on CPU runtime
28+
let shutdown_for_controller = shutdown.clone();
2829
cpu_rt.spawn(async move {
2930
let controller_actor = controller::ControllerActor::new(
3031
system_ctx,
@@ -33,28 +34,52 @@ pub async fn run(
3334
);
3435
let (controller_handle, controller_join) =
3536
actor::spawn(controller_actor, actor::RunnerConfig::default());
36-
controller_ready_tx.send(controller_handle).unwrap();
37+
let _ = controller_ready_tx.send(controller_handle);
3738
tracing::debug!("controller is ready");
38-
controller_join.await;
39+
40+
tokio::select! {
41+
_ = controller_join => {}
42+
_ = shutdown_for_controller.cancelled() => {
43+
tracing::debug!("controller received shutdown");
44+
}
45+
}
3946
});
4047

4148
tracing::debug!("waiting on controller to be ready");
4249
let controller_handle = controller_ready_rx.await?;
43-
// Set up signaling router
50+
51+
// HTTP API
4452
let api_cfg = api::ApiConfig {
4553
base_path: "/api/v1".to_string(),
4654
default_host: http_addr.to_string(),
4755
};
4856
let router = api::router(controller_handle, api_cfg).layer(cors);
49-
tracing::debug!("listening on {http_addr}");
5057

58+
let shutdown_for_http = shutdown.clone();
5159
let signaling = async move {
5260
let listener = tokio::net::TcpListener::bind(http_addr).await.unwrap();
53-
axum::serve(listener, router).await.unwrap();
61+
tracing::debug!("listening on {http_addr}");
62+
tokio::select! {
63+
res = axum::serve(listener, router) => {
64+
if let Err(e) = res {
65+
tracing::error!("http server error: {e}");
66+
}
67+
}
68+
_ = shutdown_for_http.cancelled() => {
69+
tracing::info!("http server shutting down");
70+
}
71+
}
5472
};
73+
5574
join_set.push(tokio::spawn(signaling).map(|_| ()).boxed());
5675

57-
// Wait for all tasks to complete
58-
while join_set.next().await.is_some() {}
76+
// Wait for all tasks to complete OR shutdown
77+
tokio::select! {
78+
_ = async { while join_set.next().await.is_some() {} } => {}
79+
_ = shutdown.cancelled() => {
80+
tracing::info!("node received shutdown");
81+
}
82+
}
83+
5984
Ok(())
6085
}

0 commit comments

Comments
 (0)