diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index 95690b02d..ec294e647 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -457,28 +457,36 @@ async fn perform_queue_tick(ctxt: &SiteCtxt) -> anyhow::Result<()> { Ok(()) } -/// Entry point for the cron job that manages the benchmark request and job queue. -pub async fn create_queue_process( +/// Entry point for the job queue handler, a "cron job" that manages benchmark requests and +/// the job queue. +pub async fn create_job_queue_process( site_ctxt: Arc>>>, run_interval: Duration, ) { + // The job queue process will be restarted in case of panics. + // If it panicked repeatedly because of some transient error, it could lead to 100% CPU + // utilization and a panic loop. + // We thus ensure that we will always wait for the specified interval **first** before + // attempting to run the cron job. In that case even if it panics everytime, the panic won't + // happen more often than N seconds. let mut interval = time::interval(run_interval); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + interval.reset(); let ctxt = site_ctxt.clone(); loop { + interval.tick().await; + if let Some(ctxt_clone) = { let guard = ctxt.read(); guard.as_ref().cloned() } { match perform_queue_tick(&ctxt_clone).await { - Ok(_) => log::info!("Cron job finished"), - Err(e) => log::error!("Cron job failed to execute: {e:?}"), + Ok(_) => log::info!("Job queue handler finished"), + Err(e) => log::error!("Job queue handler failed: {e:?}"), } } - - interval.tick().await; } } diff --git a/site/src/main.rs b/site/src/main.rs index 7affbd7de..c2e89a3ea 100644 --- a/site/src/main.rs +++ b/site/src/main.rs @@ -1,6 +1,6 @@ use futures::future::FutureExt; use parking_lot::RwLock; -use site::job_queue::{create_queue_process, is_job_queue_enabled}; +use site::job_queue::{create_job_queue_process, is_job_queue_enabled}; use site::load; use std::env; use std::sync::Arc; @@ -59,15 +59,19 @@ async fn main() { let server = site::server::start(ctxt.clone(), port).fuse(); - if is_job_queue_enabled() { + let create_job_queue_handler = |ctxt: Arc>>>| { task::spawn(async move { - create_queue_process( - ctxt.clone(), - Duration::from_secs(queue_update_interval_seconds), - ) - .await; - }); - } + if is_job_queue_enabled() { + create_job_queue_process(ctxt, Duration::from_secs(queue_update_interval_seconds)) + .await; + } else { + futures::future::pending::<()>().await; + } + }) + .fuse() + }; + + let mut job_queue_handler = create_job_queue_handler(ctxt.clone()); futures::pin_mut!(server); futures::pin_mut!(fut); @@ -85,6 +89,17 @@ async fn main() { } } } + // We want to have a panic boundary here; if the job queue handler panics for any + // reason (e.g. a transient networking/DB issue), we want to continue running it in the + // future. + // We thus use tokio::task::spawn, wait for a potential panic, and then restart the + // task again. + res = job_queue_handler => { + // The job queue handler task future has "finished", which means that it had to crash. + let error = res.expect_err("Job queue handler finished without an error"); + log::error!("The job queue handler has panicked\n{error:?}\nIt will be now restarted"); + job_queue_handler = create_job_queue_handler(ctxt.clone()); + } } } }