diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs
index 41eaa7f2..f8a60ce4 100644
--- a/compiler/base/orchestrator/src/coordinator.rs
+++ b/compiler/base/orchestrator/src/coordinator.rs
@@ -947,7 +947,7 @@ where
         use versions_error::*;

         let [stable, beta, nightly] =
-            [Channel::Stable, Channel::Beta, Channel::Nightly].map(|c| async move {
+            [Channel::Stable, Channel::Beta, Channel::Nightly].map(async |c| {
                 let c = self.select_channel(c).await?;
                 c.versions().await.map_err(VersionsChannelError::from)
             });
@@ -1144,11 +1144,9 @@ where
         let token = mem::take(token);
         token.cancel();

-        let channels = [stable, beta, nightly].map(|c| async {
-            match c.take() {
-                Some(c) => c.shutdown().await,
-                _ => Ok(()),
-            }
+        let channels = [stable, beta, nightly].map(async |c| match c.take() {
+            Some(c) => c.shutdown().await,
+            _ => Ok(()),
         });

         let [stable, beta, nightly] = channels;
@@ -2987,7 +2985,7 @@ mod tests {
         }
     }

-    const MAX_CONCURRENT_TESTS: LazyLock = LazyLock::new(|| {
+    static MAX_CONCURRENT_TESTS: LazyLock = LazyLock::new(|| {
         env::var("TESTS_MAX_CONCURRENCY")
             .ok()
             .and_then(|v| v.parse().ok())
@@ -3081,7 +3079,7 @@ mod tests {
             (Mode::Release, "[optimized]"),
         ];

-        let tests = params.into_iter().map(|(mode, expected)| async move {
+        let tests = params.into_iter().map(async |(mode, expected)| {
             let coordinator = new_coordinator();

             let request = ExecuteRequest {
@@ -3116,8 +3114,10 @@ mod tests {
         ];

         let tests = params.into_iter().flat_map(|(code, works_in)| {
-            Edition::ALL.into_iter().zip(works_in).map(
-                move |(edition, expected_to_work)| async move {
+            Edition::ALL
+                .into_iter()
+                .zip(works_in)
+                .map(async |(edition, expected_to_work)| {
                     let coordinator = new_coordinator();

                     let request = ExecuteRequest {
@@ -3137,8 +3137,7 @@ mod tests {
                     coordinator.shutdown().await?;

                     Ok::<_, Error>(())
-                },
-            )
+                })
         });

         try_join_all(tests).with_timeout().await?;
@@ -3157,7 +3156,7 @@ mod tests {
             ),
         ];

-        let tests = params.into_iter().map(|(crate_type, expected)| async move {
+        let tests = params.into_iter().map(async |(crate_type, expected)| {
             let coordinator = new_coordinator();

             let request = ExecuteRequest {
@@ -3190,7 +3189,7 @@ mod tests {

         let params = [(false, "Running `"), (true, "Running unittests")];

-        let tests = params.into_iter().map(|(tests, expected)| async move {
+        let tests = params.into_iter().map(async |(tests, expected)| {
             let coordinator = new_coordinator();

             let request = ExecuteRequest {
@@ -3223,7 +3222,7 @@ mod tests {
             (true, "stack backtrace:"),
         ];

-        let tests = params.into_iter().map(|(backtrace, expected)| async move {
+        let tests = params.into_iter().map(async |(backtrace, expected)| {
             let coordinator = new_coordinator();

             let request = ExecuteRequest {
@@ -3539,7 +3538,7 @@ mod tests {
             .await
             .unwrap();

-        assert!(response.success, "stderr: {}", stderr);
+        assert!(response.success, "stderr: {stderr}");
         assert_contains!(stderr, "Compiling");
         assert_contains!(stderr, "Finished");

diff --git a/compiler/base/orchestrator/src/worker.rs b/compiler/base/orchestrator/src/worker.rs
index c3758ca2..9977c449 100644
--- a/compiler/base/orchestrator/src/worker.rs
+++ b/compiler/base/orchestrator/src/worker.rs
@@ -1172,7 +1172,7 @@ mod test {
             let next_result = this.0.pop_front().expect("FixedAsyncRead ran out of input");

             if let Ok(v) = &next_result {
-                buf.put_slice(&v);
+                buf.put_slice(v);
             }

             Poll::Ready(next_result.map(drop))
diff --git a/ui/src/gist.rs b/ui/src/gist.rs
index eb79b953..8e226867 100644
--- a/ui/src/gist.rs
+++ b/ui/src/gist.rs
@@ -23,7 +23,7 @@ impl From for Gist {
             0 | 1 => files.into_iter().map(|(_, content)| content).collect(),
             _ => files
                 .into_iter()
-                .map(|(name, content)| format!("// {}\n{}\n\n", name, content))
+                .map(|(name, content)| format!("// {name}\n{content}\n\n"))
                 .collect(),
         };

diff --git a/ui/src/server_axum.rs b/ui/src/server_axum.rs
index fb1c6126..413a41b5 100644
--- a/ui/src/server_axum.rs
+++ b/ui/src/server_axum.rs
@@ -24,12 +24,11 @@ use axum_extra::{
     headers::{authorization::Bearer, Authorization, CacheControl, ETag, IfNoneMatch},
     TypedHeader,
 };
-use futures::{future::BoxFuture, FutureExt, TryFutureExt};
+use futures::{FutureExt, TryFutureExt};
 use orchestrator::coordinator::{self, CoordinatorFactory, DockerBackend, TRACKED_CONTAINERS};
 use snafu::prelude::*;
 use std::{
     convert::TryInto,
-    future::Future,
     mem, path,
     str::FromStr,
     sync::{Arc, LazyLock},
@@ -134,7 +133,8 @@ pub(crate) async fn serve(config: Config) {
     let x_request_id = HeaderName::from_static("x-request-id");

     // Basic access logging
-    app = app.layer(
+    app = app.layer({
+        let x_request_id = x_request_id.clone();
         TraceLayer::new_for_http().make_span_with(move |req: &Request<_>| {
             const REQUEST_ID: &str = "request_id";

@@ -152,17 +152,15 @@ pub(crate) async fn serve(config: Config) {
             }

             span
-        }),
-    );
-
-    let x_request_id = HeaderName::from_static("x-request-id");
+        })
+    });

     // propagate `x-request-id` headers from request to response
     app = app.layer(PropagateRequestIdLayer::new(x_request_id.clone()));

     app = app.layer(SetRequestIdLayer::new(
         x_request_id.clone(),
-        MakeRequestUuid::default(),
+        MakeRequestUuid,
     ));

     let server_socket_addr = config.server_socket_addr();
@@ -208,16 +206,15 @@ async fn rewrite_help_as_index(
     next.run(req).await
 }

-async fn attempt_record_request(
+async fn attempt_record_request(
     db: Handle,
-    req: T,
-    f: impl FnOnce(T) -> RFut,
-) -> Result
+    req: R,
+    f: impl AsyncFnOnce(R) -> Result,
+) -> Result
 where
-    T: HasEndpoint + serde::Serialize,
-    RFut: Future>,
+    R: HasEndpoint + serde::Serialize,
 {
-    let category = format!("http.{}", <&str>::from(T::ENDPOINT));
+    let category = format!("http.{}", <&str>::from(R::ENDPOINT));
     let payload = serde_json::to_string(&req).unwrap_or_else(|_| String::from(""));

     let guard = db.start_with_guard(category, payload).await;
@@ -233,9 +230,9 @@ async fn evaluate(
     Extension(db): Extension,
     Json(req): Json,
 ) -> Result> {
-    attempt_record_request(db, req, |req| async {
-        with_coordinator(&factory.0, req, |c, req| {
-            c.execute(req).context(EvaluateSnafu).boxed()
+    attempt_record_request(db, req, async |req| {
+        with_coordinator(&factory.0, req, async |c, req| {
+            c.execute(req).context(EvaluateSnafu).await
         })
         .await
         .map(Json)
@@ -248,9 +245,9 @@ async fn compile(
     Extension(db): Extension,
     Json(req): Json,
 ) -> Result> {
-    attempt_record_request(db, req, |req| async {
-        with_coordinator(&factory.0, req, |c, req| {
-            c.compile(req).context(CompileSnafu).boxed()
+    attempt_record_request(db, req, async |req| {
+        with_coordinator(&factory.0, req, async |c, req| {
+            c.compile(req).context(CompileSnafu).await
         })
         .await
         .map(Json)
@@ -263,9 +260,9 @@ async fn execute(
     Extension(db): Extension,
     Json(req): Json,
 ) -> Result> {
-    attempt_record_request(db, req, |req| async {
-        with_coordinator(&factory.0, req, |c, req| {
-            c.execute(req).context(ExecuteSnafu).boxed()
+    attempt_record_request(db, req, async |req| {
+        with_coordinator(&factory.0, req, async |c, req| {
+            c.execute(req).context(ExecuteSnafu).await
        })
         .await
         .map(Json)
@@ -278,9 +275,9 @@ async fn format(
     Extension(db): Extension,
     Json(req): Json,
 ) -> Result> {
-    attempt_record_request(db, req, |req| async {
-        with_coordinator(&factory.0, req, |c, req| {
-            c.format(req).context(FormatSnafu).boxed()
+    attempt_record_request(db, req, async |req| {
+        with_coordinator(&factory.0, req, async |c, req| {
+            c.format(req).context(FormatSnafu).await
         })
         .await
         .map(Json)
@@ -293,9 +290,9 @@ async fn clippy(
     Extension(db): Extension,
     Json(req): Json,
 ) -> Result> {
-    attempt_record_request(db, req, |req| async {
-        with_coordinator(&factory.0, req, |c, req| {
-            c.clippy(req).context(ClippySnafu).boxed()
+    attempt_record_request(db, req, async |req| {
+        with_coordinator(&factory.0, req, async |c, req| {
+            c.clippy(req).context(ClippySnafu).await
         })
         .await
         .map(Json)
@@ -308,9 +305,9 @@ async fn miri(
     Extension(db): Extension,
     Json(req): Json,
 ) -> Result> {
-    attempt_record_request(db, req, |req| async {
-        with_coordinator(&factory.0, req, |c, req| {
-            c.miri(req).context(MiriSnafu).boxed()
+    attempt_record_request(db, req, async |req| {
+        with_coordinator(&factory.0, req, async |c, req| {
+            c.miri(req).context(MiriSnafu).await
         })
         .await
         .map(Json)
@@ -323,9 +320,9 @@ async fn macro_expansion(
     Extension(db): Extension,
     Json(req): Json,
 ) -> Result> {
-    attempt_record_request(db, req, |req| async {
-        with_coordinator(&factory.0, req, |c, req| {
-            c.macro_expansion(req).context(MacroExpansionSnafu).boxed()
+    attempt_record_request(db, req, async |req| {
+        with_coordinator(&factory.0, req, async |c, req| {
+            c.macro_expansion(req).context(MacroExpansionSnafu).await
         })
         .await
         .map(Json)
@@ -433,10 +430,10 @@ impl Outcome {
     }
 }

-async fn with_coordinator(
+async fn with_coordinator(
     factory: &CoordinatorFactory,
     req: WebReq,
-    f: F,
+    f: impl AsyncFnOnce(&coordinator::Coordinator, Req) -> Result,
 ) -> Result
 where
     WebReq: TryInto,
@@ -445,8 +442,6 @@ where
     Req: HasLabelsCore,
     Resp: Into,
     Resp: IsSuccess,
-    for<'f> F:
-        FnOnce(&'f coordinator::Coordinator, Req) -> BoxFuture<'f, Result>,
 {
     let coordinator = factory.build();

@@ -525,9 +520,8 @@ where
         .with_max_age(SANDBOX_CACHE_TIME_TO_LIVE)
         .with_public();

-    let use_fresh = if_none_match.map_or(true, |if_none_match| {
-        if_none_match.0.precondition_passes(&etag)
-    });
+    let use_fresh =
+        if_none_match.is_none_or(|if_none_match| if_none_match.0.precondition_passes(&etag));

     let etag = TypedHeader(etag);
     let cache_control = TypedHeader(cache_control);
@@ -617,7 +611,7 @@ async fn nowebsocket(Json(req): Json) {
 }

 static WS_ERRORS: LazyLock>> =
-    LazyLock::new(|| Default::default());
+    LazyLock::new(Default::default);

 fn record_websocket_error(error: String) {
     *WS_ERRORS
diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs
index 3ddeda84..27a0a0be 100644
--- a/ui/src/server_axum/websocket.rs
+++ b/ui/src/server_axum/websocket.rs
@@ -16,7 +16,6 @@ use std::{
     collections::BTreeMap,
     convert::TryFrom,
     mem,
-    ops::ControlFlow,
     pin::pin,
     sync::{
         atomic::{AtomicU64, Ordering},
@@ -212,7 +211,7 @@ pub(crate) async fn handle(
     let start = Instant::now();
     let id = WEBSOCKET_ID.fetch_add(1, Ordering::SeqCst);
-    tracing::Span::current().record("ws_id", &id);
+    tracing::Span::current().record("ws_id", id);

     info!("WebSocket started");

     handle_core(socket, config, factory, feature_flags, db).await;
@@ -371,25 +370,55 @@ async fn handle_core(
     let mut active_execution_gc_interval = time::interval(Duration::from_secs(30));

     loop {
-        tokio::select! {
-            request = socket.recv() => {
+        use Event::*;
+
+        enum Event {
+            Request(Option>),
+            Response(Option>),
+            Task(Result, tokio::task::JoinError>),
+            GarbageCollection,
+            IdleTimeout,
+            IdleRequest,
+            SessionTimeout,
+        }
+
+        let event = tokio::select! {
+            request = socket.recv() => Request(request),
+
+            resp = rx.recv() => Response(resp),
+
+            // We don't care if there are no running tasks
+            Some(task) = manager.join_next() => Task(task),
+
+            _ = active_execution_gc_interval.tick() => GarbageCollection,
+
+            _ = &mut idle_timeout, if manager.is_empty() => IdleTimeout,
+
+            _ = factory.container_requested(), if manager.is_empty() => IdleRequest,
+
+            _ = &mut session_timeout => SessionTimeout,
+        };
+
+        match event {
+            Request(request) => {
                 metrics::WS_INCOMING.inc();

                 match request {
-                    None => {
-                        // browser disconnected
-                        break;
-                    }
-                    Some(Ok(Message::Text(txt))) => handle_msg(&txt, &tx, &mut manager, &mut active_executions, &db).await,
-                    Some(Ok(_)) => {
-                        // unknown message type
-                        continue;
+                    // browser disconnected
+                    None => break,
+
+                    Some(Ok(Message::Text(txt))) => {
+                        handle_msg(&txt, &tx, &mut manager, &mut active_executions, &db).await
                     }
+
+                    // unknown message type
+                    Some(Ok(_)) => continue,
+
                     Some(Err(e)) => super::record_websocket_error(e.to_string()),
                 }
-            },
+            }

-            resp = rx.recv() => {
+            Response(resp) => {
                 let resp = resp.expect("The rx should never close as we have a tx");

                 let success = resp.is_ok();
@@ -403,10 +432,9 @@ async fn handle_core(

                 let success = if success { "true" } else { "false" };
                 metrics::WS_OUTGOING.with_label_values(&[success]).inc();
-            },
+            }

-            // We don't care if there are no running tasks
-            Some(task) = manager.join_next() => {
+            Task(task) => {
                 // The last task has completed which means we are a
                 // candidate for idling in a little while.
                 if manager.is_empty() {
@@ -415,17 +443,21 @@ async fn handle_core(

                 let (error, meta) = match task {
                     Ok(Ok(())) => continue,
+
                     Ok(Err(error)) => error,
+
                     Err(error) => {
                         // The task was cancelled; no need to report
-                        let Ok(panic) = error.try_into_panic() else { continue };
+                        let Ok(panic) = error.try_into_panic() else {
+                            continue;
+                        };

                         let text = match panic.downcast::() {
                             Ok(text) => *text,
                             Err(panic) => match panic.downcast::<&str>() {
                                 Ok(text) => text.to_string(),
                                 _ => "An unknown panic occurred".into(),
-                            }
+                            },
                         };
                         (WebSocketTaskPanicSnafu { text }.build(), None)
                     }
@@ -435,32 +467,30 @@ async fn handle_core(
                     // We can't send a response
                     break;
                 }
-            },
+            }

-            _ = active_execution_gc_interval.tick() => {
+            GarbageCollection => {
                 active_executions = mem::take(&mut active_executions)
                     .into_iter()
-                    .filter(|(_id, (_, tx))| tx.as_ref().map_or(false, |tx| !tx.is_closed()))
+                    .filter(|(_id, (_, tx))| tx.as_ref().is_some_and(|tx| !tx.is_closed()))
                     .collect();
-            },
+            }

-            _ = &mut idle_timeout, if manager.is_empty() => {
-                if handle_idle(&mut manager, &tx).await.is_break() {
-                    break
+            IdleTimeout | IdleRequest => {
+                if let IdleRequest = event {
+                    info!("Container requested to idle");
                 }
-            },

-            _ = factory.container_requested(), if manager.is_empty() => {
-                info!("Container requested to idle");
+                let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+                let Err(error) = idled else { continue };

-                if handle_idle(&mut manager, &tx).await.is_break() {
-                    break
+                if tx.send(Err((error, None))).await.is_err() {
+                    // We can't send a response
+                    break;
                 }
-            },
-
-            _ = &mut session_timeout => {
-                break;
             }
+
+            SessionTimeout => break,
         }
     }

@@ -474,9 +504,7 @@ async fn connect_handshake(socket: &mut WebSocket) -> bool {
     let Some(Ok(Message::Text(txt))) = socket.recv().await else {
         return false;
     };
-    let Ok(HandshakeMessage::Connected { payload, .. }) =
-        serde_json::from_str::(&txt)
-    else {
+    let Ok(HandshakeMessage::Connected { payload, .. }) = serde_json::from_str(&txt) else {
         return false;
     };
     if !payload.i_accept_this_is_an_unsupported_api {
@@ -510,21 +538,6 @@ fn response_to_message(response: MessageResponse) -> Message {
     Message::Text(resp.into())
 }

-async fn handle_idle(manager: &mut CoordinatorManager, tx: &ResponseTx) -> ControlFlow<()> {
-    let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
-
-    let Err(error) = idled else {
-        return ControlFlow::Continue(());
-    };
-
-    if tx.send(Err((error, None))).await.is_err() {
-        // We can't send a response
-        return ControlFlow::Break(());
-    }
-
-    ControlFlow::Continue(())
-}
-
 type ActiveExecutionInfo = (DropGuard, Option>);

 async fn handle_msg(
@@ -555,7 +568,7 @@ async fn handle_msg(
         .spawn({
             let tx = tx.clone();
             let meta = meta.clone();
-            |coordinator| async {
+            async |coordinator| {
                 let r = handle_execute(
                     token,
                     execution_rx,
@@ -693,13 +706,13 @@ async fn handle_execute_inner(

     let mut stdin_tx = Some(stdin_tx);

-    let send_stdout = |payload| async {
+    let send_stdout = async |payload| {
         let meta = meta.clone();
         tx.send(Ok(MessageResponse::ExecuteStdout { payload, meta }))
             .await
     };
-    let send_stderr = |payload| async {
+    let send_stderr = async |payload| {
         let meta = meta.clone();
         tx.send(Ok(MessageResponse::ExecuteStderr { payload, meta }))
             .await
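
Note on the recurring pattern (not part of the diff): throughout this change set, closures that returned an `async move { ... }` block become native async closures (`async |x| { ... }`), and higher-order functions that previously took a generic `FnOnce(T) -> Fut` argument or a `for<'f> FnOnce(...) -> BoxFuture<'f, ...>` bound now take an `AsyncFnOnce` bound, so call sites can drop `.boxed()`. The sketch below is illustrative only and is not code from this repository; the function names are made up, and it assumes a toolchain where async closures and the `AsyncFn*` traits are stable (Rust 2024 edition, rustc 1.85+) with tokio available as the executor.

use std::future::Future;

// Old shape: a separate `Fut` type parameter (or a boxed future) is needed
// because the closure returns a future rather than being async itself.
async fn run_with_closure<F, Fut>(f: F) -> u32
where
    F: FnOnce(u32) -> Fut,
    Fut: Future<Output = u32>,
{
    f(1).await
}

// New shape: an `AsyncFnOnce` bound accepts `async |x| { ... }` directly and
// the extra `Fut` type parameter disappears.
async fn run_with_async_closure(f: impl AsyncFnOnce(u32) -> u32) -> u32 {
    f(1).await
}

#[tokio::main]
async fn main() {
    let a = run_with_closure(|x| async move { x + 1 }).await; // old style
    let b = run_with_async_closure(async |x| { x + 1 }).await; // new style
    assert_eq!(a, b);
}

Under those assumptions the two calls behave the same, which is why the diff can drop the `futures::future::BoxFuture` import and the `.boxed()` calls without changing behavior.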