Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions site/src/job_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,28 +457,36 @@ async fn perform_queue_tick(ctxt: &SiteCtxt) -> anyhow::Result<()> {
Ok(())
}

/// Entry point for the cron job that manages the benchmark request and job queue.
pub async fn create_queue_process(
/// Entry point for the job queue handler, a "cron job" that manages benchmark requests and
/// the job queue.
pub async fn create_job_queue_process(
site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>,
run_interval: Duration,
) {
// The job queue process will be restarted in case of panics.
// If it panicked repeatedly because of some transient error, it could lead to 100% CPU
// utilization and a panic loop.
// We thus ensure that we will always wait for the specified interval **first** before
// attempting to run the cron job. That way, even if it panics every time, the panic won't
// happen more often than once per `run_interval`.
let mut interval = time::interval(run_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.reset();

let ctxt = site_ctxt.clone();

loop {
interval.tick().await;

if let Some(ctxt_clone) = {
let guard = ctxt.read();
guard.as_ref().cloned()
} {
match perform_queue_tick(&ctxt_clone).await {
Ok(_) => log::info!("Cron job finished"),
Err(e) => log::error!("Cron job failed to execute: {e:?}"),
Ok(_) => log::info!("Job queue handler finished"),
Err(e) => log::error!("Job queue handler failed: {e:?}"),
}
}

interval.tick().await;
}
}

Expand Down
33 changes: 24 additions & 9 deletions site/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::future::FutureExt;
use parking_lot::RwLock;
use site::job_queue::{create_queue_process, is_job_queue_enabled};
use site::job_queue::{create_job_queue_process, is_job_queue_enabled};
use site::load;
use std::env;
use std::sync::Arc;
Expand Down Expand Up @@ -59,15 +59,19 @@ async fn main() {

let server = site::server::start(ctxt.clone(), port).fuse();

if is_job_queue_enabled() {
let create_job_queue_handler = |ctxt: Arc<RwLock<Option<Arc<load::SiteCtxt>>>>| {
task::spawn(async move {
create_queue_process(
ctxt.clone(),
Duration::from_secs(queue_update_interval_seconds),
)
.await;
});
}
if is_job_queue_enabled() {
create_job_queue_process(ctxt, Duration::from_secs(queue_update_interval_seconds))
.await;
} else {
futures::future::pending::<()>().await;
}
})
.fuse()
};

let mut job_queue_handler = create_job_queue_handler(ctxt.clone());

futures::pin_mut!(server);
futures::pin_mut!(fut);
Expand All @@ -85,6 +89,17 @@ async fn main() {
}
}
}
// We want to have a panic boundary here; if the job queue handler panics for any
// reason (e.g. a transient networking/DB issue), we want to keep running it
// afterwards.
// We thus use `tokio::task::spawn`, wait for a potential panic, and then restart the
// task.
res = job_queue_handler => {
// The job queue handler task future has "finished", which means that it must have crashed.
let error = res.expect_err("Job queue handler finished without an error");
log::error!("The job queue handler has panicked\n{error:?}\nIt will be now restarted");
job_queue_handler = create_job_queue_handler(ctxt.clone());
}
}
}
}
Loading