Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions compiler/base/orchestrator/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use futures::{
use serde::Deserialize;
use snafu::prelude::*;
use std::{
collections::HashMap,
collections::{BTreeSet, HashMap},
fmt, mem, ops,
process::Stdio,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, LazyLock, Mutex,
},
time::Duration,
};
Expand Down Expand Up @@ -2532,11 +2532,23 @@ pub enum CommanderError {
WorkerOperationFailed { source: SerializedError2 },
}

/// Process-wide, lazily initialised set of container names that have been
/// handed to `TerminateContainer::new` and not yet removed again by
/// `TerminateContainer::stop_tracking`.
///
/// The set is a `BTreeSet`, so iteration yields the names in sorted order.
/// It is `pub` so that code outside this module can read it for diagnostics
/// (the UI server's `/internal/debug/tracked-containers` route does so).
/// Every access in this file recovers from a poisoned mutex with
/// `unwrap_or_else(|e| e.into_inner())` rather than panicking.
pub static TRACKED_CONTAINERS: LazyLock<Mutex<BTreeSet<Arc<str>>>> =
LazyLock::new(Default::default);

/// Pairs a container's name with the command used to kill that container.
///
/// The payload lives in an `Option` so it can be moved out with
/// `self.0.take()`: whichever path runs the kill command first (the async
/// termination method or `Drop`) leaves `None` behind, so the command is not
/// run a second time.
#[derive(Debug)]
pub struct TerminateContainer(Option<(String, Command)>);

impl TerminateContainer {
pub fn new(name: String, command: Command) -> Self {
let was_inserted = TRACKED_CONTAINERS
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(name.clone().into());

if !was_inserted {
error!(%name, "This container was already tracked; duplicates are bad logic");
}

Self(Some((name, command)))
}

Expand All @@ -2548,6 +2560,7 @@ impl TerminateContainer {
use terminate_container_error::*;

if let Some((name, mut kill_child)) = self.0.take() {
Self::stop_tracking(&name);
let o = kill_child
.output()
.await
Expand All @@ -2558,6 +2571,16 @@ impl TerminateContainer {
Ok(())
}

/// Removes `name` from [`TRACKED_CONTAINERS`].
///
/// If the name was not present, that is reported through `error!`; the
/// function itself never fails.
fn stop_tracking(name: &str) {
    // Scope the guard so the lock is dropped before logging.
    let removed = {
        let mut tracked = TRACKED_CONTAINERS
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        tracked.remove(name)
    };

    if removed {
        return;
    }

    error!(%name, "Container was not in the tracking set");
}

fn report_failure(name: String, s: std::process::Output) {
// We generally don't care if the command itself succeeds or
// not; the container may already be dead! However, let's log
Expand All @@ -2570,6 +2593,9 @@ impl TerminateContainer {
let stdout = String::from_utf8_lossy(&s.stdout);
let stderr = String::from_utf8_lossy(&s.stderr);

let stdout = stdout.trim();
let stderr = stderr.trim();

error!(?code, %stdout, %stderr, %name, "Killing the container failed");
}
}
Expand All @@ -2578,6 +2604,7 @@ impl TerminateContainer {
impl Drop for TerminateContainer {
fn drop(&mut self) {
if let Some((name, mut kill_child)) = self.0.take() {
Self::stop_tracking(&name);
match kill_child.as_std_mut().output() {
Ok(o) => Self::report_failure(name, o),
Err(e) => error!("Unable to kill container {name} while dropping: {e}"),
Expand Down
20 changes: 18 additions & 2 deletions ui/src/server_axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use axum_extra::{
TypedHeader,
};
use futures::{future::BoxFuture, FutureExt};
use orchestrator::coordinator::{self, CoordinatorFactory, DockerBackend, Versions};
use orchestrator::coordinator::{
self, CoordinatorFactory, DockerBackend, Versions, TRACKED_CONTAINERS,
};
use snafu::prelude::*;
use std::{
convert::TryInto,
Expand Down Expand Up @@ -100,7 +102,11 @@ pub(crate) async fn serve(config: Config) {
.route("/metrics", get(metrics))
.route("/websocket", get(websocket))
.route("/nowebsocket", post(nowebsocket))
.route("/whynowebsocket", get(whynowebsocket))
.route("/internal/debug/whynowebsocket", get(whynowebsocket))
.route(
"/internal/debug/tracked-containers",
get(tracked_containers),
)
.layer(Extension(factory))
.layer(Extension(db_handle))
.layer(Extension(Arc::new(SandboxCache::default())))
Expand Down Expand Up @@ -680,6 +686,16 @@ async fn whynowebsocket() -> String {
format!("{:#?}", WS_ERRORS.lock().unwrap_or_else(|e| e.into_inner()))
}

/// Handler for the tracked-containers debug route.
///
/// Returns every name currently in [`TRACKED_CONTAINERS`], one per line,
/// each terminated by `\n`, in the set's (sorted) iteration order. An empty
/// set produces an empty string.
async fn tracked_containers() -> String {
    // A poisoned lock is still perfectly readable for diagnostics, so take
    // the inner guard instead of panicking.
    let tracked = TRACKED_CONTAINERS
        .lock()
        .unwrap_or_else(|e| e.into_inner());

    // Render straight from the guarded set: copying the bytes once is
    // cheaper than cloning the whole `BTreeSet` first, and there is no
    // `.await` in this function, so the guard is never held across a
    // suspension point.
    let capacity: usize = tracked.iter().map(|name| name.len() + 1).sum();
    let mut listing = String::with_capacity(capacity);
    for name in tracked.iter() {
        listing.push_str(name);
        listing.push('\n');
    }
    listing
}

/// Field-less marker type.
///
/// NOTE(review): presumably the authorization check guarding the `/metrics`
/// route; its trait impls are not visible in this excerpt — confirm.
#[derive(Debug)]
struct MetricsAuthorization;

Expand Down