From 8b3e32d5a328345f471de34adcbce398265177e6 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Tue, 12 Nov 2024 11:50:15 -0500 Subject: [PATCH 1/7] Extract common request handling code --- compiler/base/orchestrator/src/coordinator.rs | 368 +++++++----------- 1 file changed, 148 insertions(+), 220 deletions(-) diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 7c19f732..e1b1a49e 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -366,12 +366,12 @@ pub struct ExecuteRequest { pub code: String, } -impl ExecuteRequest { - pub(crate) fn delete_previous_main_request(&self) -> DeleteFileRequest { +impl LowerRequest for ExecuteRequest { + fn delete_previous_main_request(&self) -> DeleteFileRequest { delete_previous_primary_file_request(self.crate_type) } - pub(crate) fn write_main_request(&self) -> WriteFileRequest { + fn write_main_request(&self) -> WriteFileRequest { write_primary_file_request(self.crate_type, &self.code) } @@ -457,15 +457,39 @@ pub struct CompileRequest { } impl CompileRequest { - pub(crate) fn delete_previous_main_request(&self) -> DeleteFileRequest { + const OUTPUT_PATH: &str = "compilation"; + + fn read_output_request(&self) -> ReadFileRequest { + ReadFileRequest { + path: Self::OUTPUT_PATH.to_owned(), + } + } + + pub(crate) fn postprocess_result(&self, mut code: String) -> String { + if let CompileTarget::Assembly(_, demangle, process) = self.target { + if demangle == DemangleAssembly::Demangle { + code = asm_cleanup::demangle_asm(&code); + } + + if process == ProcessAssembly::Filter { + code = asm_cleanup::filter_asm(&code); + } + } + + code + } +} + +impl LowerRequest for CompileRequest { + fn delete_previous_main_request(&self) -> DeleteFileRequest { delete_previous_primary_file_request(self.crate_type) } - pub(crate) fn write_main_request(&self) -> WriteFileRequest { + fn write_main_request(&self) -> WriteFileRequest { write_primary_file_request(self.crate_type, &self.code) } - pub(crate) fn execute_cargo_request(&self, output_path: &str) -> ExecuteCommandRequest { + fn execute_cargo_request(&self) -> ExecuteCommandRequest { use CompileTarget::*; let mut args = if let Wasm = self.target { @@ -495,8 +519,8 @@ impl CompileRequest { } LlvmIr => args.extend(&["--", "--emit", "llvm-ir=compilation"]), Mir => args.extend(&["--", "--emit", "mir=compilation"]), - Hir => args.extend(&["--", "-Zunpretty=hir", "-o", output_path]), - Wasm => args.extend(&["-o", output_path]), + Hir => args.extend(&["--", "-Zunpretty=hir", "-o", Self::OUTPUT_PATH]), + Wasm => args.extend(&["-o", Self::OUTPUT_PATH]), } let mut envs = HashMap::new(); if self.backtrace { @@ -510,20 +534,6 @@ impl CompileRequest { cwd: None, } } - - pub(crate) fn postprocess_result(&self, mut code: String) -> String { - if let CompileTarget::Assembly(_, demangle, process) = self.target { - if demangle == DemangleAssembly::Demangle { - code = asm_cleanup::demangle_asm(&code); - } - - if process == ProcessAssembly::Filter { - code = asm_cleanup::filter_asm(&code); - } - } - - code - } } impl CargoTomlModifier for CompileRequest { @@ -563,15 +573,23 @@ pub struct FormatRequest { } impl FormatRequest { - pub(crate) fn delete_previous_main_request(&self) -> DeleteFileRequest { + fn read_output_request(&self) -> ReadFileRequest { + ReadFileRequest { + path: self.crate_type.primary_path().to_owned(), + } + } +} + +impl LowerRequest for FormatRequest { + fn delete_previous_main_request(&self) -> DeleteFileRequest { delete_previous_primary_file_request(self.crate_type) } - pub(crate) fn write_main_request(&self) -> WriteFileRequest { + fn write_main_request(&self) -> WriteFileRequest { write_primary_file_request(self.crate_type, &self.code) } - pub(crate) fn execute_cargo_request(&self) -> ExecuteCommandRequest { + fn execute_cargo_request(&self) -> ExecuteCommandRequest { ExecuteCommandRequest { cmd: "cargo".to_owned(), args: vec!["fmt".to_owned()], @@ -611,16 +629,16 @@ pub struct ClippyRequest { pub code: String, } -impl ClippyRequest { - pub(crate) fn delete_previous_main_request(&self) -> DeleteFileRequest { +impl LowerRequest for ClippyRequest { + fn delete_previous_main_request(&self) -> DeleteFileRequest { delete_previous_primary_file_request(self.crate_type) } - pub(crate) fn write_main_request(&self) -> WriteFileRequest { + fn write_main_request(&self) -> WriteFileRequest { write_primary_file_request(self.crate_type, &self.code) } - pub(crate) fn execute_cargo_request(&self) -> ExecuteCommandRequest { + fn execute_cargo_request(&self) -> ExecuteCommandRequest { ExecuteCommandRequest { cmd: "cargo".to_owned(), args: vec!["clippy".to_owned()], @@ -659,16 +677,16 @@ pub struct MiriRequest { pub code: String, } -impl MiriRequest { - pub(crate) fn delete_previous_main_request(&self) -> DeleteFileRequest { +impl LowerRequest for MiriRequest { + fn delete_previous_main_request(&self) -> DeleteFileRequest { delete_previous_primary_file_request(self.crate_type) } - pub(crate) fn write_main_request(&self) -> WriteFileRequest { + fn write_main_request(&self) -> WriteFileRequest { write_primary_file_request(self.crate_type, &self.code) } - pub(crate) fn execute_cargo_request(&self) -> ExecuteCommandRequest { + fn execute_cargo_request(&self) -> ExecuteCommandRequest { ExecuteCommandRequest { cmd: "cargo".to_owned(), args: vec!["miri-playground".to_owned()], @@ -707,16 +725,16 @@ pub struct MacroExpansionRequest { pub code: String, } -impl MacroExpansionRequest { - pub(crate) fn delete_previous_main_request(&self) -> DeleteFileRequest { +impl LowerRequest for MacroExpansionRequest { + fn delete_previous_main_request(&self) -> DeleteFileRequest { delete_previous_primary_file_request(self.crate_type) } - pub(crate) fn write_main_request(&self) -> WriteFileRequest { + fn write_main_request(&self) -> WriteFileRequest { write_primary_file_request(self.crate_type, &self.code) } - pub(crate) fn execute_cargo_request(&self) -> ExecuteCommandRequest { + fn execute_cargo_request(&self) -> ExecuteCommandRequest { ExecuteCommandRequest { cmd: "cargo".to_owned(), args: ["rustc", "--", "-Zunpretty=expanded"] @@ -1320,21 +1338,6 @@ impl Container { ) -> Result { use execute_error::*; - let delete_previous_main = request.delete_previous_main_request(); - let write_main = request.write_main_request(); - let execute_cargo = request.execute_cargo_request(); - - let delete_previous_main = self.commander.one(delete_previous_main); - let write_main = self.commander.one(write_main); - let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); - - let (delete_previous_main, write_main, modify_cargo_toml) = - join!(delete_previous_main, write_main, modify_cargo_toml); - - delete_previous_main.context(CouldNotDeletePreviousCodeSnafu)?; - write_main.context(CouldNotWriteCodeSnafu)?; - modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; - let SpawnCargo { permit, task, @@ -1342,10 +1345,7 @@ impl Container { stdout_rx, stderr_rx, status_rx, - } = self - .spawn_cargo_task(token, execute_cargo) - .await - .context(CouldNotStartCargoSnafu)?; + } = self.do_request(request, token).await?; let task = async move { let ExecuteCommandResponse { @@ -1409,26 +1409,6 @@ impl Container { ) -> Result { use compile_error::*; - let output_path: &str = "compilation"; - - let delete_previous_main = request.delete_previous_main_request(); - let write_main = request.write_main_request(); - let execute_cargo = request.execute_cargo_request(output_path); - let read_output = ReadFileRequest { - path: output_path.to_owned(), - }; - - let delete_previous_main = self.commander.one(delete_previous_main); - let write_main = self.commander.one(write_main); - let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); - - let (delete_previous_main, write_main, modify_cargo_toml) = - join!(delete_previous_main, write_main, modify_cargo_toml); - - delete_previous_main.context(CouldNotDeletePreviousCodeSnafu)?; - write_main.context(CouldNotWriteCodeSnafu)?; - modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; - let SpawnCargo { permit, task, @@ -1436,10 +1416,7 @@ impl Container { stdout_rx, stderr_rx, status_rx, - } = self - .spawn_cargo_task(token, execute_cargo) - .await - .context(CouldNotStartCargoSnafu)?; + } = self.do_request(&request, token).await?; drop(stdin_tx); drop(status_rx); @@ -1455,6 +1432,8 @@ impl Container { .context(CargoFailedSnafu)?; let code = if success { + let read_output = request.read_output_request(); + let file: ReadFileResponse = commander .one(read_output) .await @@ -1506,24 +1485,6 @@ impl Container { ) -> Result { use format_error::*; - let delete_previous_main = request.delete_previous_main_request(); - let write_main = request.write_main_request(); - let execute_cargo = request.execute_cargo_request(); - let read_output = ReadFileRequest { - path: request.crate_type.primary_path().to_owned(), - }; - - let delete_previous_main = self.commander.one(delete_previous_main); - let write_main = self.commander.one(write_main); - let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); - - let (delete_previous_main, write_main, modify_cargo_toml) = - join!(delete_previous_main, write_main, modify_cargo_toml); - - delete_previous_main.context(CouldNotDeletePreviousCodeSnafu)?; - write_main.context(CouldNotWriteCodeSnafu)?; - modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; - let SpawnCargo { permit, task, @@ -1531,10 +1492,7 @@ impl Container { stdout_rx, stderr_rx, status_rx, - } = self - .spawn_cargo_task(token, execute_cargo) - .await - .context(CouldNotStartCargoSnafu)?; + } = self.do_request(&request, token).await?; drop(stdin_tx); drop(status_rx); @@ -1549,6 +1507,7 @@ impl Container { .context(CargoTaskPanickedSnafu)? .context(CargoFailedSnafu)?; + let read_output = request.read_output_request(); let file = commander .one(read_output) .await @@ -1594,21 +1553,6 @@ impl Container { ) -> Result { use clippy_error::*; - let delete_previous_main = request.delete_previous_main_request(); - let write_main = request.write_main_request(); - let execute_cargo = request.execute_cargo_request(); - - let delete_previous_main = self.commander.one(delete_previous_main); - let write_main = self.commander.one(write_main); - let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); - - let (delete_previous_main, write_main, modify_cargo_toml) = - join!(delete_previous_main, write_main, modify_cargo_toml); - - delete_previous_main.context(CouldNotDeletePreviousCodeSnafu)?; - write_main.context(CouldNotWriteCodeSnafu)?; - modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; - let SpawnCargo { permit, task, @@ -1616,10 +1560,7 @@ impl Container { stdout_rx, stderr_rx, status_rx, - } = self - .spawn_cargo_task(token, execute_cargo) - .await - .context(CouldNotStartCargoSnafu)?; + } = self.do_request(request, token).await?; drop(stdin_tx); drop(status_rx); @@ -1668,21 +1609,6 @@ impl Container { ) -> Result { use miri_error::*; - let delete_previous_main = request.delete_previous_main_request(); - let write_main = request.write_main_request(); - let execute_cargo = request.execute_cargo_request(); - - let delete_previous_main = self.commander.one(delete_previous_main); - let write_main = self.commander.one(write_main); - let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); - - let (delete_previous_main, write_main, modify_cargo_toml) = - join!(delete_previous_main, write_main, modify_cargo_toml); - - delete_previous_main.context(CouldNotDeletePreviousCodeSnafu)?; - write_main.context(CouldNotWriteCodeSnafu)?; - modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; - let SpawnCargo { permit, task, @@ -1690,10 +1616,7 @@ impl Container { stdout_rx, stderr_rx, status_rx, - } = self - .spawn_cargo_task(token, execute_cargo) - .await - .context(CouldNotStartCargoSnafu)?; + } = self.do_request(request, token).await?; drop(stdin_tx); drop(status_rx); @@ -1745,21 +1668,6 @@ impl Container { ) -> Result { use macro_expansion_error::*; - let delete_previous_main = request.delete_previous_main_request(); - let write_main = request.write_main_request(); - let execute_cargo = request.execute_cargo_request(); - - let delete_previous_main = self.commander.one(delete_previous_main); - let write_main = self.commander.one(write_main); - let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); - - let (delete_previous_main, write_main, modify_cargo_toml) = - join!(delete_previous_main, write_main, modify_cargo_toml); - - delete_previous_main.context(CouldNotDeletePreviousCodeSnafu)?; - write_main.context(CouldNotWriteCodeSnafu)?; - modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; - let SpawnCargo { permit, task, @@ -1767,10 +1675,7 @@ impl Container { stdout_rx, stderr_rx, status_rx, - } = self - .spawn_cargo_task(token, execute_cargo) - .await - .context(CouldNotStartCargoSnafu)?; + } = self.do_request(request, token).await?; drop(stdin_tx); drop(status_rx); @@ -1799,6 +1704,33 @@ impl Container { }) } + async fn do_request( + &self, + request: impl LowerRequest + CargoTomlModifier, + token: CancellationToken, + ) -> Result { + use do_request_error::*; + + let delete_previous_main = request.delete_previous_main_request(); + let write_main = request.write_main_request(); + let execute_cargo = request.execute_cargo_request(); + + let delete_previous_main = self.commander.one(delete_previous_main); + let write_main = self.commander.one(write_main); + let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); + + let (delete_previous_main, write_main, modify_cargo_toml) = + join!(delete_previous_main, write_main, modify_cargo_toml); + + delete_previous_main.context(CouldNotDeletePreviousCodeSnafu)?; + write_main.context(CouldNotWriteCodeSnafu)?; + modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; + + self.spawn_cargo_task(token, execute_cargo) + .await + .context(CouldNotStartCargoSnafu) + } + async fn spawn_cargo_task( &self, token: CancellationToken, @@ -1950,17 +1882,8 @@ pub enum ExecuteError { #[snafu(display("Could not start the container"))] CouldNotStartContainer { source: Error }, - #[snafu(display("Could not modify Cargo.toml"))] - CouldNotModifyCargoToml { source: ModifyCargoTomlError }, - - #[snafu(display("Could not delete previous source code"))] - CouldNotDeletePreviousCode { source: CommanderError }, - - #[snafu(display("Could not write source code"))] - CouldNotWriteCode { source: CommanderError }, - - #[snafu(display("Could not start Cargo task"))] - CouldNotStartCargo { source: SpawnCargoError }, + #[snafu(transparent)] + DoRequest { source: DoRequestError }, #[snafu(display("The Cargo task panicked"))] CargoTaskPanicked { source: tokio::task::JoinError }, @@ -1992,17 +1915,8 @@ pub enum CompileError { #[snafu(display("Could not start the container"))] CouldNotStartContainer { source: Error }, - #[snafu(display("Could not modify Cargo.toml"))] - CouldNotModifyCargoToml { source: ModifyCargoTomlError }, - - #[snafu(display("Could not delete previous source code"))] - CouldNotDeletePreviousCode { source: CommanderError }, - - #[snafu(display("Could not write source code"))] - CouldNotWriteCode { source: CommanderError }, - - #[snafu(display("Could not start Cargo task"))] - CouldNotStartCargo { source: SpawnCargoError }, + #[snafu(transparent)] + DoRequest { source: DoRequestError }, #[snafu(display("The Cargo task panicked"))] CargoTaskPanicked { source: tokio::task::JoinError }, @@ -2040,17 +1954,8 @@ pub enum FormatError { #[snafu(display("Could not start the container"))] CouldNotStartContainer { source: Error }, - #[snafu(display("Could not modify Cargo.toml"))] - CouldNotModifyCargoToml { source: ModifyCargoTomlError }, - - #[snafu(display("Could not delete previous source code"))] - CouldNotDeletePreviousCode { source: CommanderError }, - - #[snafu(display("Could not write source code"))] - CouldNotWriteCode { source: CommanderError }, - - #[snafu(display("Could not start Cargo task"))] - CouldNotStartCargo { source: SpawnCargoError }, + #[snafu(transparent)] + DoRequest { source: DoRequestError }, #[snafu(display("The Cargo task panicked"))] CargoTaskPanicked { source: tokio::task::JoinError }, @@ -2088,17 +1993,8 @@ pub enum ClippyError { #[snafu(display("Could not start the container"))] CouldNotStartContainer { source: Error }, - #[snafu(display("Could not modify Cargo.toml"))] - CouldNotModifyCargoToml { source: ModifyCargoTomlError }, - - #[snafu(display("Could not delete previous source code"))] - CouldNotDeletePreviousCode { source: CommanderError }, - - #[snafu(display("Could not write source code"))] - CouldNotWriteCode { source: CommanderError }, - - #[snafu(display("Could not start Cargo task"))] - CouldNotStartCargo { source: SpawnCargoError }, + #[snafu(transparent)] + DoRequest { source: DoRequestError }, #[snafu(display("The Cargo task panicked"))] CargoTaskPanicked { source: tokio::task::JoinError }, @@ -2130,17 +2026,8 @@ pub enum MiriError { #[snafu(display("Could not start the container"))] CouldNotStartContainer { source: Error }, - #[snafu(display("Could not modify Cargo.toml"))] - CouldNotModifyCargoToml { source: ModifyCargoTomlError }, - - #[snafu(display("Could not delete previous source code"))] - CouldNotDeletePreviousCode { source: CommanderError }, - - #[snafu(display("Could not write source code"))] - CouldNotWriteCode { source: CommanderError }, - - #[snafu(display("Could not start Cargo task"))] - CouldNotStartCargo { source: SpawnCargoError }, + #[snafu(transparent)] + DoRequest { source: DoRequestError }, #[snafu(display("The Cargo task panicked"))] CargoTaskPanicked { source: tokio::task::JoinError }, @@ -2172,6 +2059,19 @@ pub enum MacroExpansionError { #[snafu(display("Could not start the container"))] CouldNotStartContainer { source: Error }, + #[snafu(transparent)] + DoRequest { source: DoRequestError }, + + #[snafu(display("The Cargo task panicked"))] + CargoTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("Cargo task failed"))] + CargoFailed { source: SpawnCargoError }, +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum DoRequestError { #[snafu(display("Could not modify Cargo.toml"))] CouldNotModifyCargoToml { source: ModifyCargoTomlError }, @@ -2183,12 +2083,6 @@ pub enum MacroExpansionError { #[snafu(display("Could not start Cargo task"))] CouldNotStartCargo { source: SpawnCargoError }, - - #[snafu(display("The Cargo task panicked"))] - CargoTaskPanicked { source: tokio::task::JoinError }, - - #[snafu(display("Cargo task failed"))] - CargoFailed { source: SpawnCargoError }, } struct SpawnCargo { @@ -2232,10 +2126,44 @@ struct Commander { id: Arc, } +trait LowerRequest { + fn delete_previous_main_request(&self) -> DeleteFileRequest; + + fn write_main_request(&self) -> WriteFileRequest; + + fn execute_cargo_request(&self) -> ExecuteCommandRequest; +} + +impl LowerRequest for &S +where + S: LowerRequest, +{ + fn delete_previous_main_request(&self) -> DeleteFileRequest { + S::delete_previous_main_request(self) + } + + fn write_main_request(&self) -> WriteFileRequest { + S::write_main_request(self) + } + + fn execute_cargo_request(&self) -> ExecuteCommandRequest { + S::execute_cargo_request(self) + } +} + trait CargoTomlModifier { fn modify_cargo_toml(&self, cargo_toml: toml::Value) -> toml::Value; } +impl CargoTomlModifier for &C +where + C: CargoTomlModifier, +{ + fn modify_cargo_toml(&self, cargo_toml: toml::Value) -> toml::Value { + C::modify_cargo_toml(self, cargo_toml) + } +} + #[derive(Debug)] struct ModifyCargoToml { commander: Commander, From c1aee2b22b8ea91dfd74f862e566acb5227a973b Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Tue, 12 Nov 2024 11:58:30 -0500 Subject: [PATCH 2/7] Change uses of `join!` to `try_join!` --- compiler/base/orchestrator/src/coordinator.rs | 144 ++++++++++++------ 1 file changed, 94 insertions(+), 50 deletions(-) diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index e1b1a49e..30d7f671 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -1,8 +1,4 @@ -use futures::{ - future::{BoxFuture, OptionFuture}, - stream::BoxStream, - Future, FutureExt, Stream, StreamExt, -}; +use futures::{future::BoxFuture, stream::BoxStream, Future, FutureExt, Stream, StreamExt}; use serde::Deserialize; use snafu::prelude::*; use std::{ @@ -16,12 +12,12 @@ use std::{ time::Duration, }; use tokio::{ - join, process::{Child, ChildStdin, ChildStdout, Command}, select, sync::{mpsc, oneshot, OnceCell}, task::{JoinHandle, JoinSet}, time::{self, MissedTickBehavior}, + try_join, }; use tokio_stream::wrappers::ReceiverStream; use tokio_util::{io::SyncIoBridge, sync::CancellationToken}; @@ -799,11 +795,10 @@ impl WithOutput { where F: Future>, { - let stdout = stdout_rx.collect(); - let stderr = stderr_rx.collect(); + let stdout = stdout_rx.collect().map(Ok); + let stderr = stderr_rx.collect().map(Ok); - let (result, stdout, stderr) = join!(task, stdout, stderr); - let response = result?; + let (response, stdout, stderr) = try_join!(task, stdout, stderr)?; Ok(WithOutput { response, @@ -936,11 +931,11 @@ where c.versions().await.map_err(VersionsChannelError::from) }); - let (stable, beta, nightly) = join!(stable, beta, nightly); + let stable = async { stable.await.context(StableSnafu) }; + let beta = async { beta.await.context(BetaSnafu) }; + let nightly = async { nightly.await.context(NightlySnafu) }; - let stable = stable.context(StableSnafu)?; - let beta = beta.context(BetaSnafu)?; - let nightly = nightly.context(NightlySnafu)?; + let (stable, beta, nightly) = try_join!(stable, beta, nightly)?; Ok(Versions { stable, @@ -1128,16 +1123,17 @@ where let token = mem::take(token); token.cancel(); - let channels = - [stable, beta, nightly].map(|c| OptionFuture::from(c.take().map(|c| c.shutdown()))); + let channels = [stable, beta, nightly].map(|c| async { + match c.take() { + Some(c) => c.shutdown().await, + _ => Ok(()), + } + }); let [stable, beta, nightly] = channels; - let (stable, beta, nightly) = join!(stable, beta, nightly); - - stable.transpose()?; - beta.transpose()?; - nightly.transpose()?; + let (stable, beta, nightly) = try_join!(stable, beta, nightly)?; + let _: [(); 3] = [stable, beta, nightly]; Ok(()) } @@ -1196,14 +1192,28 @@ impl Container { let task = tokio::spawn( async move { - let (c, d, t) = join!(child.wait(), demultiplex_task, tasks.join_next()); + let child = async { + let _: std::process::ExitStatus = + child.wait().await.context(JoinWorkerSnafu)?; + Ok(()) + }; - c.context(JoinWorkerSnafu)?; - d.context(DemultiplexerTaskPanickedSnafu)? - .context(DemultiplexerTaskFailedSnafu)?; - if let Some(t) = t { - t.context(IoQueuePanickedSnafu)??; - } + let demultiplex_task = async { + demultiplex_task + .await + .context(DemultiplexerTaskPanickedSnafu)? + .context(DemultiplexerTaskFailedSnafu) + }; + + let task = async { + if let Some(t) = tasks.join_next().await { + t.context(IoQueuePanickedSnafu)??; + } + Ok(()) + }; + + let (c, d, t) = try_join!(child, demultiplex_task, task)?; + let _: [(); 3] = [c, d, t]; Ok(()) } @@ -1234,19 +1244,41 @@ impl Container { let token = CancellationToken::new(); - let rustc = self.rustc_version(token.clone()); - let rustfmt = self.tool_version(token.clone(), "fmt"); - let clippy = self.tool_version(token.clone(), "clippy"); - let miri = self.tool_version(token, "miri"); + let rustc = { + let token = token.clone(); + async { + self.rustc_version(token) + .await + .context(RustcSnafu)? + .context(RustcMissingSnafu) + } + }; + let rustfmt = { + let token = token.clone(); + async { + self.tool_version(token, "fmt") + .await + .context(RustfmtSnafu)? + .context(RustfmtMissingSnafu) + } + }; + let clippy = { + let token = token.clone(); + async { + self.tool_version(token, "clippy") + .await + .context(ClippySnafu)? + .context(ClippyMissingSnafu) + } + }; + let miri = { + let token = token.clone(); + async { self.tool_version(token, "miri").await.context(MiriSnafu) } + }; - let (rustc, rustfmt, clippy, miri) = join!(rustc, rustfmt, clippy, miri); + let _token = token.drop_guard(); - let rustc = rustc.context(RustcSnafu)?.context(RustcMissingSnafu)?; - let rustfmt = rustfmt - .context(RustfmtSnafu)? - .context(RustfmtMissingSnafu)?; - let clippy = clippy.context(ClippySnafu)?.context(ClippyMissingSnafu)?; - let miri = miri.context(MiriSnafu)?; + let (rustc, rustfmt, clippy, miri) = try_join!(rustc, rustfmt, clippy, miri)?; Ok(ChannelVersions { rustc, @@ -1711,21 +1743,33 @@ impl Container { ) -> Result { use do_request_error::*; - let delete_previous_main = request.delete_previous_main_request(); - let write_main = request.write_main_request(); - let execute_cargo = request.execute_cargo_request(); + let delete_previous_main = async { + self.commander + .one(request.delete_previous_main_request()) + .await + .context(CouldNotDeletePreviousCodeSnafu) + .map(drop::) + }; - let delete_previous_main = self.commander.one(delete_previous_main); - let write_main = self.commander.one(write_main); - let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request); + let write_main = async { + self.commander + .one(request.write_main_request()) + .await + .context(CouldNotWriteCodeSnafu) + .map(drop::) + }; - let (delete_previous_main, write_main, modify_cargo_toml) = - join!(delete_previous_main, write_main, modify_cargo_toml); + let modify_cargo_toml = async { + self.modify_cargo_toml + .modify_for(&request) + .await + .context(CouldNotModifyCargoTomlSnafu) + }; - delete_previous_main.context(CouldNotDeletePreviousCodeSnafu)?; - write_main.context(CouldNotWriteCodeSnafu)?; - modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; + let (d, w, m) = try_join!(delete_previous_main, write_main, modify_cargo_toml)?; + let _: [(); 3] = [d, w, m]; + let execute_cargo = request.execute_cargo_request(); self.spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu) From d1bbebc4446cddd02ce973e398cd57040bb01869 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Tue, 12 Nov 2024 15:41:30 -0500 Subject: [PATCH 3/7] Remove unneeded clone --- compiler/base/orchestrator/src/coordinator.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 30d7f671..6e12888a 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -1314,7 +1314,6 @@ impl Container { token: CancellationToken, cmd: ExecuteCommandRequest, ) -> Result, VersionError> { - let v = self.spawn_cargo_task(token.clone(), cmd).await?; let SpawnCargo { permit: _permit, task, @@ -1322,7 +1321,7 @@ impl Container { stdout_rx, stderr_rx, status_rx, - } = v; + } = self.spawn_cargo_task(token, cmd).await?; drop(stdin_tx); drop(status_rx); From 3ed1dca0fdcefcc8932a8c8486de587348556578 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Wed, 13 Nov 2024 09:55:58 -0500 Subject: [PATCH 4/7] Explicitly cancel the coordinator when it is dropped --- compiler/base/orchestrator/src/coordinator.rs | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 6e12888a..6f573039 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -893,7 +893,7 @@ pub struct Coordinator { stable: OnceCell, beta: OnceCell, nightly: OnceCell, - token: CancellationToken, + token: CancelOnDrop, } /// Runs things. @@ -910,7 +910,7 @@ where B: Backend, { pub fn new(limits: Arc, backend: B) -> Self { - let token = CancellationToken::new(); + let token = CancelOnDrop(CancellationToken::new()); Self { limits, @@ -1153,13 +1153,28 @@ where container .get_or_try_init(|| { let limits = self.limits.clone(); - let token = self.token.clone(); + let token = self.token.0.clone(); Container::new(channel, limits, token, &self.backend) }) .await } } +#[derive(Debug, Default)] +struct CancelOnDrop(CancellationToken); + +impl CancelOnDrop { + fn cancel(&self) { + self.0.cancel(); + } +} + +impl Drop for CancelOnDrop { + fn drop(&mut self) { + self.0.cancel(); + } +} + #[derive(Debug)] struct Container { permit: Box, From f2a056e2a271fe151393afda4c3521d6cb38dd33 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Wed, 13 Nov 2024 09:56:50 -0500 Subject: [PATCH 5/7] Remove redundant container name from logs --- compiler/base/orchestrator/src/coordinator.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 6f573039..038e0b29 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -2558,9 +2558,9 @@ impl TerminateContainer { .insert(name.into()); if was_inserted { - info!(%name, "Started tracking container"); + info!("Started tracking container"); } else { - error!(%name, "Started tracking container, but it was already tracked"); + error!("Started tracking container, but it was already tracked"); } } @@ -2572,9 +2572,9 @@ impl TerminateContainer { .remove(name); if was_tracked { - info!(%name, "Stopped tracking container"); + info!("Stopped tracking container"); } else { - error!(%name, "Stopped tracking container, but it was not in the tracking set"); + error!("Stopped tracking container, but it was not in the tracking set"); } } From c79a2c41a065a87c02052e42f9deb447956916b3 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Wed, 13 Nov 2024 10:02:11 -0500 Subject: [PATCH 6/7] Log when the WebSocket starts and ends --- ui/src/server_axum/websocket.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs index 1b9892b5..31c40581 100644 --- a/ui/src/server_axum/websocket.rs +++ b/ui/src/server_axum/websocket.rs @@ -28,7 +28,7 @@ use tokio::{ time, }; use tokio_util::sync::CancellationToken; -use tracing::{error, instrument, warn, Instrument}; +use tracing::{error, info, instrument, warn, Instrument}; #[derive(Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] @@ -210,9 +210,11 @@ pub(crate) async fn handle( let id = WEBSOCKET_ID.fetch_add(1, Ordering::SeqCst); tracing::Span::current().record("ws_id", &id); + info!("WebSocket started"); handle_core(socket, factory, feature_flags, db).await; + info!("WebSocket ending"); metrics::LIVE_WS.dec(); let elapsed = start.elapsed(); metrics::DURATION_WS.observe(elapsed.as_secs_f64()); From d6b4d7520e53308e7f5bfb3a2bb0a66ae408f991 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Wed, 13 Nov 2024 11:19:33 -0500 Subject: [PATCH 7/7] Make the WebSocket timeouts configurable --- ui/src/main.rs | 30 ++++++++++++++++++++++++++++++ ui/src/server_axum.rs | 8 +++++--- ui/src/server_axum/websocket.rs | 12 ++++++------ 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/ui/src/main.rs b/ui/src/main.rs index 8decaaad..42eb0b05 100644 --- a/ui/src/main.rs +++ b/ui/src/main.rs @@ -8,12 +8,17 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use tracing::{error, info, warn}; use tracing_subscriber::EnvFilter; const DEFAULT_ADDRESS: &str = "127.0.0.1"; const DEFAULT_PORT: u16 = 5000; + +const DEFAULT_WEBSOCKET_IDLE_TIMEOUT: Duration = Duration::from_secs(60); +const DEFAULT_WEBSOCKET_SESSION_TIMEOUT: Duration = Duration::from_secs(45 * 60); + const DEFAULT_COORDINATORS_LIMIT: usize = 25; const DEFAULT_PROCESSES_LIMIT: usize = 10; @@ -50,6 +55,7 @@ struct Config { metrics_token: Option, feature_flags: FeatureFlags, request_db_path: Option, + websocket_config: WebSocketConfig, limits: Arc, port: u16, root: PathBuf, @@ -108,6 +114,23 @@ impl Config { let request_db_path = env::var_os("PLAYGROUND_REQUEST_DATABASE").map(Into::into); + let websocket_config = { + let idle_timeout = env::var("PLAYGROUND_WEBSOCKET_IDLE_TIMEOUT_S") + .ok() + .and_then(|l| l.parse().map(Duration::from_secs).ok()) + .unwrap_or(DEFAULT_WEBSOCKET_IDLE_TIMEOUT); + + let session_timeout = env::var("PLAYGROUND_WEBSOCKET_SESSION_TIMEOUT_S") + .ok() + .and_then(|l| l.parse().map(Duration::from_secs).ok()) + .unwrap_or(DEFAULT_WEBSOCKET_SESSION_TIMEOUT); + + WebSocketConfig { + idle_timeout, + session_timeout, + } + }; + let coordinators_limit = env::var("PLAYGROUND_COORDINATORS_LIMIT") .ok() .and_then(|l| l.parse().ok()) @@ -131,6 +154,7 @@ impl Config { metrics_token, feature_flags, request_db_path, + websocket_config, limits, port, root, @@ -232,3 +256,9 @@ impl limits::Lifecycle for LifecycleMetrics { metrics::PROCESS_ACTIVE.dec(); } } + +#[derive(Debug, Copy, Clone)] +struct WebSocketConfig { + idle_timeout: Duration, + session_timeout: Duration, +} diff --git a/ui/src/server_axum.rs b/ui/src/server_axum.rs index bbe2cad8..6d01422b 100644 --- a/ui/src/server_axum.rs +++ b/ui/src/server_axum.rs @@ -5,7 +5,7 @@ use crate::{ UNAVAILABLE_WS, }, request_database::Handle, - Config, GhToken, MetricsToken, + Config, GhToken, MetricsToken, WebSocketConfig, }; use async_trait::async_trait; use axum::{ @@ -111,7 +111,8 @@ pub(crate) async fn serve(config: Config) { .layer(Extension(db_handle)) .layer(Extension(Arc::new(SandboxCache::default()))) .layer(Extension(config.github_token())) - .layer(Extension(config.feature_flags)); + .layer(Extension(config.feature_flags)) + .layer(Extension(config.websocket_config)); if let Some(token) = config.metrics_token() { app = app.layer(Extension(token)) @@ -652,11 +653,12 @@ async fn metrics(_: MetricsAuthorization) -> Result, StatusCode> { async fn websocket( ws: WebSocketUpgrade, + Extension(config): Extension, Extension(factory): Extension, Extension(feature_flags): Extension, Extension(db): Extension, ) -> impl IntoResponse { - ws.on_upgrade(move |s| websocket::handle(s, factory.0, feature_flags.into(), db)) + ws.on_upgrade(move |s| websocket::handle(s, config, factory.0, feature_flags.into(), db)) } #[derive(Debug, serde::Deserialize)] diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs index 31c40581..1895c8ac 100644 --- a/ui/src/server_axum/websocket.rs +++ b/ui/src/server_axum/websocket.rs @@ -2,6 +2,7 @@ use crate::{ metrics::{self, record_metric, Endpoint, HasLabelsCore, Outcome}, request_database::Handle, server_axum::api_orchestrator_integration_impls::*, + WebSocketConfig, }; use axum::extract::ws::{Message, WebSocket}; @@ -199,6 +200,7 @@ struct ExecuteResponse { #[instrument(skip_all, fields(ws_id))] pub(crate) async fn handle( socket: WebSocket, + config: WebSocketConfig, factory: Arc, feature_flags: FeatureFlags, db: Handle, @@ -212,7 +214,7 @@ pub(crate) async fn handle( tracing::Span::current().record("ws_id", &id); info!("WebSocket started"); - handle_core(socket, factory, feature_flags, db).await; + handle_core(socket, config, factory, feature_flags, db).await; info!("WebSocket ending"); metrics::LIVE_WS.dec(); @@ -242,9 +244,6 @@ struct CoordinatorManager { } impl CoordinatorManager { - const IDLE_TIMEOUT: Duration = Duration::from_secs(60); - const SESSION_TIMEOUT: Duration = Duration::from_secs(45 * 60); - const N_PARALLEL: usize = 2; const N_KINDS: usize = 1; @@ -343,6 +342,7 @@ type CoordinatorManagerResult = std::result::Res async fn handle_core( mut socket: WebSocket, + config: WebSocketConfig, factory: Arc, feature_flags: FeatureFlags, db: Handle, @@ -363,7 +363,7 @@ async fn handle_core( } let mut manager = CoordinatorManager::new(&factory); - let mut session_timeout = pin!(time::sleep(CoordinatorManager::SESSION_TIMEOUT)); + let mut session_timeout = pin!(time::sleep(config.session_timeout)); let mut idle_timeout = pin!(Fuse::terminated()); let mut active_executions = BTreeMap::new(); @@ -409,7 +409,7 @@ async fn handle_core( // The last task has completed which means we are a // candidate for idling in a little while. if manager.is_empty() { - idle_timeout.set(time::sleep(CoordinatorManager::IDLE_TIMEOUT).fuse()); + idle_timeout.set(time::sleep(config.idle_timeout).fuse()); } let (error, meta) = match task {