diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs
index 038e0b29..02b5c6ce 100644
--- a/compiler/base/orchestrator/src/coordinator.rs
+++ b/compiler/base/orchestrator/src/coordinator.rs
@@ -852,6 +852,9 @@ type ResourceResult = std::result::Result;
 pub trait ResourceLimits: Send + Sync + fmt::Debug + 'static {
     /// Block until resources for a container are available.
     fn next_container(&self) -> BoxFuture<'static, ResourceResult<Box<dyn ContainerPermit>>>;
+
+    /// Block until someone requests that you return an in-use container.
+    fn container_requested(&self) -> BoxFuture<'static, ()>;
 }
 
 /// Represents one allowed Docker container (or equivalent).
@@ -884,6 +887,10 @@ impl CoordinatorFactory {
 
         Coordinator::new(limits, backend)
     }
+
+    pub async fn container_requested(&self) {
+        self.limits.container_requested().await
+    }
 }
 
 #[derive(Debug)]
diff --git a/compiler/base/orchestrator/src/coordinator/limits.rs b/compiler/base/orchestrator/src/coordinator/limits.rs
index 0e1f741f..6a76c226 100644
--- a/compiler/base/orchestrator/src/coordinator/limits.rs
+++ b/compiler/base/orchestrator/src/coordinator/limits.rs
@@ -94,6 +94,7 @@ pub struct Global {
     lifecycle: L,
     container_semaphore: Arc<Semaphore>,
     process_semaphore: Arc<Semaphore>,
+    container_request_semaphore: Arc<Semaphore>,
     start: u64,
     id: AtomicU64,
 }
@@ -136,6 +137,7 @@ where
     pub fn with_lifecycle(container_limit: usize, process_limit: usize, lifecycle: L) -> Self {
         let container_semaphore = Arc::new(Semaphore::new(container_limit));
         let process_semaphore = Arc::new(Semaphore::new(process_limit));
+        let container_request_semaphore = Arc::new(Semaphore::new(0));
 
         let now = std::time::SystemTime::now();
         let start = now
@@ -149,6 +151,7 @@ where
             lifecycle,
             container_semaphore,
             process_semaphore,
+            container_request_semaphore,
             start,
             id,
         }
@@ -163,13 +166,44 @@ where
         let lifecycle = self.lifecycle.clone();
         let container_semaphore = self.container_semaphore.clone();
         let process_semaphore = self.process_semaphore.clone();
+        let container_request_semaphore = self.container_request_semaphore.clone();
         let start = self.start;
         let id = self.id.fetch_add(1, Ordering::SeqCst);
 
         async move {
             let guard = ContainerAcquireGuard::start(&lifecycle);
 
-            let container_permit = container_semaphore.acquire_owned().await;
+            // Attempt to acquire the container semaphore. If we don't
+            // immediately get it, notify the container request
+            // semaphore. Any idle-but-not-yet-exited connections
+            // should watch that semaphore to see if they should give
+            // up their container to allow someone else in.
+            //
+            // There *is* a race here: a container might naturally
+            // exit after we attempt to acquire the first time. In
+            // that case, we'd spuriously notify the request semaphore
+            // and a container might exit earlier than it needed
+            // to. However, this should be a transient issue and only
+            // occur when we are already at the upper bounds of our
+            // limits. In those cases, freeing an extra container or
+            // two shouldn't be the worst thing.
+            let container_permit = {
+                let fallback = {
+                    let container_semaphore = container_semaphore.clone();
+                    async {
+                        container_request_semaphore.add_permits(1);
+                        container_semaphore.acquire_owned().await
+                    }
+                };
+
+                tokio::select! {
+                    biased;
+
+                    permit = container_semaphore.acquire_owned() => permit,
+                    permit = fallback => permit,
+                }
+            };
+
             let container_permit = guard.complete(container_permit)?;
 
             let token = TrackContainer {
@@ -183,6 +217,23 @@ where
         }
         .boxed()
     }
+
+    fn container_requested(&self) -> BoxFuture<'static, ()> {
+        let container_request_semaphore = self.container_request_semaphore.clone();
+
+        async move {
+            let permit = container_request_semaphore
+                .acquire()
+                .await
+                .expect("The semaphore is never closed");
+
+            // We're now dealing with the request to return a
+            // container so we discard the permit to prevent anyone
+            // else from trying to handle it.
+            permit.forget();
+        }
+        .boxed()
+    }
 }
 
 impl fmt::Display for TrackContainer
diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs
index 1895c8ac..4da6940e 100644
--- a/ui/src/server_axum/websocket.rs
+++ b/ui/src/server_axum/websocket.rs
@@ -16,6 +16,7 @@ use std::{
     collections::BTreeMap,
     convert::TryFrom,
     mem,
+    ops::ControlFlow,
     pin::pin,
     sync::{
         atomic::{AtomicU64, Ordering},
@@ -444,13 +445,16 @@ async fn handle_core(
             },
 
             _ = &mut idle_timeout, if manager.is_empty() => {
-                let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+                if handle_idle(&mut manager, &tx).await.is_break() {
+                    break
+                }
+            },
 
-                let Err(error) = idled else { continue };
+            _ = factory.container_requested(), if manager.is_empty() => {
+                info!("Container requested to idle");
 
-                if tx.send(Err((error, None))).await.is_err() {
-                    // We can't send a response
-                    break;
+                if handle_idle(&mut manager, &tx).await.is_break() {
+                    break
                 }
             },
 
@@ -506,6 +510,21 @@ fn response_to_message(response: MessageResponse) -> Message {
     Message::Text(resp)
 }
 
+async fn handle_idle(manager: &mut CoordinatorManager, tx: &ResponseTx) -> ControlFlow<()> {
+    let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+
+    let Err(error) = idled else {
+        return ControlFlow::Continue(());
+    };
+
+    if tx.send(Err((error, None))).await.is_err() {
+        // We can't send a response
+        return ControlFlow::Break(());
+    }
+
+    ControlFlow::Continue(())
+}
+
 type ActiveExecutionInfo = (CancellationToken, Option>);
 
 async fn handle_msg(
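The core mechanism this patch adds in limits.rs is a semaphore that starts with zero permits and is used purely as a signal: a next_container call that cannot immediately get a container adds a permit to ask for one back, and an idle connection that wins that permit calls forget() on it so the request is handled exactly once. The following is a minimal, self-contained sketch of that pattern only; it is not part of the patch, the names are illustrative, and it assumes nothing beyond tokio's Semaphore, which the patch itself uses.

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Zero initial permits: nobody is being asked to give a container back yet.
    let container_request_semaphore = Arc::new(Semaphore::new(0));

    // An idle-but-not-yet-exited connection waits to be asked for its container.
    let idle_connection = {
        let sem = Arc::clone(&container_request_semaphore);
        tokio::spawn(async move {
            let permit = sem.acquire().await.expect("the semaphore is never closed");
            // Consume the request so no other idle connection also reacts to it.
            permit.forget();
            println!("releasing my idle container so someone else can have one");
        })
    };

    // A caller that could not immediately acquire a container signals its need.
    container_request_semaphore.add_permits(1);

    idle_connection.await.unwrap();
}
```

In the patch, the signalling side lives in Global::next_container (only nudging the request semaphore when the container semaphore is contended), while container_requested exposes the waiting side so the websocket handler can treat the nudge the same way it treats its idle timeout.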