Skip to content

Commit 6ff72bb

Browse files
committed
fixes
1 parent d0c4764 commit 6ff72bb

File tree

4 files changed

+261
-199
lines changed

4 files changed

+261
-199
lines changed

crates/theater-handler-http-framework/src/lib.rs

Lines changed: 133 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,153 +1253,156 @@ impl Handler for HttpFrameworkHandler {
12531253
Box::pin(async move {
12541254
info!("HTTP Framework started, monitoring for shutdown signal");
12551255

1256-
tokio::spawn(async move {
1257-
debug!("HTTP Framework shutdown monitor started");
1258-
1259-
// Wait for shutdown signal
1260-
shutdown_receiver.wait_for_shutdown().await;
1261-
info!("HTTP Framework received shutdown signal");
1262-
1263-
// First stop all the servers
1264-
let server_count = {
1265-
let servers = servers_ref.read().await;
1266-
let count = servers.len();
1267-
debug!("HTTP Framework shutting down {} servers", count);
1268-
count
1269-
};
1270-
1271-
if server_count > 0 {
1272-
// Create a vector to hold futures
1273-
let mut shutdown_tasks = Vec::new();
1274-
1275-
// Stop each server in parallel with individual timeouts
1276-
for server_id in servers_ref
1277-
.read()
1278-
.await
1279-
.keys()
1280-
.cloned()
1281-
.collect::<Vec<u64>>()
1282-
{
1283-
let servers_clone = servers_ref.clone();
1284-
let task = tokio::spawn(async move {
1285-
let start_time = std::time::Instant::now();
1286-
debug!("Starting shutdown of HTTP server {}", server_id);
1287-
1288-
let result = {
1289-
let mut servers = servers_clone.write().await;
1290-
if let Some(server) = servers.get_mut(&server_id) {
1291-
server.stop().await
1292-
} else {
1293-
debug!("Server {} not found during shutdown", server_id);
1294-
Ok(())
1295-
}
1296-
};
1297-
1298-
match result {
1299-
Ok(_) => debug!(
1300-
"Successfully stopped HTTP server {} in {:?}",
1301-
server_id,
1302-
start_time.elapsed()
1303-
),
1304-
Err(ref e) => {
1305-
warn!("Error stopping HTTP server {}: {}", server_id, e)
1306-
}
1307-
}
1308-
(server_id, result)
1309-
});
1310-
shutdown_tasks.push(task);
1311-
}
1312-
1313-
// Wait for all servers to be stopped with a global timeout
1314-
debug!("Waiting for all servers to stop (timeout: 10s)");
1315-
match tokio::time::timeout(
1316-
std::time::Duration::from_secs(10),
1317-
futures::future::join_all(shutdown_tasks),
1318-
)
1256+
// Wait for shutdown signal - no longer spawned in a detached task
1257+
debug!("HTTP Framework shutdown monitor started");
1258+
let signal = shutdown_receiver.wait_for_shutdown().await;
1259+
info!("HTTP Framework received shutdown signal");
1260+
1261+
// First stop all the servers
1262+
let server_count = {
1263+
let servers = servers_ref.read().await;
1264+
let count = servers.len();
1265+
debug!("HTTP Framework shutting down {} servers", count);
1266+
count
1267+
};
1268+
1269+
if server_count > 0 {
1270+
// Create a vector to hold futures
1271+
let mut shutdown_tasks = Vec::new();
1272+
1273+
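// Stop each server in its own spawned task (no per-task timeout is applied here; only the global one below). NOTE(review): each task holds the servers write lock across stop().await, so the stops likely run one at a time — confirm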
1274+
for server_id in servers_ref
1275+
.read()
13191276
.await
1320-
{
1321-
Ok(results) => {
1322-
let success_count = results
1323-
.iter()
1324-
.filter(|r| r.is_ok() && r.as_ref().unwrap().1.is_ok())
1325-
.count();
1326-
let failure_count = results.len() - success_count;
1327-
1328-
if failure_count > 0 {
1329-
warn!(
1330-
"Stopped {}/{} HTTP servers successfully, {} had errors",
1331-
success_count,
1332-
results.len(),
1333-
failure_count
1334-
);
1277+
.keys()
1278+
.cloned()
1279+
.collect::<Vec<u64>>()
1280+
{
1281+
let servers_clone = servers_ref.clone();
1282+
let task = tokio::spawn(async move {
1283+
let start_time = std::time::Instant::now();
1284+
debug!("Starting shutdown of HTTP server {}", server_id);
1285+
1286+
let result = {
1287+
let mut servers = servers_clone.write().await;
1288+
if let Some(server) = servers.get_mut(&server_id) {
1289+
server.stop().await
13351290
} else {
1336-
info!("All {} HTTP servers stopped successfully", success_count);
1291+
debug!("Server {} not found during shutdown", server_id);
1292+
Ok(())
13371293
}
1338-
}
1339-
Err(_) => {
1340-
error!("Global timeout reached while waiting for servers to stop");
1341-
1342-
// Log which servers might still be running
1343-
let servers = servers_ref.read().await;
1344-
let running_servers: Vec<u64> = servers
1345-
.iter()
1346-
.filter(|(_, server)| server.is_running())
1347-
.map(|(id, _)| *id)
1348-
.collect();
1349-
1350-
if !running_servers.is_empty() {
1351-
error!(
1352-
"The following servers may still be running: {:?}",
1353-
running_servers
1354-
);
1294+
};
1295+
1296+
match result {
1297+
Ok(_) => debug!(
1298+
"Successfully stopped HTTP server {} in {:?}",
1299+
server_id,
1300+
start_time.elapsed()
1301+
),
1302+
Err(ref e) => {
1303+
warn!("Error stopping HTTP server {}: {}", server_id, e)
13551304
}
13561305
}
1357-
}
1358-
} else {
1359-
debug!("No HTTP servers to shut down");
1306+
(server_id, result)
1307+
});
1308+
shutdown_tasks.push(task);
13601309
}
13611310

1362-
// Then clean up the handles
1311+
// Wait for all servers to be stopped with a global timeout
1312+
debug!("Waiting for all servers to stop (timeout: 10s)");
1313+
match tokio::time::timeout(
1314+
std::time::Duration::from_secs(10),
1315+
futures::future::join_all(shutdown_tasks),
1316+
)
1317+
.await
13631318
{
1364-
let mut handles = server_handles_ref.write().await;
1365-
if !handles.is_empty() {
1366-
debug!("Cleaning up {} server handles", handles.len());
1367-
let handle_ids: Vec<u64> = handles.keys().cloned().collect();
1368-
1369-
for handle_id in handle_ids {
1370-
if let Some(handle) = handles.remove(&handle_id) {
1371-
if let Some(task) = handle.task {
1372-
if !task.is_finished() {
1373-
debug!("Aborting server task for server {}", handle_id);
1374-
task.abort();
1375-
// Wait a tiny bit for the abort to register
1376-
tokio::time::sleep(Duration::from_millis(50)).await;
1377-
debug!("Aborted server task for server {}", handle_id);
1378-
}
1319+
Ok(results) => {
1320+
let success_count = results
1321+
.iter()
1322+
.filter(|r| r.is_ok() && r.as_ref().unwrap().1.is_ok())
1323+
.count();
1324+
let failure_count = results.len() - success_count;
1325+
1326+
if failure_count > 0 {
1327+
warn!(
1328+
"Stopped {}/{} HTTP servers successfully, {} had errors",
1329+
success_count,
1330+
results.len(),
1331+
failure_count
1332+
);
1333+
} else {
1334+
info!("All {} HTTP servers stopped successfully", success_count);
1335+
}
1336+
}
1337+
Err(_) => {
1338+
error!("Global timeout reached while waiting for servers to stop");
1339+
1340+
// Log which servers might still be running
1341+
let servers = servers_ref.read().await;
1342+
let running_servers: Vec<u64> = servers
1343+
.iter()
1344+
.filter(|(_, server)| server.is_running())
1345+
.map(|(id, _)| *id)
1346+
.collect();
1347+
1348+
if !running_servers.is_empty() {
1349+
error!(
1350+
"The following servers may still be running: {:?}",
1351+
running_servers
1352+
);
1353+
}
1354+
}
1355+
}
1356+
} else {
1357+
debug!("No HTTP servers to shut down");
1358+
}
1359+
1360+
// Then clean up the handles
1361+
{
1362+
let mut handles = server_handles_ref.write().await;
1363+
if !handles.is_empty() {
1364+
debug!("Cleaning up {} server handles", handles.len());
1365+
let handle_ids: Vec<u64> = handles.keys().cloned().collect();
1366+
1367+
for handle_id in handle_ids {
1368+
if let Some(handle) = handles.remove(&handle_id) {
1369+
if let Some(task) = handle.task {
1370+
if !task.is_finished() {
1371+
debug!("Aborting server task for server {}", handle_id);
1372+
task.abort();
1373+
// Wait a tiny bit for the abort to register
1374+
tokio::time::sleep(Duration::from_millis(50)).await;
1375+
debug!("Aborted server task for server {}", handle_id);
13791376
}
1377+
}
13801378

1381-
if let Some(tx) = handle.shutdown_tx {
1382-
if tx.is_closed() {
1383-
debug!(
1384-
"Shutdown channel for server {} already closed",
1385-
handle_id
1386-
);
1387-
} else {
1388-
debug!("Sending final shutdown signal to server {}", handle_id);
1389-
let _ = tx.send(()); // Ignore errors, this is just a backup
1390-
}
1379+
if let Some(tx) = handle.shutdown_tx {
1380+
if tx.is_closed() {
1381+
debug!(
1382+
"Shutdown channel for server {} already closed",
1383+
handle_id
1384+
);
1385+
} else {
1386+
debug!("Sending final shutdown signal to server {}", handle_id);
1387+
let _ = tx.send(()); // Ignore errors, this is just a backup
13911388
}
13921389
}
13931390
}
13941391
}
13951392
}
1393+
}
13961394

1397-
// Add a longer delay to ensure OS has time to release sockets
1398-
debug!("Waiting for OS to release socket resources");
1399-
tokio::time::sleep(Duration::from_millis(500)).await;
1395+
// Add a longer delay to ensure OS has time to release sockets
1396+
debug!("Waiting for OS to release socket resources");
1397+
tokio::time::sleep(Duration::from_millis(500)).await;
14001398

1401-
info!("HTTP Framework shutdown complete");
1402-
});
1399+
info!("HTTP Framework shutdown complete");
1400+
1401+
// Send acknowledgment back to shutdown controller
1402+
if let Some(sender) = signal.sender {
1403+
debug!("Sending shutdown acknowledgment");
1404+
let _ = sender.send(());
1405+
}
14031406

14041407
Ok(())
14051408
})

crates/theater-handler-http-framework/src/server_instance.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -671,13 +671,30 @@ impl ServerInstance {
671671
return Ok(());
672672
}
673673

674+
// Send shutdown signal first to allow graceful shutdown
674675
if let Some(shutdown_tx) = self.shutdown_tx.take() {
676+
debug!("Sending shutdown signal to server {}", self.id);
675677
let _ = shutdown_tx.send(());
676678
}
677679

680+
// Wait for the server task to complete gracefully instead of aborting
678681
if let Some(handle) = self.server_handle.take() {
679-
handle.abort();
682+
debug!("Waiting for server {} to shut down gracefully", self.id);
680683

684+
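// Bound the wait to 5s in case the task hangs. NOTE(review): on timeout the handle is dropped, not aborted — if this is a tokio JoinHandle the task keeps running detached; confirm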
685+
match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
686+
Ok(Ok(_)) => {
687+
debug!("Server {} shut down gracefully", self.id);
688+
}
689+
Ok(Err(e)) => {
690+
warn!("Server {} task panicked during shutdown: {:?}", self.id, e);
691+
}
692+
Err(_) => {
693+
warn!("Server {} shutdown timed out after 5s, port may not be released immediately", self.id);
694+
}
695+
}
696+
697+
// Clean up WebSocket connections after server has stopped
681698
let connections_count = self.active_ws_connections.read().await.len();
682699
if connections_count > 0 {
683700
debug!(
@@ -692,7 +709,8 @@ impl ServerInstance {
692709
self.running = false;
693710
info!("HTTP server {} on port {} stopped", self.id, self.port);
694711

695-
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
712+
// Longer delay to give the OS time to release the port (best effort; a fixed sleep cannot guarantee it)
713+
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
696714
Ok(())
697715
}
698716

0 commit comments

Comments
 (0)