Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions collector/collect-job-queue.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/bin/bash

# Runs the rustc-perf job queue collector in a loop, rebuilding it from the
# current git checkout before every run so the binary always matches HEAD.
#
# This script expects DATABASE and COLLECTOR_NAME to be defined in the environment

# -u: fail on unset variables; -o pipefail: propagate failures through pipes.
# We deliberately do NOT use -e, because a failed collector run is handled
# below with a sleep-and-retry rather than aborting the loop.
set -u -o pipefail

echo "Running job queue collector"

export RUST_LOG=collector=trace,collector::sysroot=debug
export PATH="/home/collector/.cargo/bin:$PATH"

while : ; do
    # Update and rebuild the collector. The hard reset ensures we end up
    # exactly on the upstream commit even if the local branch has diverged.
    git pull
    git reset --hard @{upstream}

    # Make sure we have a recent build, so that we can successfully build
    # the collector.
    rustup update stable
    cargo +stable build --release -p collector

    # Record the SHA we were built from; the collector uses it to detect
    # when the repository has moved on and it should exit to be rebuilt.
    CURRENT_SHA=$(git rev-parse HEAD)

    # NOTE: every argument line must end with a backslash continuation;
    # without it the following option would run as a separate command.
    target/release/collector benchmark_job_queue \
        --db "${DATABASE}" \
        --check_git_sha \
        --git_sha "${CURRENT_SHA}" \
        --collector_name "${COLLECTOR_NAME}"

    STATUS=$?
    echo "finished run at $(date) with exit code ${STATUS}"

    # Wait a bit if the command has failed.
    if [ "${STATUS}" -ne 0 ]; then
        sleep 60
    fi
done
172 changes: 96 additions & 76 deletions collector/src/bin/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,9 @@ fn get_host_tuple_from_rustc(rustc: &str) -> anyhow::Result<String> {
/// Maximum number of failures before a job will be marked as failed.
const MAX_JOB_FAILS: u32 = 3;

/// How long should the collector sleep for if it does not find any job in the job queue.
const JOB_WAIT_SLEEP_TIME: Duration = Duration::from_secs(30);

async fn run_job_queue_benchmarks(
pool: Pool,
mut conn: Box<dyn Connection>,
Expand All @@ -1504,96 +1507,113 @@ async fn run_job_queue_benchmarks(

let mut last_request_tag = None;

while let Some((benchmark_job, artifact_id)) = conn
.dequeue_benchmark_job(
collector.name(),
collector.target(),
collector.benchmark_set(),
)
.await?
{
// Are we benchmarking a different benchmark request than in the previous iteration of the
// loop?
let is_new_request = last_request_tag.is_some()
&& last_request_tag.as_deref() != Some(benchmark_job.request_tag());
if is_new_request {
let _ = tidy_toolchain_cache_dir();
}

// Here we check if we should update our commit SHA, if rustc-perf has been updated.
// We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
// to avoid changing code in the middle of benchmarking the same request.
// Note that if an update happens, the job that we have just dequeued will have its dequeue
// counter increased. But since updates are relatively rare, that shouldn't be a big deal,
// it will be dequeued again when the collector starts again.
if check_git_sha && is_new_request && needs_git_update(collector) {
// Outer loop - wait for some jobs to appear
loop {
if check_git_sha && needs_git_update(collector) {
log::warn!("Exiting collector to update itself from git.");
return Ok(());
break;
}

last_request_tag = Some(benchmark_job.request_tag().to_string());
while let Some((benchmark_job, artifact_id)) = conn
.dequeue_benchmark_job(
collector.name(),
collector.target(),
collector.benchmark_set(),
)
.await?
{
// Are we benchmarking a different benchmark request than in the previous iteration of the
// loop?
let is_new_request = last_request_tag.is_none()
|| (last_request_tag.as_deref() != Some(benchmark_job.request_tag()));
if is_new_request {
log::info!("Starting new request {}", benchmark_job.request_tag());
let _ = tidy_toolchain_cache_dir();
}

log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
let result = run_benchmark_job(
conn.as_mut(),
&benchmark_job,
artifact_id.clone(),
&all_compile_benchmarks,
)
.await;
match result {
Ok(_) => {
log::info!("Job finished successfully");
conn.mark_benchmark_job_as_completed(
benchmark_job.id(),
BenchmarkJobConclusion::Success,
)
.await?;
// Here we check if we should update our commit SHA, if rustc-perf has been updated.
// We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
// to avoid changing code in the middle of benchmarking the same request.
// Note that if an update happens, the job that we have just dequeued will have its dequeue
// counter increased. But since updates are relatively rare, that shouldn't be a big deal,
// it will be dequeued again when the collector starts again.
if check_git_sha && is_new_request && needs_git_update(collector) {
log::warn!("Exiting collector to update itself from git.");
return Ok(());
}
Err(error) => {
match error {
BenchmarkJobError::Permanent(error) => {
log::error!("Job finished with permanent error: {error:?}");

// Store the error to the database
let artifact_row_id = conn.artifact_id(&artifact_id).await;
// Use a <job> placeholder to say that the error is associated with a job,
// not with a benchmark.
conn.record_error(
artifact_row_id,
"Job failure",
&format!("Error while benchmarking job {benchmark_job:?}: {error:?}"),
Some(benchmark_job.id()),
)
.await;

// Something bad that probably cannot be retried has happened.
// Immediately mark the job as failed and continue with other jobs
log::info!("Marking the job as failed");
conn.mark_benchmark_job_as_completed(
benchmark_job.id(),
BenchmarkJobConclusion::Failure,
)
.await?;
}
BenchmarkJobError::Transient(error) => {
log::error!("Job finished with transient error: {error:?}");

// There was some transient (i.e. I/O, network or database) error.
// Let's retry the job later, with some sleep
log::info!("Retrying after 30s...");
tokio::time::sleep(Duration::from_secs(30)).await;
last_request_tag = Some(benchmark_job.request_tag().to_string());

// Maybe there was a DB issue. Try to reconnect to the database.
conn = pool.connection().await;
log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
let result = run_benchmark_job(
conn.as_mut(),
&benchmark_job,
artifact_id.clone(),
&all_compile_benchmarks,
)
.await;
match result {
Ok(_) => {
log::info!("Job finished successfully");
conn.mark_benchmark_job_as_completed(
benchmark_job.id(),
BenchmarkJobConclusion::Success,
)
.await?;
}
Err(error) => {
match error {
BenchmarkJobError::Permanent(error) => {
log::error!("Job finished with permanent error: {error:?}");

// Store the error to the database
let artifact_row_id = conn.artifact_id(&artifact_id).await;
// Use a <job> placeholder to say that the error is associated with a job,
// not with a benchmark.
conn.record_error(
artifact_row_id,
"Job failure",
&format!(
"Error while benchmarking job {benchmark_job:?}: {error:?}"
),
Some(benchmark_job.id()),
)
.await;

// Something bad that probably cannot be retried has happened.
// Immediately mark the job as failed and continue with other jobs
log::info!("Marking the job as failed");
conn.mark_benchmark_job_as_completed(
benchmark_job.id(),
BenchmarkJobConclusion::Failure,
)
.await?;
}
BenchmarkJobError::Transient(error) => {
log::error!("Job finished with transient error: {error:?}");

// There was some transient (i.e. I/O, network or database) error.
// Let's retry the job later, with some sleep
log::info!("Retrying after 30s...");
tokio::time::sleep(Duration::from_secs(30)).await;

// Maybe there was a DB issue. Try to reconnect to the database.
conn = pool.connection().await;
}
}
}
}

conn.update_collector_heartbeat(collector.name()).await?;
}

log::info!(
"No job found, sleeping for {}s",
JOB_WAIT_SLEEP_TIME.as_secs()
);
tokio::time::sleep(JOB_WAIT_SLEEP_TIME).await;
conn.update_collector_heartbeat(collector.name()).await?;
}
log::info!("No job found, exiting");
Ok(())
}

Expand Down
Loading