diff --git a/collector/collect-job-queue.sh b/collector/collect-job-queue.sh
new file mode 100755
index 000000000..70514966d
--- /dev/null
+++ b/collector/collect-job-queue.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+
+# This script expects DATABASE and COLLECTOR_NAME to be defined in the environment
+
+set -u -o pipefail
+
+echo "Running job queue collector"
+
+export RUST_LOG=collector=trace,collector::sysroot=debug
+export PATH="/home/collector/.cargo/bin:$PATH"
+
+while : ; do
+    # Update and rebuild the collector.
+    git pull
+    git reset --hard @{upstream}
+
+    # Make sure we have a recent stable toolchain, so that we can successfully
+    # build the collector.
+    rustup update stable
+    cargo +stable build --release -p collector
+
+    CURRENT_SHA=`git rev-parse HEAD`
+
+    target/release/collector benchmark_job_queue \
+        --db "${DATABASE}" \
+        --check_git_sha \
+        --git_sha "${CURRENT_SHA}" \
+        --collector_name "${COLLECTOR_NAME}"
+
+    STATUS=$?
+    echo finished run at `date` with exit code $STATUS
+
+    # Wait a bit if the command has failed.
+    if [ $STATUS -ne 0 ]; then
+        sleep 60
+    fi
+done
diff --git a/collector/src/bin/collector.rs b/collector/src/bin/collector.rs
index c908d5468..055bb6d72 100644
--- a/collector/src/bin/collector.rs
+++ b/collector/src/bin/collector.rs
@@ -1493,6 +1493,9 @@ fn get_host_tuple_from_rustc(rustc: &str) -> anyhow::Result<String> {
 /// Maximum number of failures before a job will be marked as failed.
 const MAX_JOB_FAILS: u32 = 3;
 
+/// How long the collector should sleep if it does not find any job in the job queue.
+const JOB_WAIT_SLEEP_TIME: Duration = Duration::from_secs(30);
+
 async fn run_job_queue_benchmarks(
     pool: Pool,
     mut conn: Box<dyn Connection>,
@@ -1504,96 +1507,113 @@ async fn run_job_queue_benchmarks(
     let mut last_request_tag = None;
 
-    while let Some((benchmark_job, artifact_id)) = conn
-        .dequeue_benchmark_job(
-            collector.name(),
-            collector.target(),
-            collector.benchmark_set(),
-        )
-        .await?
-    {
-        // Are we benchmarking a different benchmark request than in the previous iteration of the
-        // loop?
-        let is_new_request = last_request_tag.is_some()
-            && last_request_tag.as_deref() != Some(benchmark_job.request_tag());
-        if is_new_request {
-            let _ = tidy_toolchain_cache_dir();
-        }
-
-        // Here we check if we should update our commit SHA, if rustc-perf has been updated.
-        // We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
-        // to avoid changing code in the middle of benchmarking the same request.
-        // Note that if an update happens, the job that we have just dequeued will have its deque
-        // counter increased. But since updates are relatively rare, that shouldn't be a big deal,
-        // it will be dequeued again when the collector starts again.
-        if check_git_sha && is_new_request && needs_git_update(collector) {
+    // Outer loop - wait for some jobs to appear
+    loop {
+        if check_git_sha && needs_git_update(collector) {
             log::warn!("Exiting collector to update itself from git.");
-            return Ok(());
+            break;
        }
 
-        last_request_tag = Some(benchmark_job.request_tag().to_string());
+        while let Some((benchmark_job, artifact_id)) = conn
+            .dequeue_benchmark_job(
+                collector.name(),
+                collector.target(),
+                collector.benchmark_set(),
+            )
+            .await?
+        {
+            // Are we benchmarking a different benchmark request than in the previous iteration of the
+            // loop?
+            let is_new_request = last_request_tag.is_none()
+                || (last_request_tag.as_deref() != Some(benchmark_job.request_tag()));
+            if is_new_request {
+                log::info!("Starting new request {}", benchmark_job.request_tag());
+                let _ = tidy_toolchain_cache_dir();
+            }
 
-        log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
-        let result = run_benchmark_job(
-            conn.as_mut(),
-            &benchmark_job,
-            artifact_id.clone(),
-            &all_compile_benchmarks,
-        )
-        .await;
-        match result {
-            Ok(_) => {
-                log::info!("Job finished sucessfully");
-                conn.mark_benchmark_job_as_completed(
-                    benchmark_job.id(),
-                    BenchmarkJobConclusion::Success,
-                )
-                .await?;
+            // Here we check if we should update our commit SHA, if rustc-perf has been updated.
+            // We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
+            // to avoid changing code in the middle of benchmarking the same request.
+            // Note that if an update happens, the job that we have just dequeued will have its
+            // dequeue counter increased. But since updates are relatively rare, that shouldn't be
+            // a big deal; it will be dequeued again when the collector starts again.
+            if check_git_sha && is_new_request && needs_git_update(collector) {
+                log::warn!("Exiting collector to update itself from git.");
+                return Ok(());
             }
-            Err(error) => {
-                match error {
-                    BenchmarkJobError::Permanent(error) => {
-                        log::error!("Job finished with permanent error: {error:?}");
-
-                        // Store the error to the database
-                        let artifact_row_id = conn.artifact_id(&artifact_id).await;
-                        // Use a placeholder to say that the error is associated with a job,
-                        // not with a benchmark.
-                        conn.record_error(
-                            artifact_row_id,
-                            "Job failure",
-                            &format!("Error while benchmarking job {benchmark_job:?}: {error:?}"),
-                            Some(benchmark_job.id()),
-                        )
-                        .await;
-
-                        // Something bad that probably cannot be retried has happened.
-                        // Immediately mark the job as failed and continue with other jobs
-                        log::info!("Marking the job as failed");
-                        conn.mark_benchmark_job_as_completed(
-                            benchmark_job.id(),
-                            BenchmarkJobConclusion::Failure,
-                        )
-                        .await?;
-                    }
-                    BenchmarkJobError::Transient(error) => {
-                        log::error!("Job finished with transient error: {error:?}");
-                        // There was some transient (i.e. I/O, network or database) error.
-                        // Let's retry the job later, with some sleep
-                        log::info!("Retrying after 30s...");
-                        tokio::time::sleep(Duration::from_secs(30)).await;
+            last_request_tag = Some(benchmark_job.request_tag().to_string());
 
-                        // Maybe there was a DB issue. Try to reconnect to the database.
-                        conn = pool.connection().await;
+            log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
+            let result = run_benchmark_job(
+                conn.as_mut(),
+                &benchmark_job,
+                artifact_id.clone(),
+                &all_compile_benchmarks,
+            )
+            .await;
+            match result {
+                Ok(_) => {
+                    log::info!("Job finished successfully");
+                    conn.mark_benchmark_job_as_completed(
+                        benchmark_job.id(),
+                        BenchmarkJobConclusion::Success,
+                    )
+                    .await?;
+                }
+                Err(error) => {
+                    match error {
+                        BenchmarkJobError::Permanent(error) => {
+                            log::error!("Job finished with permanent error: {error:?}");
+
+                            // Store the error in the database
+                            let artifact_row_id = conn.artifact_id(&artifact_id).await;
+                            // Use a placeholder to say that the error is associated with a job,
+                            // not with a benchmark.
+                            conn.record_error(
+                                artifact_row_id,
+                                "Job failure",
+                                &format!(
+                                    "Error while benchmarking job {benchmark_job:?}: {error:?}"
+                                ),
+                                Some(benchmark_job.id()),
+                            )
+                            .await;
+
+                            // Something bad that probably cannot be retried has happened.
+                            // Immediately mark the job as failed and continue with other jobs
+                            log::info!("Marking the job as failed");
+                            conn.mark_benchmark_job_as_completed(
+                                benchmark_job.id(),
+                                BenchmarkJobConclusion::Failure,
+                            )
+                            .await?;
+                        }
+                        BenchmarkJobError::Transient(error) => {
+                            log::error!("Job finished with transient error: {error:?}");
+
+                            // There was some transient (i.e. I/O, network or database) error.
+                            // Let's retry the job later, with some sleep
+                            log::info!("Retrying after 30s...");
+                            tokio::time::sleep(Duration::from_secs(30)).await;
+
+                            // Maybe there was a DB issue. Try to reconnect to the database.
+                            conn = pool.connection().await;
+                        }
                     }
                 }
             }
+
+            conn.update_collector_heartbeat(collector.name()).await?;
         }
 
+        log::info!(
+            "No job found, sleeping for {}s",
+            JOB_WAIT_SLEEP_TIME.as_secs()
+        );
+        tokio::time::sleep(JOB_WAIT_SLEEP_TIME).await;
         conn.update_collector_heartbeat(collector.name()).await?;
     }
 
-    log::info!("No job found, exiting");
     Ok(())
 }
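For context on how the two halves of this patch interact: collect-job-queue.sh records the SHA it just built (`--git_sha "${CURRENT_SHA}"`), and with `--check_git_sha` the collector exits as soon as the rustc-perf checkout has moved past that SHA, letting the script's outer `while` loop pull, rebuild, and restart it. The `needs_git_update` helper itself is not part of this diff, so the sketch below is only a hypothetical illustration of that contract; the `fetch_upstream_sha` helper, the `git fetch`/`rev-parse @{upstream}` approach, and the CLI wiring in `main` are assumptions, not the actual rustc-perf implementation.

```rust
use std::process::Command;

/// Hypothetical sketch: ask git for the SHA that the upstream branch currently points to.
/// Returns None if git is unavailable or the command fails.
fn fetch_upstream_sha() -> Option<String> {
    // Refresh the remote-tracking branch first; otherwise @{upstream} never changes
    // while the collector keeps running.
    let fetched = Command::new("git").args(["fetch", "--quiet"]).status().ok()?;
    if !fetched.success() {
        return None;
    }
    let output = Command::new("git").args(["rev-parse", "@{upstream}"]).output().ok()?;
    if !output.status.success() {
        return None;
    }
    Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
}

/// Hypothetical stand-in for the `needs_git_update(collector)` check used above:
/// compare the SHA the collector was started with (the `--git_sha` value, produced by
/// `git rev-parse HEAD` in collect-job-queue.sh) against the current upstream SHA.
fn needs_git_update(started_with_sha: &str) -> bool {
    match fetch_upstream_sha() {
        Some(upstream) => upstream != started_with_sha,
        // If the check itself fails (e.g. the network is down), keep benchmarking
        // rather than restart-looping.
        None => false,
    }
}

fn main() {
    // In the real collector this value comes from the --git_sha CLI flag.
    let started_with_sha = std::env::args().nth(1).unwrap_or_default();
    if needs_git_update(&started_with_sha) {
        // Exiting lets the wrapper script rebuild and restart the collector.
        eprintln!("rustc-perf was updated, exiting so the wrapper script can rebuild");
        std::process::exit(0);
    }
    println!("collector is up to date, continuing to poll the job queue");
}
```

Note that an update-triggered exit leaves the collector's exit code at zero, so the wrapper script restarts it immediately; the script's `sleep 60` only applies when a run fails, which keeps the whole update-and-rebuild path in one place.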