diff --git a/database/schema.md b/database/schema.md index 3d8ab3b5b..f13e78e9c 100644 --- a/database/schema.md +++ b/database/schema.md @@ -313,3 +313,36 @@ Columns: execute. * **is_active** (`boolean NOT NULL`): For controlling whether the collector is active for use. Useful for adding/removing collectors. + +### job_queue + +This table stores ephemeral benchmark jobs, which specifically tell the +collector which benchmarks it should execute. The jobs will be kept in the +table for ~30 days after being completed, so that we can quickly figure out +what master parent jobs we need to backfill when handling try builds. + +Columns: + +- **id** (`bigint` / `serial`): Primary-key identifier for the job row; + auto-increments with each new job. +- **request_id** (`bigint`): References the parent benchmark request that + spawned this job. +- **target** (`text NOT NULL`): Hardware/ISA the benchmarks must run on + (e.g. AArch64, x86_64). +- **backend** (`text NOT NULL`): Code generation backend the collector should + test (e.g. llvm, cranelift). +- **benchmark_set** (`int NOT NULL`): ID of the predefined benchmark suite to + execute. +- **collector_id** (`text`): Id of the collector that claimed the job + (populated once the job is started). +- **created_at** (`timestamptz NOT NULL`): Datetime when the job was queued. +- **started_at** (`timestamptz`): Datetime when the collector actually began + running the benchmarks; NULL until the job is claimed. +- **completed_at** (`timestampt`): Datetime when the collector finished + (successfully or otherwise); used to purge rows after ~30 days. +- **status** (`text NOT NULL`): Current job state. `queued`, `in_progress`, + `success`, or `failure`. +- **retry** (`int NOT NULL`): Number of times the job has been re-queued after + a failure; 0 on the first attempt. +- **error** (`text`): Optional error message or stack trace from the last + failed run; NULL when the job succeeded. diff --git a/database/src/lib.rs b/database/src/lib.rs index fc2510d99..3d4638de9 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -1026,6 +1026,10 @@ impl BenchmarkRequest { .collect::, _>>() .map_err(|e| anyhow::anyhow!("Invalid backend: {e}")) } + + pub fn is_completed(&self) -> bool { + matches!(self.status, BenchmarkRequestStatus::Completed { .. }) + } } /// Cached information about benchmark requests in the DB @@ -1047,6 +1051,80 @@ impl BenchmarkRequestIndex { pub fn completed_requests(&self) -> &HashSet { &self.completed } + + pub fn add_tag(&mut self, tag: &str) { + self.all.insert(tag.to_string()); + self.completed.insert(tag.to_string()); + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum BenchmarkJobStatus { + Queued, + InProgress, + Success, + Failure, +} + +const BENCHMARK_JOB_STATUS_QUEUED_STR: &str = "queued"; +const BENCHMARK_JOB_STATUS_IN_PROGRESS_STR: &str = "in_progress"; +const BENCHMARK_JOB_STATUS_SUCCESS_STR: &str = "success"; +const BENCHMARK_JOB_STATUS_FAILURE_STR: &str = "failure"; + +impl BenchmarkJobStatus { + pub fn as_str(&self) -> &str { + match self { + BenchmarkJobStatus::Queued => BENCHMARK_JOB_STATUS_QUEUED_STR, + BenchmarkJobStatus::InProgress => BENCHMARK_JOB_STATUS_IN_PROGRESS_STR, + BenchmarkJobStatus::Success => BENCHMARK_JOB_STATUS_SUCCESS_STR, + BenchmarkJobStatus::Failure => BENCHMARK_JOB_STATUS_FAILURE_STR, + } + } +} + +impl fmt::Display for BenchmarkJobStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct BenchmarkSet(u32); + +#[derive(Debug, Clone, PartialEq)] +pub struct BenchmarkJob { + target: Target, + backend: CodegenBackend, + benchmark_set: BenchmarkSet, + collector_id: String, + created_at: DateTime, + started_at: Option>, + completed_at: Option>, + status: BenchmarkJobStatus, + retry: u32, +} + +impl BenchmarkJob { + pub fn new( + target: Target, + backend: CodegenBackend, + benchmark_set: u32, + collector_id: &str, + created_at: DateTime, + status: BenchmarkJobStatus, + ) -> Self { + BenchmarkJob { + target, + backend, + benchmark_set: BenchmarkSet(benchmark_set), + collector_id: collector_id.to_string(), + created_at, + started_at: None, + completed_at: None, + status, + retry: 0, + } + } } #[derive(Debug, Clone, PartialEq)] diff --git a/database/src/pool.rs b/database/src/pool.rs index ce12782c1..231cd4b83 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -1,7 +1,7 @@ use crate::selector::CompileTestCase; use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, BenchmarkRequestIndex, - BenchmarkRequestStatus, CodegenBackend, CompileBenchmark, Target, + ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkRequest, + BenchmarkRequestIndex, BenchmarkRequestStatus, CodegenBackend, CompileBenchmark, Target, }; use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step}; use chrono::{DateTime, Utc}; @@ -232,6 +232,13 @@ pub trait Connection: Send + Sync { &self, artifact_row_id: &ArtifactIdNumber, ) -> anyhow::Result>; + + /// Try and mark the benchmark_request as completed. Will return `true` if + /// it has been marked as completed else `false` meaning there was no change + async fn mark_benchmark_request_as_completed( + &self, + benchmark_request: &mut BenchmarkRequest, + ) -> anyhow::Result; } #[async_trait::async_trait] @@ -358,6 +365,12 @@ mod tests { use chrono::Utc; use std::str::FromStr; + use super::*; + use crate::{ + tests::{run_db_test, run_postgres_test}, + BenchmarkJobStatus, BenchmarkRequestStatus, BenchmarkRequestType, Commit, CommitType, Date, + }; + /// Create a Commit fn create_commit(commit_sha: &str, time: chrono::DateTime, r#type: CommitType) -> Commit { Commit { @@ -367,6 +380,43 @@ mod tests { } } + async fn db_insert_jobs(conn: &dyn Connection, request_id: u32, jobs: &[BenchmarkJob]) { + for job in jobs { + conn.insert_benchmark_job(request_id, job).await.unwrap(); + } + } + + /// Create a try + fn create_try( + sha: Option<&str>, + parent_sha: Option<&str>, + pr: u32, + created_at: DateTime, + status: BenchmarkRequestStatus, + backends: &str, + profiles: &str, + ) -> BenchmarkRequest { + BenchmarkRequest { + commit_type: BenchmarkRequestType::Try { + sha: sha.map(|it| it.to_string()), + parent_sha: parent_sha.map(|it| it.to_string()), + pr, + }, + created_at, + status, + backends: backends.to_string(), + profiles: profiles.to_string(), + } + } + + async fn request_is_complete(conn: &dyn Connection, tag: &str) -> bool { + conn.load_benchmark_request_index() + .await + .unwrap() + .completed_requests() + .contains(tag) + } + #[tokio::test] async fn pstat_returns_empty_vector_when_empty() { run_db_test(|ctx| async { @@ -690,6 +740,252 @@ mod tests { .await .unwrap() .is_empty()); + Ok(ctx) + }) + .await; + } + + // We can't insert jobs unless there is a corresponding benchmark request + #[tokio::test] + async fn insert_benchmark_job_fk_violation() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let job = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 3, + "collector 1", + Utc::now(), + BenchmarkJobStatus::Queued, + ); + + assert!(db.insert_benchmark_job(1, &job).await.is_err()); + + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn insert_benchmark_job() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let job = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 3, + "collector 1", + Utc::now(), + BenchmarkJobStatus::Queued, + ); + let request = create_try( + Some("s1"), + Some("p1"), + 3, + Utc::now(), + BenchmarkRequestStatus::InProgress, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + assert!(db.insert_benchmark_job(1, &job).await.is_ok()); + + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn mark_request_completed_no_tag() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let request_id = 1; + + let mut request = create_try( + None, + None, + 3, + Utc::now(), + BenchmarkRequestStatus::InProgress, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + let job_1 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 1, + "collector 1", + Utc::now(), + BenchmarkJobStatus::Success, + ); + let job_2 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 2, + "collector 2", + Utc::now(), + BenchmarkJobStatus::Success, + ); + + db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; + + assert!(db + .mark_benchmark_request_as_completed(&mut request) + .await + .is_err()); + + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn mark_request_completed_not_inprogress() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let request_id = 1; + + let mut request = create_try( + Some("s1"), + Some("p1"), + 3, + Utc::now(), + BenchmarkRequestStatus::ArtifactsReady, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + let job_1 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 1, + "collector 1", + Utc::now(), + BenchmarkJobStatus::Success, + ); + let job_2 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 2, + "collector 2", + Utc::now(), + BenchmarkJobStatus::Success, + ); + + db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; + + assert!(db + .mark_benchmark_request_as_completed(&mut request) + .await + .is_err()); + + Ok(ctx) + }) + .await; + } + + // The case where the job is not complete + #[tokio::test] + async fn mark_request_completed_nop() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let request_id = 1; + + let mut request = create_try( + Some("s1"), + Some("p1"), + 3, + Utc::now(), + BenchmarkRequestStatus::InProgress, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + let job_1 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 1, + "collector 1", + Utc::now(), + BenchmarkJobStatus::InProgress, + ); + let job_2 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 2, + "collector 2", + Utc::now(), + BenchmarkJobStatus::Success, + ); + + db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; + + assert!(db + .mark_benchmark_request_as_completed(&mut request) + .await + .is_ok()); + assert_eq!(request.status(), BenchmarkRequestStatus::InProgress); + + Ok(ctx) + }) + .await; + } + + #[tokio::test] + async fn mark_request_completed() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let db = db.connection().await; + let request_id = 1; + + let mut request = create_try( + Some("s1"), + Some("p1"), + 3, + Utc::now(), + BenchmarkRequestStatus::InProgress, + "", + "", + ); + db.insert_benchmark_request(&request).await.unwrap(); + + let job_1 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 1, + "collector 1", + Utc::now(), + BenchmarkJobStatus::Success, + ); + let job_2 = BenchmarkJob::new( + Target::X86_64UnknownLinuxGnu, + CodegenBackend::Llvm, + 2, + "collector 2", + Utc::now(), + BenchmarkJobStatus::Success, + ); + + db_insert_jobs(&*db, request_id, &[job_1, job_2]).await; + + assert!(db + .mark_benchmark_request_as_completed(&mut request) + .await + .is_ok()); + // The struct should have been mutated + assert!(request.is_completed()); + // The tag should exist in the completed set + assert!(request_is_complete(&*db, request.tag().unwrap()).await); Ok(ctx) }) diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index 9ce80b936..d90223848 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1,7 +1,6 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::selector::CompileTestCase; use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJobStatus, BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit, Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR, @@ -311,6 +310,7 @@ static MIGRATIONS: &[&str] = &[ // being added to the table r#"CREATE UNIQUE INDEX benchmark_request_pr_commit_type_idx ON benchmark_request (pr, commit_type) WHERE status != 'completed';"#, r#" +<<<<<<< HEAD CREATE TABLE IF NOT EXISTS collector_config ( id SERIAL PRIMARY KEY, target TEXT NOT NULL, @@ -1710,6 +1710,67 @@ where }) .collect()) } + + async fn mark_benchmark_request_as_completed( + &self, + benchmark_request: &mut BenchmarkRequest, + ) -> anyhow::Result { + anyhow::ensure!( + benchmark_request.tag().is_some(), + "Benchmark request has no tag" + ); + anyhow::ensure!( + benchmark_request.status == BenchmarkRequestStatus::InProgress, + "Can only mark benchmark request whos status is in_progress as complete" + ); + + // Find if the benchmark is completed and update it's status to completed + // in one SQL block + let row = self + .conn() + .query_opt( + " + UPDATE + benchmark_request + SET + status = $1, + completed_at = NOW() + WHERE + benchmark_request.tag = $2 + AND benchmark_request.commit_type = $3 + AND benchmark_request.status = $4 + AND NOT EXISTS ( + SELECT + 1 + FROM + job_queue + WHERE job_queue.request_id = benchmark_request.id + AND job_queue.status NOT IN ($5, $6) + ) + RETURNING benchmark_request.id, benchmark_request.completed_at; + ", + &[ + &BENCHMARK_REQUEST_STATUS_COMPLETED_STR, + &benchmark_request.tag(), + &benchmark_request.commit_type, + &benchmark_request.status, + &BenchmarkJobStatus::Success, + &BenchmarkJobStatus::Failure, + ], + ) + .await + .context("Failed to get id for benchmark_request")?; + // The affected id is returned by the query thus we can use the row's + // presence to determine if the request was marked as completed + if let Some(row) = row { + let completed_at = row.get::<_, DateTime>(1); + // Also mutate our object + benchmark_request.status = BenchmarkRequestStatus::Completed { completed_at }; + Ok(true) + } else { + Ok(false) + } + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option>) -> ArtifactId { @@ -1753,12 +1814,12 @@ macro_rules! impl_to_postgresql_via_to_string { }; } -impl_to_postgresql_via_to_string!(BenchmarkRequestType); +impl_to_postgresql_via_to_string!(BenchmarkJobStatus); impl_to_postgresql_via_to_string!(BenchmarkRequestStatus); -impl_to_postgresql_via_to_string!(Target); +impl_to_postgresql_via_to_string!(BenchmarkRequestType); impl_to_postgresql_via_to_string!(CodegenBackend); impl_to_postgresql_via_to_string!(Profile); -impl_to_postgresql_via_to_string!(BenchmarkJobStatus); +impl_to_postgresql_via_to_string!(Target); #[cfg(test)] mod tests { diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index d781de4cc..592c53594 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1330,6 +1330,13 @@ impl Connection for SqliteConnection { })? .collect::>()?) } + + async fn mark_benchmark_request_as_completed( + &self, + _benchmark_request: &mut BenchmarkRequest, + ) -> anyhow::Result { + no_queue_implementation_abort!() + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option) -> ArtifactId { diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs index 0f2f35587..77b109e4e 100644 --- a/site/src/job_queue/mod.rs +++ b/site/src/job_queue/mod.rs @@ -253,8 +253,13 @@ async fn try_enqueue_next_benchmark_request( break; } BenchmarkRequestStatus::InProgress => { - // TODO: Try to mark as completed - break; + if conn + .try_mark_benchmark_request_as_completed(&mut request) + .await? + { + index.add_tag(request.tag().unwrap()); + continue; + } } BenchmarkRequestStatus::WaitingForArtifacts | BenchmarkRequestStatus::Completed { .. } => {