diff --git a/database/Cargo.toml b/database/Cargo.toml
index 8a2112c7f..dcfb1edf7 100644
--- a/database/Cargo.toml
+++ b/database/Cargo.toml
@@ -26,6 +26,7 @@ csv = "1"
 x509-cert = { version = "0.2.5", features = ["pem"] }
 intern = { path = "../intern" }
+uuid = { version = "1.16.0", features = ["v4"] }
 
 [dev-dependencies]
 uuid = { version = "1.16.0", features = ["v4"] }
diff --git a/database/src/lib.rs b/database/src/lib.rs
index 1953a822c..0f4c6f1ae 100644
--- a/database/src/lib.rs
+++ b/database/src/lib.rs
@@ -13,9 +13,7 @@ pub mod interpolate;
 pub mod metric;
 pub mod pool;
 pub mod selector;
-
-#[cfg(test)]
-mod tests;
+pub mod tests;
 
 pub use pool::{Connection, Pool};
 
@@ -799,7 +797,7 @@ pub struct ArtifactCollection {
     pub end_time: DateTime<Utc>,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq)]
 pub enum BenchmarkRequestStatus {
     WaitingForArtifacts,
     ArtifactsReady,
@@ -818,12 +816,34 @@ impl fmt::Display for BenchmarkRequestStatus {
     }
 }
 
-#[derive(Debug)]
+impl<'a> tokio_postgres::types::FromSql<'a> for BenchmarkRequestStatus {
+    fn from_sql(
+        ty: &tokio_postgres::types::Type,
+        raw: &'a [u8],
+    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        // Decode raw bytes into &str with Postgres' own text codec
+        let s: &str = <&str as tokio_postgres::types::FromSql>::from_sql(ty, raw)?;
+
+        match s {
+            "waiting_for_artifacts" => Ok(Self::WaitingForArtifacts),
+            "artifacts_ready" => Ok(Self::ArtifactsReady),
+            "in_progress" => Ok(Self::InProgress),
+            "completed" => Ok(Self::Completed),
+            other => Err(format!("unknown benchmark_request_status '{other}'").into()),
+        }
+    }
+
+    fn accepts(ty: &tokio_postgres::types::Type) -> bool {
+        <&str as tokio_postgres::types::FromSql>::accepts(ty)
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
 pub enum BenchmarkRequestType {
     /// A Try commit
     Try {
         sha: String,
-        parent_sha: String,
+        parent_sha: Option<String>,
         pr: u32,
     },
     /// A Master commit
@@ -864,7 +884,7 @@ impl fmt::Display for BenchmarkRequestType {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct BenchmarkRequest {
     pub commit_type: BenchmarkRequestType,
     pub created_at: DateTime<Utc>,
@@ -896,7 +916,7 @@ impl BenchmarkRequest {
 
     pub fn create_try(
         sha: &str,
-        parent_sha: &str,
+        parent_sha: Option<&str>,
         pr: u32,
         created_at: DateTime<Utc>,
         status: BenchmarkRequestStatus,
@@ -907,7 +927,7 @@ impl BenchmarkRequest {
             commit_type: BenchmarkRequestType::Try {
                 pr,
                 sha: sha.to_string(),
-                parent_sha: parent_sha.to_string(),
+                parent_sha: parent_sha.map(|it| it.to_string()),
             },
             created_at,
             completed_at: None,
@@ -964,8 +984,11 @@ impl BenchmarkRequest {
 
     pub fn parent_sha(&self) -> Option<&str> {
         match &self.commit_type {
-            BenchmarkRequestType::Try { parent_sha, .. }
-            | BenchmarkRequestType::Master { parent_sha, .. } => Some(parent_sha),
+            BenchmarkRequestType::Try { parent_sha, .. } => match parent_sha {
+                Some(parent_sha) => Some(parent_sha),
+                _ => None,
+            },
+            BenchmarkRequestType::Master { parent_sha, .. } => Some(parent_sha),
             BenchmarkRequestType::Release { tag: _ } => None,
         }
    }
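
Note (sketch, not part of the patch): the `FromSql` impl above has to stay in sync with whatever strings the existing `Display` impl writes into the `status` column. A minimal round-trip test, assuming `Display` emits the same snake_case names that `from_sql` matches on, could look like this:

    #[test]
    fn benchmark_request_status_strings_round_trip() {
        use crate::BenchmarkRequestStatus::*;

        // Hypothetical table mirroring the match arms in `from_sql` above.
        let expected = [
            (WaitingForArtifacts, "waiting_for_artifacts"),
            (ArtifactsReady, "artifacts_ready"),
            (InProgress, "in_progress"),
            (Completed, "completed"),
        ];
        for (status, text) in expected {
            // Assumes the `Display` impl produces exactly these strings.
            assert_eq!(status.to_string(), text);
        }
    }
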
diff --git a/database/src/pool.rs b/database/src/pool.rs
index c2577019a..40df7d65c 100644
--- a/database/src/pool.rs
+++ b/database/src/pool.rs
@@ -1,7 +1,7 @@
 use crate::selector::CompileTestCase;
 use crate::{
-    ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, CodegenBackend,
-    CompileBenchmark, Target,
+    ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, BenchmarkRequestStatus,
+    CodegenBackend, CompileBenchmark, Target,
 };
 use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
 use chrono::{DateTime, Utc};
@@ -195,6 +195,20 @@ pub trait Connection: Send + Sync {
         &self,
         artifact_row_id: &ArtifactIdNumber,
     ) -> anyhow::Result<HashSet<CompileTestCase>>;
+
+    /// Gets the benchmark requests whose status matches one of the given
+    /// statuses.
+    async fn get_benchmark_requests_by_status(
+        &self,
+        statuses: &[BenchmarkRequestStatus],
+    ) -> anyhow::Result<Vec<BenchmarkRequest>>;
+
+    /// Update the status of a `benchmark_request`
+    async fn update_benchmark_request_status(
+        &mut self,
+        benchmark_request: &BenchmarkRequest,
+        benchmark_request_status: BenchmarkRequestStatus,
+    ) -> anyhow::Result<()>;
 }
 
 #[async_trait::async_trait]
@@ -406,7 +420,7 @@ mod tests {
 
         let try_benchmark_request = BenchmarkRequest::create_try(
             "b-sha-2",
-            "parent-sha-2",
+            Some("parent-sha-2"),
             32,
             time,
             BenchmarkRequestStatus::ArtifactsReady,
@@ -494,4 +508,100 @@ mod tests {
         })
         .await;
     }
+
+    #[tokio::test]
+    async fn get_benchmark_requests_by_status() {
+        // Ensure we get back only the requests whose status matches the
+        // requested set
+        run_postgres_test(|ctx| async {
+            let db = ctx.db_client();
+            let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
+            let master_benchmark_request = BenchmarkRequest::create_master(
+                "a-sha-1",
+                "parent-sha-1",
+                42,
+                time,
+                BenchmarkRequestStatus::ArtifactsReady,
+                "llvm",
+                "",
+            );
+
+            let try_benchmark_request = BenchmarkRequest::create_try(
+                "b-sha-2",
+                Some("parent-sha-2"),
+                32,
+                time,
+                BenchmarkRequestStatus::Completed,
+                "cranelift",
+                "",
+            );
+
+            let release_benchmark_request = BenchmarkRequest::create_release(
+                "1.8.0",
+                time,
+                BenchmarkRequestStatus::ArtifactsReady,
+                "cranelift,llvm",
+                "",
+            );
+
+            let db = db.connection().await;
+            db.insert_benchmark_request(&master_benchmark_request).await;
+            db.insert_benchmark_request(&try_benchmark_request).await;
+            db.insert_benchmark_request(&release_benchmark_request)
+                .await;
+
+            let requests = db
+                .get_benchmark_requests_by_status(&[BenchmarkRequestStatus::ArtifactsReady])
+                .await
+                .unwrap();
+
+            assert_eq!(requests.len(), 2);
+            assert_eq!(requests[0].status, BenchmarkRequestStatus::ArtifactsReady);
+            assert_eq!(requests[1].status, BenchmarkRequestStatus::ArtifactsReady);
+
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn update_benchmark_request_status() {
+        // Insert one item into the database, change the status and then
+        // get the item back out again to ensure it has changed status
+        run_postgres_test(|ctx| async {
+            let db = ctx.db_client();
+            let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
+            let master_benchmark_request = BenchmarkRequest::create_master(
+                "a-sha-1",
+                "parent-sha-1",
+                42,
+                time,
+                BenchmarkRequestStatus::ArtifactsReady,
+                "llvm",
+                "",
+            );
+
+            let mut db = db.connection().await;
+            db.insert_benchmark_request(&master_benchmark_request).await;
+
+            db.update_benchmark_request_status(
+                &master_benchmark_request,
+                BenchmarkRequestStatus::InProgress,
+            )
+            .await
+            .unwrap();
+
+            let requests = db
+                .get_benchmark_requests_by_status(&[BenchmarkRequestStatus::InProgress])
+                .await
+                .unwrap();
+
+            assert_eq!(requests.len(), 1);
+            assert_eq!(requests[0].tag(), master_benchmark_request.tag());
+            assert_eq!(requests[0].status, BenchmarkRequestStatus::InProgress);
+
+            Ok(ctx)
+        })
+        .await;
+    }
 }
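
Note (sketch, not part of the patch): the two new `Connection` methods are meant to be used together — fetch the requests in a given set of statuses, then flip the status of the one you pick. A minimal caller, assuming it has a `&mut dyn Connection` (the helper name `claim_next_ready` is made up for illustration), might look like:

    use database::pool::Connection;
    use database::{BenchmarkRequest, BenchmarkRequestStatus};

    async fn claim_next_ready(
        conn: &mut dyn Connection,
    ) -> anyhow::Result<Option<BenchmarkRequest>> {
        let ready = conn
            .get_benchmark_requests_by_status(&[BenchmarkRequestStatus::ArtifactsReady])
            .await?;
        if let Some(request) = ready.into_iter().next() {
            // Mark the request as picked up so other workers skip it.
            conn.update_benchmark_request_status(&request, BenchmarkRequestStatus::InProgress)
                .await?;
            return Ok(Some(request));
        }
        Ok(None)
    }
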
diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs
index 5c366a60d..f5a6e1117 100644
--- a/database/src/pool/postgres.rs
+++ b/database/src/pool/postgres.rs
@@ -1,9 +1,9 @@
 use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
 use crate::selector::CompileTestCase;
 use crate::{
-    ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkRequest, CodegenBackend,
-    CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit,
-    Scenario, Target,
+    ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkRequest,
+    BenchmarkRequestStatus, BenchmarkRequestType, CodegenBackend, CollectionId, Commit, CommitType,
+    CompileBenchmark, Date, Index, Profile, QueuedCommit, Scenario, Target,
 };
 use anyhow::Context as _;
 use chrono::{DateTime, TimeZone, Utc};
@@ -1443,6 +1443,113 @@ where
             })
             .collect())
     }
+
+    async fn get_benchmark_requests_by_status(
+        &self,
+        statuses: &[BenchmarkRequestStatus],
+    ) -> anyhow::Result<Vec<BenchmarkRequest>> {
+        // There is a small period of time where a try commit's parent_sha
+        // could be NULL; this query will filter that out.
+        let query = "
+            SELECT
+                tag,
+                parent_sha,
+                pr,
+                commit_type,
+                status,
+                created_at,
+                completed_at,
+                backends,
+                profiles
+            FROM benchmark_request
+            WHERE status = ANY($1)"
+            .to_string();
+
+        let rows = self
+            .conn()
+            .query(&query, &[&statuses])
+            .await
+            .context("Failed to get benchmark requests")?;
+
+        let benchmark_requests = rows
+            .iter()
+            .map(|row| {
+                let tag = row.get::<_, &str>(0);
+                let parent_sha = row.get::<_, Option<&str>>(1);
+                let pr = row.get::<_, Option<i32>>(2);
+                let commit_type = row.get::<_, &str>(3);
+                let status = row.get::<_, BenchmarkRequestStatus>(4);
+                let created_at = row.get::<_, DateTime<Utc>>(5);
+                let completed_at = row.get::<_, Option<DateTime<Utc>>>(6);
+                let backends = row.get::<_, &str>(7);
+                let profiles = row.get::<_, &str>(8);
+
+                match commit_type {
+                    "try" => {
+                        let mut try_benchmark = BenchmarkRequest::create_try(
+                            tag,
+                            parent_sha,
+                            pr.unwrap() as u32,
+                            created_at,
+                            status,
+                            backends,
+                            profiles,
+                        );
+                        try_benchmark.completed_at = completed_at;
+                        try_benchmark
+                    }
+                    "master" => {
+                        let mut master_benchmark = BenchmarkRequest::create_master(
+                            tag,
+                            parent_sha.unwrap(),
+                            pr.unwrap() as u32,
+                            created_at,
+                            status,
+                            backends,
+                            profiles,
+                        );
+                        master_benchmark.completed_at = completed_at;
+                        master_benchmark
+                    }
+                    "release" => {
+                        let mut release_benchmark = BenchmarkRequest::create_release(
+                            tag, created_at, status, backends, profiles,
+                        );
+                        release_benchmark.completed_at = completed_at;
+                        release_benchmark
+                    }
+                    _ => panic!(
+                        "Invalid `commit_type` for `BenchmarkRequest` {}",
+                        commit_type
+                    ),
+                }
+            })
+            .collect();
+        Ok(benchmark_requests)
+    }
+
+    async fn update_benchmark_request_status(
+        &mut self,
+        benchmark_request: &BenchmarkRequest,
+        benchmark_request_status: BenchmarkRequestStatus,
+    ) -> anyhow::Result<()> {
+        let tx = self
+            .conn_mut()
+            .transaction()
+            .await
+            .context("failed to start transaction")?;
+
+        tx.execute(
+            "UPDATE benchmark_request SET status = $1 WHERE tag = $2;",
+            &[&benchmark_request_status, &benchmark_request.tag()],
+        )
+        .await
+        .context("failed to execute UPDATE benchmark_request")?;
+
+        tx.commit().await.context("failed to commit transaction")?;
+
+        Ok(())
+    }
 }
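
Note (sketch, not part of the patch): `status = ANY($1)` works because tokio_postgres encodes a Rust slice or `Vec` as a single Postgres array parameter, so the whole status list is bound as one value. The same pattern with plain strings, assuming an already-connected `tokio_postgres::Client`:

    async fn count_requests_in_statuses(
        client: &tokio_postgres::Client,
        statuses: &[&str],
    ) -> anyhow::Result<i64> {
        let row = client
            .query_one(
                "SELECT COUNT(*) FROM benchmark_request WHERE status = ANY($1)",
                &[&statuses],
            )
            .await?;
        // COUNT(*) comes back as a Postgres int8, i.e. an i64.
        Ok(row.get(0))
    }
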
 
 fn parse_artifact_id(ty: &str, sha: &str, date: Option<DateTime<Utc>>) -> ArtifactId {
@@ -1464,6 +1571,31 @@ fn parse_artifact_id(ty: &str, sha: &str, date: Option<DateTime<Utc>>) -> ArtifactId {
     }
 }
 
+macro_rules! impl_to_postgresql_via_to_string {
+    ($t:ty) => {
+        impl tokio_postgres::types::ToSql for $t {
+            fn to_sql(
+                &self,
+                ty: &tokio_postgres::types::Type,
+                out: &mut bytes::BytesMut,
+            ) -> Result<tokio_postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>>
+            {
+                self.to_string().to_sql(ty, out)
+            }
+
+            fn accepts(ty: &tokio_postgres::types::Type) -> bool {
+                <String as tokio_postgres::types::ToSql>::accepts(ty)
+            }
+
+            // Generates `to_sql_checked`, which verifies the type is acceptable
+            // before encoding
+            tokio_postgres::types::to_sql_checked!();
+        }
+    };
+}
+
+impl_to_postgresql_via_to_string!(BenchmarkRequestType);
+impl_to_postgresql_via_to_string!(BenchmarkRequestStatus);
+
 #[cfg(test)]
 mod tests {
     use super::make_certificates;
diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs
index 3e1e42099..0d018821c 100644
--- a/database/src/pool/sqlite.rs
+++ b/database/src/pool/sqlite.rs
@@ -1,8 +1,8 @@
 use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
 use crate::selector::CompileTestCase;
 use crate::{
-    ArtifactCollection, ArtifactId, Benchmark, BenchmarkRequest, CodegenBackend, CollectionId,
-    Commit, CommitType, CompileBenchmark, Date, Profile, Target,
+    ArtifactCollection, ArtifactId, Benchmark, BenchmarkRequest, BenchmarkRequestStatus,
+    CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, Date, Profile, Target,
 };
 use crate::{ArtifactIdNumber, Index, QueuedCommit};
 use chrono::{DateTime, TimeZone, Utc};
@@ -459,6 +459,17 @@ impl SqliteConnection {
     }
 }
 
+macro_rules! no_queue_implementation_abort {
+    () => {
+        panic!(
+            "Queueing for SQLite has not been implemented; if you want to test the queueing \
+             functionality please use Postgres. Presuming you have Docker installed, at the \
+             root of the repo you can run `make start-postgres` to spin up a Postgres \
+             database."
+        )
+    };
+}
+
 #[async_trait::async_trait]
 impl Connection for SqliteConnection {
     async fn maybe_create_indices(&mut self) {
@@ -1250,7 +1261,22 @@ impl Connection for SqliteConnection {
     }
 
     async fn insert_benchmark_request(&self, _benchmark_request: &BenchmarkRequest) {
-        panic!("Queueing for SQLite has not been implemented, if you are wanting to test the queueing functionality please use postgres. Presuming you have docker installed, at the root of the repo you can run `make start-postgres` to spin up a postgres database.");
+        no_queue_implementation_abort!()
+    }
+
+    async fn get_benchmark_requests_by_status(
+        &self,
+        _statuses: &[BenchmarkRequestStatus],
+    ) -> anyhow::Result<Vec<BenchmarkRequest>> {
+        no_queue_implementation_abort!()
+    }
+
+    async fn update_benchmark_request_status(
+        &mut self,
+        _benchmark_request: &BenchmarkRequest,
+        _benchmark_request_status: BenchmarkRequestStatus,
+    ) -> anyhow::Result<()> {
+        no_queue_implementation_abort!()
     }
 
     async fn get_compile_test_cases_with_measurements(
diff --git a/database/src/tests/mod.rs b/database/src/tests/mod.rs
index 3e9872a9c..6b411a9d1 100644
--- a/database/src/tests/mod.rs
+++ b/database/src/tests/mod.rs
@@ -1,3 +1,5 @@
+#![allow(dead_code)]
+
 use std::future::Future;
 use tokio_postgres::config::Host;
 use tokio_postgres::Config;
@@ -16,7 +18,7 @@ enum TestDb {
 /// Represents a connection to a Postgres database that can be
 /// used in integration tests to test logic that interacts with
 /// a database.
-pub(crate) struct TestContext {
+pub struct TestContext {
     test_db: TestDb,
     // Pre-cached client to avoid creating unnecessary connections in tests
     client: Pool,
@@ -27,7 +29,7 @@ impl TestContext {
         let config: Config = db_url.parse().expect("Cannot parse connection string");
 
         // Create a new database that will be used for this specific test
-        let client = make_client(&db_url)
+        let client = make_client(db_url)
             .await
             .expect("Cannot connect to database");
         let db_name = format!("db{}", uuid::Uuid::new_v4().to_string().replace("-", ""));
@@ -48,10 +50,6 @@ impl TestContext {
             // cfg-gated to keep non-unix builds happy.
             #[cfg(unix)]
             Host::Unix(_) => panic!("Unix sockets in Postgres connection string are not supported"),
-
-            // On non-unix targets the enum has no other variants.
-            #[cfg(not(unix))]
-            _ => unreachable!("non-TCP hosts cannot appear on this platform"),
         };
 
         // We need to connect to the database against, because Postgres doesn't allow
@@ -85,7 +83,7 @@ impl TestContext {
         }
     }
 
-    pub(crate) fn db_client(&self) -> &Pool {
+    pub fn db_client(&self) -> &Pool {
         &self.client
     }
 
@@ -114,7 +112,7 @@ impl TestContext {
 }
 
 /// Runs a test against an actual postgres database.
-pub(crate) async fn run_postgres_test(f: F)
+pub async fn run_postgres_test<F, Fut>(f: F)
 where
     F: Fn(TestContext) -> Fut,
     Fut: Future<Output = anyhow::Result<TestContext>>,
@@ -141,7 +139,8 @@ where
 
 /// Runs a test against an actual database.
 /// Checks both Postgres and SQLite.
-pub(crate) async fn run_db_test(f: F)
+#[allow(dead_code)]
+pub async fn run_db_test<F, Fut>(f: F)
 where
     F: Fn(TestContext) -> Fut + Clone,
     Fut: Future<Output = anyhow::Result<TestContext>>,
diff --git a/site/src/job_queue.rs b/site/src/job_queue.rs
index aa3b31e2e..7ed8ad133 100644
--- a/site/src/job_queue.rs
+++ b/site/src/job_queue.rs
@@ -1,19 +1,21 @@
 use std::{str::FromStr, sync::Arc};
 
-use crate::load::SiteCtxt;
+use crate::load::{partition_in_place, SiteCtxt};
 use chrono::Utc;
-use database::{BenchmarkRequest, BenchmarkRequestStatus};
+use database::{BenchmarkRequest, BenchmarkRequestStatus, BenchmarkRequestType};
+use hashbrown::HashSet;
 use parking_lot::RwLock;
 use tokio::time::{self, Duration};
 
 /// Store the latest master commits or do nothing if all of them are
 /// already in the database
-async fn enqueue_master_commits(ctxt: &Arc<SiteCtxt>) {
-    let conn = ctxt.conn().await;
+async fn create_benchmark_request_master_commits(
+    ctxt: &Arc<SiteCtxt>,
+    conn: &dyn database::pool::Connection,
+) -> anyhow::Result<()> {
     let master_commits = &ctxt.get_master_commits().commits;
     // TODO; delete at some point in the future
-    let cutoff: chrono::DateTime<Utc> =
-        chrono::DateTime::from_str("2025-06-01T00:00:00.000Z").unwrap();
+    let cutoff: chrono::DateTime<Utc> = chrono::DateTime::from_str("2025-06-01T00:00:00.000Z")?;
 
     for master_commit in master_commits {
         // We don't want to add masses of obsolete data
@@ -31,12 +33,161 @@ async fn enqueue_master_commits(ctxt: &Arc<SiteCtxt>) {
             conn.insert_benchmark_request(&benchmark).await;
         }
     }
+    Ok(())
+}
+
+/// Sorts try and master requests that are in the `ArtifactsReady` status.
+/// Doesn't consider in-progress requests or release artifacts.
+fn sort_benchmark_requests(done: &HashSet<String>, request_queue: &mut [BenchmarkRequest]) {
+    let mut done: HashSet<String> = done.iter().cloned().collect();
+
+    // Ensure all the items are ready to be sorted; if they are not, this is
+    // a bug in the caller.
+    assert!(request_queue.iter().all(|bmr| {
+        bmr.status == BenchmarkRequestStatus::ArtifactsReady
+            && matches!(
+                bmr.commit_type,
+                BenchmarkRequestType::Master { .. } | BenchmarkRequestType::Try { .. }
+            )
+    }));
+
+    let mut finished = 0;
+    while finished < request_queue.len() {
+        // The next level is those elements in the unordered queue which
+        // are ready to be benchmarked (i.e., those with parent in done or no
+        // parent).
+        let level_len = partition_in_place(request_queue[finished..].iter_mut(), |bmr| {
+            bmr.parent_sha().is_none_or(|parent| done.contains(parent))
+        });
+
+        // No commit is ready for benchmarking. This can happen e.g. when a try parent commit
+        // was forcefully removed from the master branch of rust-lang/rust. In this case, just
+        // let the commits be benchmarked in the current order that we have; these benchmark runs
+        // just won't have a parent result available.
+        if level_len == 0 {
+            if cfg!(test) {
+                panic!("No commit is ready for benchmarking");
+            } else {
+                log::warn!("No commit is ready for benchmarking");
+                return;
+            }
+        }
+
+        // Everything in `level` has the same topological order, so we sort it based on heuristics
+        let level = &mut request_queue[finished..][..level_len];
+        level.sort_unstable_by_key(|bmr| {
+            (
+                // PR number takes priority
+                *bmr.pr().unwrap_or(&0),
+                // Order master commits before try commits
+                match bmr.commit_type {
+                    BenchmarkRequestType::Try { .. } => 1,
+                    BenchmarkRequestType::Master { .. } => 0,
+                    BenchmarkRequestType::Release { .. } => unreachable!(),
+                },
+                bmr.created_at,
+            )
+        });
+        for c in level {
+            done.insert(c.tag().to_string());
+        }
+        finished += level_len;
+    }
+}
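
Note (sketch, not part of the patch): with `TestContext` and `run_postgres_test` now public, other crates (like `site` below) can reuse the harness. The pattern is to take a connection from the context, exercise it, and hand the context back so the harness can clean up the per-test database afterwards:

    use database::tests::run_postgres_test;

    #[tokio::test]
    async fn example_postgres_test() {
        run_postgres_test(|ctx| async {
            let db = ctx.db_client().connection().await;
            // ... exercise `db` here, e.g. insert and query benchmark requests ...
            drop(db);
            Ok(ctx)
        })
        .await;
    }
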
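
Note (illustrative sketch, not part of the patch): within one topological level the sort key above is `(pr, commit-type rank, created_at)`, so lower PR numbers win first, and at equal PR numbers a master commit (rank 0) precedes a try commit (rank 1). On bare tuples the same ordering looks like this:

    #[test]
    fn level_sort_key_example() {
        // (pr, 0 = master / 1 = try, created_at as an arbitrary day index)
        let mut level = vec![(7, 1, 3), (7, 0, 9), (2, 1, 1)];
        level.sort_unstable_by_key(|&key| key);
        // Lower PR first; at equal PR the master (0) precedes the try (1).
        assert_eq!(level, vec![(2, 1, 1), (7, 0, 9), (7, 1, 3)]);
    }
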
+
+pub trait ExtractIf<T> {
+    fn extract_if_stable<F>(&mut self, predicate: F) -> Vec<T>
+    where
+        F: FnMut(&T) -> bool;
+}
+
+/// Vec method `extract_if` is unstable; this very simple implementation
+/// can be deleted once it is stable.
+impl<T> ExtractIf<T> for Vec<T> {
+    fn extract_if_stable<F>(&mut self, mut predicate: F) -> Vec<T>
+    where
+        F: FnMut(&T) -> bool,
+    {
+        let mut extracted = Vec::new();
+        let mut i = 0;
+
+        while i < self.len() {
+            if predicate(&self[i]) {
+                extracted.push(self.remove(i));
+            } else {
+                i += 1;
+            }
+        }
+        extracted
+    }
+}
+
+/// Assumes that master/release artifacts have been put into the DB.
+pub async fn build_queue(
+    conn: &mut dyn database::pool::Connection,
+    completed_set: &HashSet<String>,
+) -> anyhow::Result<Vec<BenchmarkRequest>> {
+    let mut pending = conn
+        .get_benchmark_requests_by_status(&[
+            BenchmarkRequestStatus::InProgress,
+            BenchmarkRequestStatus::ArtifactsReady,
+        ])
+        .await?;
+
+    // The queue starts with in-progress requests
+    let mut queue: Vec<BenchmarkRequest> = pending
+        .extract_if_stable(|request| matches!(request.status, BenchmarkRequestStatus::InProgress));
+
+    // We sort the in-progress ones based on the started date
+    queue.sort_unstable_by(|a, b| a.created_at.cmp(&b.created_at));
+
+    // Add release artifacts ordered by the release tag (1.87.0 before 1.88.0) and `created_at`.
+    let mut release_artifacts: Vec<BenchmarkRequest> = pending.extract_if_stable(|request| {
+        matches!(request.commit_type, BenchmarkRequestType::Release { .. })
+    });
+
+    release_artifacts.sort_unstable_by(|a, b| {
+        a.tag()
+            .cmp(b.tag())
+            .then_with(|| a.created_at.cmp(&b.created_at))
+    });
+
+    queue.append(&mut release_artifacts);
+    sort_benchmark_requests(completed_set, &mut pending);
+    queue.append(&mut pending);
+    Ok(queue)
+}
+
+/// Pick the next benchmark request off the queue and mark it as in progress
+async fn enqueue_next_job(conn: &mut dyn database::pool::Connection) -> anyhow::Result<()> {
+    // Fetch all completed requests
+    let completed: HashSet<String> = conn
+        .get_benchmark_requests_by_status(&[BenchmarkRequestStatus::Completed])
+        .await?
+        .into_iter()
+        .map(|request| request.tag().to_string())
+        .collect();
+
+    let queue = build_queue(conn, &completed).await?;
+
+    if let Some(request) = queue.into_iter().next() {
+        if request.status != BenchmarkRequestStatus::InProgress {
+            // TODO: actually enqueue the jobs
+            conn.update_benchmark_request_status(&request, BenchmarkRequestStatus::InProgress)
+                .await?;
+        }
+    }
+
+    Ok(())
 }
 
 /// For queueing jobs, add the jobs you want to queue to this function
-async fn cron_enqueue_jobs(site_ctxt: &Arc<SiteCtxt>) {
+async fn cron_enqueue_jobs(site_ctxt: &Arc<SiteCtxt>) -> anyhow::Result<()> {
+    let mut conn = site_ctxt.conn().await;
     // Put the master commits into the `benchmark_requests` queue
-    enqueue_master_commits(site_ctxt).await;
+    create_benchmark_request_master_commits(site_ctxt, &*conn).await?;
+    enqueue_next_job(&mut *conn).await?;
+    Ok(())
 }
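
Note (usage sketch, not part of the patch): `extract_if_stable` above drains the matching elements while keeping the relative order of both the extracted and the remaining items (hence the `_stable` suffix). Because it uses `Vec::remove`, it is O(n²) in the worst case, which should be fine for queues of this size.

    #[test]
    fn extract_if_stable_example() {
        let mut values = vec![1, 2, 3, 4, 5, 6];
        let evens = values.extract_if_stable(|v| v % 2 == 0);
        assert_eq!(evens, vec![2, 4, 6]);
        assert_eq!(values, vec![1, 3, 5]);
    }
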
 
 /// Entry point for the cron
@@ -51,8 +202,404 @@ pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, seconds: u64) {
             let guard = ctxt.read();
             guard.as_ref().cloned()
         } {
-            cron_enqueue_jobs(&ctxt_clone).await;
-            log::info!("Cron job executed at: {:?}", std::time::SystemTime::now());
+            match cron_enqueue_jobs(&ctxt_clone).await {
+                Ok(_) => log::info!("Cron job executed at: {:?}", std::time::SystemTime::now()),
+                Err(e) => log::error!("Cron job failed to execute: {}", e),
+            }
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use chrono::{Datelike, Duration, TimeZone, Utc};
+
+    use database::tests::run_postgres_test;
+
+    fn days_ago(day_str: &str) -> chrono::DateTime<Utc> {
+        // Strip the "days" prefix and parse the remaining number of days
+        let days = day_str
+            .strip_prefix("days")
+            .unwrap()
+            .parse::<i64>()
+            .unwrap();
+
+        let timestamp = Utc::now() - Duration::days(days);
+        // Truncate the time component to midnight
+        Utc.with_ymd_and_hms(
+            timestamp.year(),
+            timestamp.month(),
+            timestamp.day(),
+            0,
+            0,
+            0,
+        )
+        .unwrap()
+    }
+
+    fn create_master(sha: &str, parent: &str, pr: u32, age_days: &str) -> BenchmarkRequest {
+        BenchmarkRequest::create_master(
+            sha,
+            parent,
+            pr,
+            days_ago(age_days),
+            BenchmarkRequestStatus::ArtifactsReady,
+            "",
+            "",
+        )
+    }
+
+    fn create_try(sha: &str, parent: &str, pr: u32, age_days: &str) -> BenchmarkRequest {
+        BenchmarkRequest::create_try(
+            sha,
+            Some(parent),
+            pr,
+            days_ago(age_days),
+            BenchmarkRequestStatus::ArtifactsReady,
+            "",
+            "",
+        )
+    }
+
+    fn create_release(tag: &str, age_days: &str) -> BenchmarkRequest {
+        BenchmarkRequest::create_release(
+            tag,
+            days_ago(age_days),
+            BenchmarkRequestStatus::ArtifactsReady,
+            "",
+            "",
+        )
+    }
+
+    async fn db_insert_requests(
+        conn: &dyn database::pool::Connection,
+        requests: &[BenchmarkRequest],
+    ) {
+        for request in requests {
+            conn.insert_benchmark_request(request).await;
+        }
+    }
+
+    /// Get an `InProgress` item out of the `benchmark_requests` table. In
+    /// practice this is the job that has been enqueued.
+    async fn get_in_progress(conn: &dyn database::pool::Connection) -> Option<BenchmarkRequest> {
+        conn.get_benchmark_requests_by_status(&[BenchmarkRequestStatus::InProgress])
+            .await
+            .unwrap()
+            .first()
+            .cloned()
+    }
+
+    fn queue_order_matches(queue: &[BenchmarkRequest], expected: &[&str]) {
+        let queue_shas: Vec<&str> = queue.iter().map(|req| req.tag()).collect();
+        assert_eq!(queue_shas, expected)
+    }
+
+    trait BenchmarkRequestExt {
+        fn with_status(self, status: BenchmarkRequestStatus) -> Self;
+    }
+
+    impl BenchmarkRequestExt for BenchmarkRequest {
+        fn with_status(mut self, status: BenchmarkRequestStatus) -> Self {
+            self.status = status;
+            self
+        }
+    }
+
+    /// Nothing to do, empty table
+    #[tokio::test]
+    async fn enqueue_next_job_no_jobs() {
+        run_postgres_test(|ctx| async {
+            let mut db = ctx.db_client().connection().await;
+
+            enqueue_next_job(&mut *db).await?;
+
+            let in_progress = get_in_progress(&*db).await;
+
+            assert!(in_progress.is_none());
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    /// Parent completed -> child is picked
+    #[tokio::test]
+    async fn get_next_benchmark_request_completed_parent() {
+        run_postgres_test(|ctx| async {
+            let mut db = ctx.db_client().connection().await;
+            let parent =
+                create_master("a", "x", 1, "days5").with_status(BenchmarkRequestStatus::Completed);
+            let child = create_master("b", "a", 1, "days5");
+
+            db_insert_requests(&*db, &[parent, child]).await;
+
+            enqueue_next_job(&mut *db).await?;
+
+            let in_progress = get_in_progress(&*db).await;
+
+            assert_eq!(in_progress.unwrap().tag(), "b");
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    /// Release (no parent) is always eligible
+    #[tokio::test]
+    async fn get_next_benchmark_request_no_parent_release() {
+        run_postgres_test(|ctx| async {
+            let mut db = ctx.db_client().connection().await;
+            let release = create_release("v1.2.3", "days2");
+
+            db_insert_requests(&*db, &[release]).await;
+
+            enqueue_next_job(&mut *db).await?;
+
+            let in_progress = get_in_progress(&*db).await;
+
+            assert_eq!(in_progress.unwrap().tag(), "v1.2.3");
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    /// Two ready requests -> the one with the lower PR number (also the oldest) gets picked
+    #[tokio::test]
+    async fn get_next_benchmark_request_oldest_first() {
+        run_postgres_test(|ctx| async {
+            let mut db = ctx.db_client().connection().await;
+            let c1 = create_master("x", "x", 1, "days521")
+                .with_status(BenchmarkRequestStatus::Completed);
+            let c2 = create_master("y", "y", 2, "days521")
+                .with_status(BenchmarkRequestStatus::Completed);
+
+            let m1 = create_master("old", "x", 3, "days45");
+            let m2 = create_master("new", "y", 4, "days1");
+
+            db_insert_requests(&*db, &[c1, c2, m1, m2]).await;
+            enqueue_next_job(&mut *db).await?;
+
+            let in_progress = get_in_progress(&*db).await;
+
+            assert_eq!(in_progress.unwrap().tag(), "old");
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    /// Parent SHA missing entirely -> panics under `cfg(test)`; in production the child would still run
+    #[cfg(unix)] // test will not panic on windows and would be skipped entirely
+    #[tokio::test]
+    #[should_panic(expected = "No commit is ready for benchmarking")]
+    async fn get_next_benchmark_request_missing_parent() {
+        run_postgres_test(|ctx| async {
+            let mut db = ctx.db_client().connection().await;
+            let orphan = create_master("orphan", "gone", 42, "days1");
+
+            db_insert_requests(&*db, &[orphan]).await;
+            enqueue_next_job(&mut *db).await?;
+
+            let in_progress = get_in_progress(&*db).await;
+            assert_eq!(in_progress.unwrap().tag(), "orphan");
+
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn get_next_benchmark_request_large_mixture() {
+        run_postgres_test(|ctx| async {
+            let mut db = ctx.db_client().connection().await;
+            // Fresh parents that will unblock some children
+            let parent_master = create_master("parent_m", "x", 911, "days5")
+                .with_status(BenchmarkRequestStatus::Completed);
+            let parent_try = create_try("parent_t", "x", 888, "days4")
+                .with_status(BenchmarkRequestStatus::Completed);
+            let parent_master_two = create_master("gp", "x", 922, "days5")
+                .with_status(BenchmarkRequestStatus::Completed);
+            let parent_master_three = create_master("blocked_p", "x", 932, "days5")
+                .with_status(BenchmarkRequestStatus::Completed);
+
+            // Two releases, the older one should win overall
+            let rel_old = create_release("v0.8.0", "days40"); // 40 days old
+            let rel_new = create_release("v1.0.0", "days10");
+
+            // Ready masters (parents completed)
+            let master_low_pr = create_master("m_low", "parent_m", 1, "days12");
+            let master_high_pr = create_master("m_high", "parent_m", 7, "days8");
+
+            let blocked_parent = create_master("blocked_p", "gp", 0, "days3");
+            let master_blocked = create_master("blocked_c", "blocked_p", 0, "days1");
+
+            // A try commit that is ready
+            let try_ready = create_try("t_ready", "parent_t", 42, "days2");
+
+            let requests = vec![
+                parent_master,
+                parent_master_two,
+                parent_master_three,
+                parent_try,
+                master_high_pr,
+                master_low_pr,
+                master_blocked,
+                blocked_parent,
+                try_ready,
+                rel_old,
+                rel_new,
+            ];
+
+            db_insert_requests(&*db, &requests).await;
+            enqueue_next_job(&mut *db).await?;
+
+            // The oldest release ("v0.8.0") outranks everything else
+            let in_progress = get_in_progress(&*db).await;
+            assert_eq!(in_progress.unwrap().tag(), "v0.8.0");
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn get_next_benchmark_request_large_mixture_no_release() {
+        run_postgres_test(|ctx| async {
+            let mut db = ctx.db_client().connection().await;
+            // Fresh parents that will unblock some children
+            let parent_master = create_master("parent_m", "x", 8, "days5")
+                .with_status(BenchmarkRequestStatus::Completed);
+            let parent_try = create_try("parent_t", "x", 9, "days4")
+                .with_status(BenchmarkRequestStatus::Completed);
+            let parent_master_two = create_master("gp", "x", 10, "days5")
+                .with_status(BenchmarkRequestStatus::Completed);
+            let parent_master_three = create_master("blocked_p", "x", 11, "days5")
+                .with_status(BenchmarkRequestStatus::Completed);
+
+            // Ready masters (parents completed)
+            let m1 = create_master("m_low", "parent_m", 3, "days12");
+            let m2 = create_master("m_high", "parent_m", 7, "days8");
+
+            let m3 = create_master("B", "gp", 1, "days3");
+            let m4 = create_master("C", "blocked_p", 2, "days1");
+
+            // A try commit that is ready
+            let t1 = create_try("t_ready", "parent_t", 42, "days2");
+
+            let requests = vec![
+                parent_master,
+                parent_master_two,
+                parent_master_three,
+                parent_try,
+                m2,
+                m1,
+                m4,
+                m3,
+                t1,
+            ];
+
+            db_insert_requests(&*db, &requests).await;
+            enqueue_next_job(&mut *db).await?;
+
+            // With no releases, the ready master with the lowest PR number ("B") wins
+            let in_progress = get_in_progress(&*db).await;
+            assert_eq!(in_progress.unwrap().tag(), "B");
+            Ok(ctx)
+        })
+        .await;
+    }
t "t1" IP pr1 | + * | +---------------+ + * +-----------+ | + * | m "rrr" C | -----+--> + * +-----------+ | + * | +---------------+ + * +--->| t "yee" R pr1 | 3: a try with a low pr + * +---------------+ + * +-----------+ + * | m "aaa" C | + * +-----------+ + * | + * V + * +----------------+ + * | m "mmm" R pr88 | 6: a master commit + * +----------------+ + * + * +-----------+ + * | m "345" C | + * +-----------+ + * | + * V + * +----------------+ + * | m "123" R pr11 | 4: a master commit, high pr number + * +----------------+ + * + * + * +-----------+ + * | m "bar" C | + * +-----------+ + * | + * V + * +----------------+ + * | m "foo" R pr77 | 5: a master commit + * +----------------+ + * | + * V + * +---------------+ + * | t "baz" R pr4 | 7: a try with a low pr, blocked by parent + * +---------------+ + * + * The master commits should take priority, then "yee" followed + * by "baz" + **/ + + let mut db = ctx.db_client().connection().await; + let requests = vec![ + create_master("foo", "bar", 77, "days2"), + create_master("123", "345", 11, "days2"), + create_try("baz", "foo", 4, "days1"), + create_release("v.1.2.3", "days2"), + create_try("yee", "rrr", 1, "days2"), // lower PR number takes priority + create_try("t1", "rrr", 1, "days1").with_status(BenchmarkRequestStatus::InProgress), + create_master("mmm", "aaa", 88, "days2"), + ]; + + db_insert_requests(&*db, &requests).await; + + let completed: HashSet = HashSet::from([ + "bar".to_string(), + "345".to_string(), + "rrr".to_string(), + "aaa".to_string(), + ]); + + let sorted: Vec = build_queue(&mut *db, &completed).await.unwrap(); + + queue_order_matches( + &sorted, + &["t1", "v.1.2.3", "yee", "123", "foo", "mmm", "baz"], + ); + Ok(ctx) + }) + .await; + } } diff --git a/site/src/load.rs b/site/src/load.rs index 2e21c2d7c..6693eff6a 100644 --- a/site/src/load.rs +++ b/site/src/load.rs @@ -481,7 +481,7 @@ fn sort_queue( } // Copy of Iterator::partition_in_place, which is currently unstable. -fn partition_in_place<'a, I, T: 'a, P>(mut iter: I, mut predicate: P) -> usize +pub fn partition_in_place<'a, I, T: 'a, P>(mut iter: I, mut predicate: P) -> usize where I: Sized + DoubleEndedIterator, P: FnMut(&T) -> bool,