diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 02b5c6ce..f888e461 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -2848,8 +2848,10 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok let stdin = SyncIoBridge::new(stdin); let mut stdin = BufWriter::new(stdin); + let handle = tokio::runtime::Handle::current(); + loop { - let coordinator_msg = futures::executor::block_on(async { + let coordinator_msg = handle.block_on(async { select! { () = token.cancelled() => None, msg = rx.recv() => msg, diff --git a/compiler/base/orchestrator/src/worker.rs b/compiler/base/orchestrator/src/worker.rs index e4aa0292..3e912c3b 100644 --- a/compiler/base/orchestrator/src/worker.rs +++ b/compiler/base/orchestrator/src/worker.rs @@ -450,7 +450,8 @@ impl ProcessState { let statistics_task = tokio::task::spawn_blocking({ let child_id = child.id(); let worker_msg_tx = worker_msg_tx.clone(); - move || stream_command_statistics(child_id, worker_msg_tx) + let handle = tokio::runtime::Handle::current(); + move || stream_command_statistics(child_id, worker_msg_tx, handle) }); let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr); @@ -943,6 +944,7 @@ mod stats { fn stream_command_statistics( child_id: Option, worker_msg_tx: MultiplexingSender, + handle: tokio::runtime::Handle, ) -> Result<(), CommandStatisticsError> { use command_statistics_error::*; use stats::*; @@ -959,7 +961,7 @@ fn stream_command_statistics( let process = Process::new(process_id).context(InvalidProcessSnafu { process_id })?; while let Some(stats) = process.stats() { - let sent = futures::executor::block_on(worker_msg_tx.send_ok(stats)); + let sent = handle.block_on(worker_msg_tx.send_ok(stats)); if sent.is_err() { // No one listening anymore break; diff --git a/ui/src/request_database.rs b/ui/src/request_database.rs index 23670b51..b5d65d08 100644 --- a/ui/src/request_database.rs +++ b/ui/src/request_database.rs @@ -215,7 +215,7 @@ impl Handle { let g = self .attempt_start_request(category, payload) .await - .map(|id| EndGuardInner(id, How::Abandoned, self)); + .map(|id| EndGuardInner(id, How::Abandoned, Some(self))); EndGuard(g) } } @@ -238,12 +238,16 @@ impl EndGuard { } } -struct EndGuardInner(Id, How, Handle); +struct EndGuardInner(Id, How, Option); impl Drop for EndGuardInner { fn drop(&mut self) { - let Self(id, how, ref handle) = *self; - futures::executor::block_on(handle.attempt_end_request(id, how)) + let Self(id, how, ref mut handle) = *self; + if let Ok(h) = tokio::runtime::Handle::try_current() { + if let Some(handle) = handle.take() { + h.spawn(async move { handle.attempt_end_request(id, how).await }); + } + } } } diff --git a/ui/src/server_axum.rs b/ui/src/server_axum.rs index 6d01422b..0be65f19 100644 --- a/ui/src/server_axum.rs +++ b/ui/src/server_axum.rs @@ -38,7 +38,7 @@ use std::{ sync::{Arc, LazyLock}, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; -use tokio::sync::Mutex; +use tokio::{select, sync::Mutex}; use tower_http::{ cors::{self, CorsLayer}, request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer}, @@ -71,7 +71,7 @@ pub(crate) async fn serve(config: Config) { let factory = Factory(Arc::new(config.coordinator_factory())); let request_db = config.request_database(); - let (_db_task, db_handle) = request_db.spawn(); + let (db_task, db_handle) = request_db.spawn(); let root_files = static_file_service(config.root_path(), MAX_AGE_ONE_DAY); let asset_files = static_file_service(config.asset_path(), MAX_AGE_ONE_YEAR); @@ -170,9 +170,12 @@ pub(crate) async fn serve(config: Config) { .await .unwrap(); - axum::serve(listener, app.into_make_service()) - .await - .unwrap(); + let server = axum::serve(listener, app.into_make_service()); + + select! { + v = server => v.unwrap(), + v = db_task => v.unwrap(), + } } fn get_or_post(handler: impl Handler + Copy) -> MethodRouter {