diff --git a/collector/src/bin/collector.rs b/collector/src/bin/collector.rs
index fffa3ed47..5d9502c95 100644
--- a/collector/src/bin/collector.rs
+++ b/collector/src/bin/collector.rs
@@ -12,6 +12,7 @@
 use std::marker::PhantomData;
 use std::path::{Path, PathBuf};
 use std::process;
 use std::process::Command;
+use std::str::FromStr;
 use std::time::Duration;
 use std::{str, time::Instant};
@@ -666,6 +667,37 @@ enum Commands {
         /// The name of the modified artifact to be compared.
         modified: Option<String>,
     },
+
+    /// Registers a collector in the database
+    AddCollector {
+        #[command(flatten)]
+        db: DbOption,
+
+        #[arg(long)]
+        collector_name: String,
+
+        #[arg(long)]
+        target: String,
+
+        #[arg(long)]
+        is_active: bool,
+
+        #[arg(long)]
+        benchmark_set: u32,
+    },
+
+    /// Polls the job queue for work to benchmark
+    DequeueJob {
+        /// The unique identifier for the collector
+        #[arg(long)]
+        collector_name: String,
+
+        #[arg(long)]
+        target: String,
+
+        #[command(flatten)]
+        db: DbOption,
+    },
 }
 
 #[derive(Debug, clap::Parser)]
@@ -1266,6 +1298,63 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
             rt.block_on(compare_artifacts(conn, metric, base, modified))?;
             Ok(0)
         }
+
+        Commands::AddCollector {
+            db,
+            collector_name,
+            target,
+            is_active,
+            benchmark_set,
+        } => {
+            let pool = Pool::open(&db.db);
+            let rt = build_async_runtime();
+            let conn = rt.block_on(pool.connection());
+
+            let target = database::Target::from_str(&target).map_err(|e| anyhow::anyhow!(e))?;
+            rt.block_on(conn.add_collector_config(
+                &collector_name,
+                &target,
+                benchmark_set,
+                is_active,
+            ))?;
+            Ok(0)
+        }
+
+        Commands::DequeueJob {
+            collector_name,
+            db,
+            target,
+        } => {
+            let pool = Pool::open(&db.db);
+            let rt = build_async_runtime();
+            let conn = rt.block_on(pool.connection());
+
+            // Obtain the configuration and validate that it matches the
+            // collector's setup
+            let collector_config: database::CollectorConfig =
+                rt.block_on(conn.get_collector_config(&collector_name))?;
+
+            let collector_target = collector_config.target();
+            if collector_target.as_str() != target {
+                panic!(
+                    "Mismatched target for collector: expected `{collector_target}`, got `{target}`"
+                );
+            }
+
+            // Dequeue a job
+            let benchmark_job = rt.block_on(conn.dequeue_benchmark_job(
+                &collector_name,
+                collector_config.target(),
+                collector_config.benchmark_set(),
+            ))?;
+
+            if let Some(benchmark_job) = benchmark_job {
+                // TODO: process the job
+                println!("{:?}", benchmark_job);
+            }
+
+            Ok(0)
+        }
     }
 }
 
diff --git a/collector/src/compile/benchmark/target.rs b/collector/src/compile/benchmark/target.rs
index f58a14e0a..2d84ff0ab 100644
--- a/collector/src/compile/benchmark/target.rs
+++ b/collector/src/compile/benchmark/target.rs
@@ -1,3 +1,5 @@
+use std::{fmt, str::FromStr};
+
 /// Target representing an Rust target triple, for a full list of targets and
 /// their support see;
 /// https://doc.rust-lang.org/nightly/rustc/platform-support.html
@@ -15,12 +17,36 @@ impl Default for Target {
     }
 }
 
+impl FromStr for Target {
+    type Err = String;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(match s.to_ascii_lowercase().as_str() {
+            "x86_64-unknown-linux-gnu" => Target::X86_64UnknownLinuxGnu,
+            _ => return Err(format!("{} is not a valid target", s)),
+        })
+    }
+}
+
+impl fmt::Display for Target {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.as_str())
+    }
+}
+
 impl Target {
     pub fn all() -> Vec<Self> {
         vec![Self::X86_64UnknownLinuxGnu]
     }
 }
 
+impl Target {
+    pub fn as_str(self) -> &'static str {
+        match self {
+            Target::X86_64UnknownLinuxGnu => "x86_64-unknown-linux-gnu",
+        }
+    }
+}
+
 impl From<database::Target> for Target {
     fn from(value: database::Target) -> Self {
         match value {
diff --git a/database/src/lib.rs b/database/src/lib.rs
index fc2510d99..d37d606ea 100644
--- a/database/src/lib.rs
+++ b/database/src/lib.rs
@@ -1054,10 +1054,12 @@ pub enum BenchmarkJobStatus {
     Queued,
     InProgress {
         started_at: DateTime<Utc>,
+        collector_name: String,
     },
     Completed {
         started_at: DateTime<Utc>,
         completed_at: DateTime<Utc>,
+        collector_name: String,
         success: bool,
     },
 }
@@ -1109,3 +1111,70 @@ pub struct BenchmarkJob {
     status: BenchmarkJobStatus,
     retry: u32,
 }
+
+impl BenchmarkJob {
+    pub fn target(&self) -> &Target {
+        &self.target
+    }
+
+    pub fn backend(&self) -> &CodegenBackend {
+        &self.backend
+    }
+
+    pub fn profile(&self) -> &Profile {
+        &self.profile
+    }
+
+    pub fn request_tag(&self) -> &str {
+        &self.request_tag
+    }
+
+    pub fn benchmark_set(&self) -> &BenchmarkSet {
+        &self.benchmark_set
+    }
+
+    pub fn collector_name(&self) -> Option<&str> {
+        match &self.status {
+            BenchmarkJobStatus::Queued => None,
+            BenchmarkJobStatus::InProgress { collector_name, .. }
+            | BenchmarkJobStatus::Completed { collector_name, .. } => Some(collector_name),
+        }
+    }
+}
+
+/// The configuration for a collector
+#[derive(Debug, PartialEq)]
+pub struct CollectorConfig {
+    name: String,
+    target: Target,
+    benchmark_set: BenchmarkSet,
+    is_active: bool,
+    last_heartbeat_at: DateTime<Utc>,
+    date_added: DateTime<Utc>,
+}
+
+impl CollectorConfig {
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    pub fn target(&self) -> &Target {
+        &self.target
+    }
+
+    pub fn benchmark_set(&self) -> &BenchmarkSet {
+        &self.benchmark_set
+    }
+
+    pub fn is_active(&self) -> bool {
+        self.is_active
+    }
+
+    pub fn last_heartbeat_at(&self) -> DateTime<Utc> {
+        self.last_heartbeat_at
+    }
+
+    pub fn date_added(&self) -> DateTime<Utc> {
+        self.date_added
+    }
+}
diff --git a/database/src/pool.rs b/database/src/pool.rs
index ce12782c1..563e4083c 100644
--- a/database/src/pool.rs
+++ b/database/src/pool.rs
@@ -1,7 +1,8 @@
 use crate::selector::CompileTestCase;
 use crate::{
-    ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, BenchmarkRequestIndex,
-    BenchmarkRequestStatus, CodegenBackend, CompileBenchmark, Target,
+    ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkRequest,
+    BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, CollectorConfig,
+    CompileBenchmark, Target,
 };
 use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
 use chrono::{DateTime, Utc};
@@ -232,6 +233,26 @@ pub trait Connection: Send + Sync {
         &self,
         artifact_row_id: &ArtifactIdNumber,
     ) -> anyhow::Result<HashSet<CompileTestCase>>;
+
+    /// Add the configuration for a collector
+    async fn add_collector_config(
+        &self,
+        collector_name: &str,
+        target: &Target,
+        benchmark_set: u32,
+        is_active: bool,
+    ) -> anyhow::Result<CollectorConfig>;
+
+    /// Get the configuration for a collector by the name of the collector
+    async fn get_collector_config(&self, collector_name: &str) -> anyhow::Result<CollectorConfig>;
+
+    /// Dequeue a benchmark job from the queue for the given collector
+    async fn dequeue_benchmark_job(
+        &self,
+        collector_name: &str,
+        target: &Target,
+        benchmark_set: &BenchmarkSet,
+    ) -> anyhow::Result<Option<BenchmarkJob>>;
 }
 
 #[async_trait::async_trait]
@@ -690,6 +711,127 @@ mod tests {
             .await
             .unwrap()
             .is_empty());
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn get_collector_config_error_if_not_exist() {
+        run_postgres_test(|ctx| async {
+            let db = ctx.db_client().connection().await;
+
+            let collector_config_result = db.get_collector_config("collector-1").await;
+
+            assert!(collector_config_result.is_err());
+
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn add_collector_config() {
+        run_postgres_test(|ctx| async {
+            let db = ctx.db_client().connection().await;
+
+            let insert_config_result = db
+                .add_collector_config("collector-1", &Target::X86_64UnknownLinuxGnu, 1, true)
+                .await;
+            assert!(insert_config_result.is_ok());
+
+            let get_config_result = db.get_collector_config("collector-1").await;
+            assert!(get_config_result.is_ok());
+
+            // What we entered into the database should be identical to what is
+            // returned from the database
+            assert_eq!(insert_config_result.unwrap(), get_config_result.unwrap());
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn dequeue_benchmark_job_empty_queue() {
+        run_postgres_test(|ctx| async {
+            let db = ctx.db_client().connection().await;
+
+            let benchmark_job_result = db
+                .dequeue_benchmark_job(
+                    "collector-1",
+                    &Target::X86_64UnknownLinuxGnu,
+                    &BenchmarkSet(420),
+                )
+                .await;
+
+            assert!(benchmark_job_result.is_ok());
+            assert!(benchmark_job_result.unwrap().is_none());
+
+            Ok(ctx)
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn dequeue_benchmark_job() {
+        run_postgres_test(|ctx| async {
+            let db = ctx.db_client().connection().await;
+            let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap();
+
+            let insert_result = db
+                .add_collector_config("collector-1", &Target::X86_64UnknownLinuxGnu, 1, true)
+                .await;
+            assert!(insert_result.is_ok());
+
+            let collector_config = insert_result.unwrap();
+
+            let benchmark_request =
+                BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time);
+
+            // Insert the request so we don't violate the foreign key
+            db.insert_benchmark_request(&benchmark_request)
+                .await
+                .unwrap();
+
+            // Now we can insert the job
+            let enqueue_result = db
+                .enqueue_benchmark_job(
+                    benchmark_request.tag().unwrap(),
+                    &Target::X86_64UnknownLinuxGnu,
+                    &CodegenBackend::Llvm,
+                    &Profile::Opt,
+                    1u32,
+                )
+                .await;
+            assert!(enqueue_result.is_ok());
+
+            let benchmark_job = db
+                .dequeue_benchmark_job(
+                    collector_config.name(),
+                    collector_config.target(),
+                    collector_config.benchmark_set(),
+                )
+                .await;
+            assert!(benchmark_job.is_ok());
+
+            let benchmark_job = benchmark_job.unwrap();
+            assert!(benchmark_job.is_some());
+
+            // Ensure the properties of the job match both the request and the
+            // collector configuration
+            let benchmark_job = benchmark_job.unwrap();
+            assert_eq!(
+                benchmark_job.request_tag(),
+                benchmark_request.tag().unwrap()
+            );
+            assert_eq!(
+                benchmark_job.benchmark_set(),
+                collector_config.benchmark_set()
+            );
+            assert_eq!(
+                benchmark_job.collector_name().unwrap(),
+                collector_config.name(),
+            );
 
             Ok(ctx)
         })
diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs
index 9ce80b936..1d30a942a 100644
--- a/database/src/pool/postgres.rs
+++ b/database/src/pool/postgres.rs
@@ -1,12 +1,15 @@
 use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
 use crate::selector::CompileTestCase;
 use crate::{
-    ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJobStatus,
+    ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkJob, BenchmarkJobStatus,
     BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType,
-    CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile,
-    QueuedCommit, Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
+    BenchmarkSet, CodegenBackend, CollectionId, CollectorConfig, Commit, CommitType,
+    CompileBenchmark, Date, Index, Profile, QueuedCommit, Scenario, Target,
+    BENCHMARK_JOB_STATUS_IN_PROGRESS_STR, BENCHMARK_JOB_STATUS_QUEUED_STR,
+    BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR,
     BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, BENCHMARK_REQUEST_STATUS_COMPLETED_STR,
-    BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_TRY_STR,
+    BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_STATUS_WAITING_FOR_ARTIFACTS_STR,
+    BENCHMARK_REQUEST_TRY_STR,
 };
 use anyhow::Context as _;
 use chrono::{DateTime, TimeZone, Utc};
@@ -361,6 +364,19 @@ static MIGRATIONS: &[&str] = &[
     );
     CREATE INDEX IF NOT EXISTS job_queue_request_tag_idx ON job_queue (request_tag);
     "#,
+    // The name is unique and not null in the collector config and is simpler
+    // to use as we do not expose the `collector_id` in the code
+    r#"
+    ALTER TABLE job_queue DROP CONSTRAINT IF EXISTS job_queue_collector;
+    ALTER TABLE job_queue ADD COLUMN collector_name TEXT;
+    ALTER TABLE job_queue
+        ADD CONSTRAINT job_queue_collector
+        FOREIGN KEY (collector_name)
+        REFERENCES collector_config(name)
+        ON DELETE CASCADE;
+    ALTER TABLE job_queue DROP COLUMN IF EXISTS collector_id;
+    CREATE INDEX IF NOT EXISTS job_queue_status_target_benchmark_set_idx ON job_queue (status, target, benchmark_set);
+    "#,
 ];
 
 #[async_trait::async_trait]
@@ -1477,7 +1493,7 @@ where
                 &benchmark_request.parent_sha(),
                 &benchmark_request.pr().map(|it| *it as i32),
                 &benchmark_request.commit_type,
-                &benchmark_request.status,
+                &benchmark_request.status.as_str(),
                 &benchmark_request.created_at,
                 &benchmark_request.backends,
                 &benchmark_request.profiles,
@@ -1558,9 +1574,9 @@ where
             &[
                 &sha,
                 &parent_sha,
-                &BenchmarkRequestStatus::ArtifactsReady,
+                &BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR,
                 &(pr as i32),
-                &BenchmarkRequestStatus::WaitingForArtifacts,
+                &BENCHMARK_REQUEST_STATUS_WAITING_FOR_ARTIFACTS_STR,
             ],
         )
         .await
@@ -1679,7 +1695,7 @@ where
                 &backend,
                 &profile,
                 &(benchmark_set as i32),
-                &BenchmarkJobStatus::Queued,
+                &BENCHMARK_JOB_STATUS_QUEUED_STR,
             ],
         )
         .await
@@ -1710,6 +1726,167 @@ where
             })
             .collect())
     }
+
+    async fn add_collector_config(
+        &self,
+        collector_name: &str,
+        target: &Target,
+        benchmark_set: u32,
+        is_active: bool,
+    ) -> anyhow::Result<CollectorConfig> {
+        let row = self
+            .conn()
+            .query_one(
+                "INSERT INTO collector_config(
+                    name,
+                    target,
+                    date_added,
+                    last_heartbeat_at,
+                    benchmark_set,
+                    is_active
+                ) VALUES (
+                    $1,
+                    $2,
+                    NOW(),
+                    NOW(),
+                    $3,
+                    $4
+                )
+                RETURNING
+                    last_heartbeat_at,
+                    date_added
+                ",
+                &[
+                    &collector_name,
+                    &target,
+                    &(benchmark_set as i32),
+                    &is_active,
+                ],
+            )
+            .await
+            .context("failed to create collector config")?;
+
+        let collector_config = CollectorConfig {
+            name: collector_name.into(),
+            target: *target,
+            benchmark_set: BenchmarkSet(benchmark_set),
+            is_active,
+            last_heartbeat_at: row.get::<_, DateTime<Utc>>(0),
+            date_added: row.get::<_, DateTime<Utc>>(1),
+        };
+        Ok(collector_config)
+    }
+
+    async fn get_collector_config(&self, collector_name: &str) -> anyhow::Result<CollectorConfig> {
+        let row = self
+            .conn()
+            .query_one(
+                "SELECT
+                    target,
+                    benchmark_set,
+                    is_active,
+                    last_heartbeat_at,
+                    date_added
+                FROM
+                    collector_config
+                WHERE
+                    name = $1;",
+                &[&collector_name],
+            )
+            .await?;
+
+        let collector_config = CollectorConfig {
+            name: collector_name.into(),
+            target: Target::from_str(&row.get::<_, String>(0)).map_err(|e| anyhow::anyhow!(e))?,
+            benchmark_set: BenchmarkSet(row.get::<_, i32>(1) as u32),
+            is_active: row.get::<_, bool>(2),
+            last_heartbeat_at: row.get::<_, DateTime<Utc>>(3),
+            date_added: row.get::<_, DateTime<Utc>>(4),
+        };
+        Ok(collector_config)
+    }
+
+    async fn dequeue_benchmark_job(
+        &self,
+        collector_name: &str,
+        target: &Target,
+        benchmark_set: &BenchmarkSet,
+    ) -> anyhow::Result<Option<BenchmarkJob>> {
+        // We take the oldest job from the job_queue matching the benchmark_set,
+        // target and status of 'queued'
+        let row_opt = self
+            .conn()
+            .query_opt(
+                "
+                WITH picked AS (
+                    SELECT
+                        id
+                    FROM
+                        job_queue
+                    WHERE
+                        status = $1
+                        AND target = $2
+                        AND benchmark_set = $3
+                    ORDER
+                        BY created_at
+                    LIMIT 1
+                    FOR UPDATE SKIP LOCKED
+                ), updated_queue AS (
+                    UPDATE
+                        job_queue
+                    SET
+                        collector_name = $4,
+                        started_at = NOW(),
+                        status = $5
+                    FROM
+                        picked
+                    WHERE
+                        job_queue.id = picked.id
+                    RETURNING
+                        job_queue.backend,
+                        job_queue.profile,
+                        job_queue.request_tag,
+                        job_queue.created_at,
+                        job_queue.started_at,
+                        job_queue.retry
+                )
+                SELECT
+                    *
+                FROM
+                    updated_queue;",
+                &[
+                    &BENCHMARK_JOB_STATUS_QUEUED_STR,
+                    &target,
+                    &(benchmark_set.0 as i32),
+                    &collector_name,
+                    &BENCHMARK_JOB_STATUS_IN_PROGRESS_STR,
+                ],
+            )
+            .await?;
+
+        match row_opt {
+            None => Ok(None),
+            Some(row) => {
+                let job = BenchmarkJob {
+                    target: *target,
+                    backend: CodegenBackend::from_str(&row.get::<_, String>(0))
+                        .map_err(|e| anyhow::anyhow!(e))?,
+                    profile: Profile::from_str(&row.get::<_, String>(1))
+                        .map_err(|e| anyhow::anyhow!(e))?,
+                    request_tag: row.get::<_, String>(2),
+                    benchmark_set: benchmark_set.clone(),
+                    created_at: row.get::<_, DateTime<Utc>>(3),
+                    // The job is now in an in_progress state
+                    status: BenchmarkJobStatus::InProgress {
+                        started_at: row.get::<_, DateTime<Utc>>(4),
+                        collector_name: collector_name.into(),
+                    },
+                    retry: row.get::<_, i32>(5) as u32,
+                };
+                Ok(Some(job))
+            }
+        }
+    }
 }
 
 fn parse_artifact_id(ty: &str, sha: &str, date: Option<DateTime<Utc>>) -> ArtifactId {
@@ -1754,11 +1931,9 @@ macro_rules! impl_to_postgresql_via_to_string {
 }
 
 impl_to_postgresql_via_to_string!(BenchmarkRequestType);
-impl_to_postgresql_via_to_string!(BenchmarkRequestStatus);
 impl_to_postgresql_via_to_string!(Target);
 impl_to_postgresql_via_to_string!(CodegenBackend);
 impl_to_postgresql_via_to_string!(Profile);
-impl_to_postgresql_via_to_string!(BenchmarkJobStatus);
 
 #[cfg(test)]
 mod tests {
diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs
index d781de4cc..e1efbddc0 100644
--- a/database/src/pool/sqlite.rs
+++ b/database/src/pool/sqlite.rs
@@ -1,9 +1,9 @@
 use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
 use crate::selector::CompileTestCase;
 use crate::{
-    ArtifactCollection, ArtifactId, Benchmark, BenchmarkRequest, BenchmarkRequestIndex,
-    BenchmarkRequestStatus, CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark,
-    Date, Profile, Target,
+    ArtifactCollection, ArtifactId, Benchmark, BenchmarkJob, BenchmarkRequest,
+    BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, CollectionId,
+    CollectorConfig, Commit, CommitType, CompileBenchmark, Date, Profile, Target,
 };
 use crate::{ArtifactIdNumber, Index, QueuedCommit};
 use chrono::{DateTime, TimeZone, Utc};
@@ -1330,6 +1330,29 @@ impl Connection for SqliteConnection {
             })?
            .collect::<Result<HashSet<_>, _>>()?)
     }
+
+    async fn get_collector_config(&self, _collector_name: &str) -> anyhow::Result<CollectorConfig> {
+        no_queue_implementation_abort!()
+    }
+
+    async fn dequeue_benchmark_job(
+        &self,
+        _collector_name: &str,
+        _target: &Target,
+        _benchmark_set: &BenchmarkSet,
+    ) -> anyhow::Result<Option<BenchmarkJob>> {
+        no_queue_implementation_abort!()
+    }
+
+    async fn add_collector_config(
+        &self,
+        _collector_name: &str,
+        _target: &Target,
+        _benchmark_set: u32,
+        _is_active: bool,
+    ) -> anyhow::Result<CollectorConfig> {
+        no_queue_implementation_abort!()
+    }
 }
 
 fn parse_artifact_id(ty: &str, sha: &str, date: Option<i64>) -> ArtifactId {
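
Note (not part of the patch): below is a minimal sketch of how a collector binary might drive the new queue methods end to end, assuming the `database` crate as modified above (`Pool`, `Connection::get_collector_config`, `Connection::dequeue_benchmark_job`) plus `tokio` and `anyhow`. The `poll_job_queue` name, the `db_url` parameter, and the 30-second back-off are hypothetical placeholders standing in for the `// TODO: process the job` left in `main_result`.

use std::time::Duration;

use database::Pool;

// Fetch the collector's registered configuration once, then repeatedly
// dequeue jobs that match its target and benchmark set, mirroring what the
// `DequeueJob` subcommand above does for a single iteration.
async fn poll_job_queue(db_url: &str, collector_name: &str) -> anyhow::Result<()> {
    let pool = Pool::open(db_url);
    let conn = pool.connection().await;

    // Fails if the collector was never registered via `add_collector_config`.
    let config = conn.get_collector_config(collector_name).await?;

    loop {
        let job = conn
            .dequeue_benchmark_job(config.name(), config.target(), config.benchmark_set())
            .await?;

        match job {
            Some(job) => {
                // Placeholder for the actual benchmarking work (the TODO above).
                println!("benchmarking request {}", job.request_tag());
            }
            // Nothing queued for this target/benchmark set; back off before retrying.
            None => tokio::time::sleep(Duration::from_secs(30)).await,
        }
    }
}

Because the dequeue SQL selects the oldest queued row with `FOR UPDATE SKIP LOCKED` and marks it in-progress in the same statement, several collectors can poll the same queue concurrently, as long as each one passes its own registered name, target, and benchmark set.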