diff --git a/collector/src/bin/collector.rs b/collector/src/bin/collector.rs index c8c32566c..e91720dcd 100644 --- a/collector/src/bin/collector.rs +++ b/collector/src/bin/collector.rs @@ -59,7 +59,7 @@ use collector::toolchain::{ }; use collector::utils::cachegrind::cachegrind_diff; use collector::utils::{is_installed, wait_for_future}; -use collector::{utils, CollectorCtx, CollectorStepBuilder}; +use collector::{command_output, utils, CollectorCtx, CollectorStepBuilder}; use database::{ ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkJobConclusion, CollectorConfig, Commit, CommitType, Connection, Pool, @@ -708,6 +708,16 @@ enum Commands { #[arg(long)] collector_name: String, + /// Git SHA of the commit that the collector is currently on. + /// If not present, the collector will attempt to figure it out from git directly. + #[arg(long)] + git_sha: Option, + + /// Periodically check if the collector's commit SHA matches the commit SHA of the + /// rustc-perf repository. + #[arg(long)] + check_git_sha: bool, + #[command(flatten)] db: DbOption, }, @@ -1352,9 +1362,26 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc Ok(0) } - Commands::BenchmarkJobQueue { collector_name, db } => { + Commands::BenchmarkJobQueue { + collector_name, + git_sha, + check_git_sha, + db, + } => { log_db(&db); + let git_sha = match git_sha { + Some(sha) => sha, + None => { + let mut cmd = Command::new("git"); + cmd.args(["rev-parse", "HEAD"]); + let stdout = command_output(&mut cmd) + .context("Cannot determine current commit SHA")? + .stdout; + String::from_utf8(stdout).unwrap().trim().to_string() + } + }; + let pool = Pool::open(&db.db); let rt = build_async_runtime(); let conn = rt.block_on(pool.connection()); @@ -1362,7 +1389,7 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc // Obtain the configuration and validate that it matches the // collector's host target let collector_config = rt - .block_on(conn.get_collector_config(&collector_name))? + .block_on(conn.start_collector(&collector_name, &git_sha))? .ok_or_else(|| { anyhow::anyhow!("Collector with name `{collector_name}` not found") })?; @@ -1374,6 +1401,13 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc )); } + log::info!( + "Starting collector with target {}, benchmark set {} and commit {}", + collector_config.target(), + collector_config.benchmark_set().get_id(), + collector_config.commit_sha().expect("missing commit SHA") + ); + let benchmarks = get_compile_benchmarks(&compile_benchmark_dir, CompileBenchmarkFilter::All)?; @@ -1382,6 +1416,7 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc conn, &collector_config, benchmarks, + check_git_sha, ))?; Ok(0) @@ -1397,10 +1432,10 @@ async fn run_job_queue_benchmarks( mut conn: Box, collector: &CollectorConfig, all_compile_benchmarks: Vec, + check_git_sha: bool, ) -> anyhow::Result<()> { - conn.update_collector_heartbeat(collector.name()).await?; + let mut last_request_tag = None; - // TODO: check collector SHA vs site SHA while let Some((benchmark_job, artifact_id)) = conn .dequeue_benchmark_job( collector.name(), @@ -1409,6 +1444,22 @@ async fn run_job_queue_benchmarks( ) .await? { + // Here we check if we should update our commit SHA, if rustc-perf has been updated. + // We only check for updates when we switch *benchmark requests*, not *benchmark jobs*, + // to avoid changing code in the middle of benchmarking the same request. + // Note that if an update happens, the job that we have just dequeued will have its deque + // counter increased. But since updates are relatively rare, that shouldn't be a big deal, + // it will be dequeued again when the collector starts again. + if check_git_sha + && last_request_tag.is_some() + && last_request_tag.as_deref() != Some(benchmark_job.request_tag()) + && needs_git_update(collector) + { + log::warn!("Exiting collector to update itself from git."); + return Ok(()); + } + last_request_tag = Some(benchmark_job.request_tag().to_string()); + log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}"); let result = run_benchmark_job( conn.as_mut(), @@ -1472,6 +1523,39 @@ async fn run_job_queue_benchmarks( Ok(()) } +/// Returns true if the commit SHA of collector does not match the latest commit SHA of the master +/// branch of https://github.com/rust-lang/rustc-perf. +fn needs_git_update(collector: &CollectorConfig) -> bool { + let Some(commit_sha) = collector.commit_sha() else { + return false; + }; + + let mut cmd = Command::new("git"); + cmd.arg("ls-remote") + .arg("https://github.com/rust-lang/rustc-perf") + .arg("HEAD"); + let upstream_sha = match command_output(&mut cmd) { + Ok(output) => String::from_utf8(output.stdout) + .unwrap() + .split_whitespace() + .next() + .unwrap() + .to_string(), + Err(error) => { + log::error!("Cannot determine latest SHA of rustc-perf: {error:?}"); + return false; + } + }; + if commit_sha != upstream_sha { + log::warn!( + "Commit {commit_sha} of collector is outdated, latest commit is {upstream_sha}." + ); + true + } else { + false + } +} + /// Error that happened during benchmarking of a job. enum BenchmarkJobError { /// The error is non-recoverable. diff --git a/database/src/lib.rs b/database/src/lib.rs index 78655501d..b7e7e8d66 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -1204,6 +1204,8 @@ pub struct CollectorConfig { is_active: bool, last_heartbeat_at: DateTime, date_added: DateTime, + /// The commit SHA of `rustc-perf` that the collector currently has checked out. + commit_sha: Option, } impl CollectorConfig { @@ -1230,6 +1232,10 @@ impl CollectorConfig { pub fn date_added(&self) -> DateTime { self.date_added } + + pub fn commit_sha(&self) -> Option<&str> { + self.commit_sha.as_deref() + } } /// The data that can be retrived from the database directly to populate the diff --git a/database/src/pool.rs b/database/src/pool.rs index 1d1bb24d1..c21b1bd5c 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -244,10 +244,13 @@ pub trait Connection: Send + Sync { is_active: bool, ) -> anyhow::Result; - /// Get the configuration for a collector by its name. - async fn get_collector_config( + /// Call this function when a job queue collector starts. + /// It ensures that a collector with the given name exists, updates its commit SHA and heartbeat + /// and returns its collector config. + async fn start_collector( &self, collector_name: &str, + commit_sha: &str, ) -> anyhow::Result>; /// Dequeues a single job for the given collector, target and benchmark set. @@ -788,7 +791,7 @@ mod tests { run_postgres_test(|ctx| async { let db = ctx.db_client().connection().await; - let collector_config_result = db.get_collector_config("collector-1").await.unwrap(); + let collector_config_result = db.start_collector("collector-1", "foo").await.unwrap(); assert!(collector_config_result.is_none()); @@ -802,19 +805,20 @@ mod tests { run_postgres_test(|ctx| async { let db = ctx.db_client().connection().await; - let inserted_config = db + let mut inserted_config = db .add_collector_config("collector-1", Target::X86_64UnknownLinuxGnu, 1, true) .await .unwrap(); let config = db - .get_collector_config("collector-1") + .start_collector("collector-1", "foo") .await .unwrap() .expect("collector config not found"); - // What we entered into the database should be identical to what is - // returned from the database + inserted_config.commit_sha = Some("foo".to_string()); + inserted_config.last_heartbeat_at = config.last_heartbeat_at; + assert_eq!(inserted_config, config); Ok(ctx) }) diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index 6f48ba654..e72ea8911 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -384,6 +384,9 @@ static MIGRATIONS: &[&str] = &[ r#" ALTER TABLE benchmark_request ADD COLUMN duration_ms INTEGER NULL; "#, + r#" + ALTER TABLE collector_config ADD COLUMN commit_sha TEXT NULL; + "#, ]; #[async_trait::async_trait] @@ -1732,28 +1735,33 @@ where is_active, last_heartbeat_at: row.get::<_, DateTime>(0), date_added: row.get::<_, DateTime>(1), + commit_sha: None, }; Ok(collector_config) } - async fn get_collector_config( + async fn start_collector( &self, collector_name: &str, + commit_sha: &str, ) -> anyhow::Result> { let row = self .conn() .query_opt( - "SELECT + " + UPDATE collector_config + SET + last_heartbeat_at = NOW(), + commit_sha = $2 + WHERE + name = $1 + RETURNING target, benchmark_set, is_active, last_heartbeat_at, - date_added - FROM - collector_config - WHERE - name = $1;", - &[&collector_name], + date_added", + &[&collector_name, &commit_sha], ) .await?; @@ -1767,6 +1775,7 @@ where is_active: row.get::<_, bool>(2), last_heartbeat_at: row.get::<_, DateTime>(3), date_added: row.get::<_, DateTime>(4), + commit_sha: Some(commit_sha.to_string()), }) }) .transpose()?) @@ -1803,6 +1812,7 @@ where WHEN status = $1 THEN 1 ELSE 2 END, + request_tag, created_at LIMIT 1 FOR UPDATE SKIP LOCKED @@ -2179,25 +2189,29 @@ where benchmark_set, is_active, last_heartbeat_at, - date_added + date_added, + commit_sha FROM collector_config;", &[], ) .await?; - let mut configs = vec![]; - for row in rows { - let config = CollectorConfig { - name: row.get::<_, String>(0), - target: Target::from_str(row.get::<_, &str>(1)).map_err(|e| anyhow::anyhow!(e))?, - benchmark_set: BenchmarkSet(row.get::<_, i32>(2) as u32), - is_active: row.get::<_, bool>(3), - last_heartbeat_at: row.get::<_, DateTime>(4), - date_added: row.get::<_, DateTime>(5), - }; - configs.push(config); - } + let configs = rows + .into_iter() + .map(|row| { + Ok(CollectorConfig { + name: row.get::<_, String>(0), + target: Target::from_str(row.get::<_, &str>(1)) + .map_err(|e| anyhow::anyhow!(e))?, + benchmark_set: BenchmarkSet(row.get::<_, i32>(2) as u32), + is_active: row.get::<_, bool>(3), + last_heartbeat_at: row.get::<_, DateTime>(4), + date_added: row.get::<_, DateTime>(5), + commit_sha: row.get::<_, Option>(6), + }) + }) + .collect::>>()?; Ok(configs) } diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index 08d011f46..a0e2a19b3 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1332,9 +1332,10 @@ impl Connection for SqliteConnection { .collect::>()?) } - async fn get_collector_config( + async fn start_collector( &self, _collector_name: &str, + _commit_sha: &str, ) -> anyhow::Result> { no_queue_implementation_abort!() }