Skip to content
89 changes: 89 additions & 0 deletions collector/src/bin/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::process;
use std::process::Command;
use std::str::FromStr;
use std::time::Duration;
use std::{str, time::Instant};

Expand Down Expand Up @@ -666,6 +667,37 @@ enum Commands {
/// The name of the modified artifact to be compared.
modified: Option<String>,
},

/// Registers a collector in the database
AddCollector {
#[command(flatten)]
db: DbOption,

#[arg(long)]
collector_name: String,

#[arg(long)]
target: String,

#[arg(long)]
is_active: bool,

#[arg(long)]
benchmark_set: u32,
},

/// Polls the job queue for work to benchmark
DequeueJob {
/// The unique identifier for the collector
#[arg(long)]
collector_name: String,

#[arg(long)]
target: String,

#[command(flatten)]
db: DbOption,
},
}

#[derive(Debug, clap::Parser)]
Expand Down Expand Up @@ -1266,6 +1298,63 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
rt.block_on(compare_artifacts(conn, metric, base, modified))?;
Ok(0)
}

Commands::AddCollector {
db,
collector_name,
target,
is_active,
benchmark_set,
} => {
let pool = Pool::open(&db.db);
let rt = build_async_runtime();
let conn = rt.block_on(pool.connection());

let target = database::Target::from_str(&target).map_err(|e| anyhow::anyhow!(e))?;
rt.block_on(conn.add_collector_config(
&collector_name,
&target,
benchmark_set,
is_active,
))?;
Ok(0)
}

Commands::DequeueJob {
collector_name,
db,
target,
} => {
let pool = Pool::open(&db.db);
let rt = build_async_runtime();
let conn = rt.block_on(pool.connection());

// Obtain the configuration and validate that it matches the
// collector's setup
let collector_config: database::CollectorConfig =
rt.block_on(conn.get_collector_config(&collector_name))?;

let collector_target = collector_config.target();
if collector_target.as_str() != target {
panic!(
"Mismatching target for collector expected `{collector_target}` got `{target}`"
);
}

// Dequeue a job
let benchmark_job = rt.block_on(conn.dequeue_benchmark_job(
&collector_name,
collector_config.target(),
collector_config.benchmark_set(),
))?;

if let Some(benchmark_job) = benchmark_job {
// TODO; process the job
println!("{:?}", benchmark_job);
}

Ok(0)
}
}
}

Expand Down
26 changes: 26 additions & 0 deletions collector/src/compile/benchmark/target.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{fmt, str::FromStr};

/// Target representing an Rust target triple, for a full list of targets and
/// their support see;
/// https://doc.rust-lang.org/nightly/rustc/platform-support.html
Expand All @@ -15,12 +17,36 @@ impl Default for Target {
}
}

impl FromStr for Target {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s.to_ascii_lowercase().as_str() {
"x86_64-unknown-linux-gnu" => Target::X86_64UnknownLinuxGnu,
_ => return Err(format!("{} is not a valid target", s)),
})
}
}

impl fmt::Display for Target {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}

impl Target {
pub fn all() -> Vec<Self> {
vec![Self::X86_64UnknownLinuxGnu]
}
}

impl Target {
pub fn as_str(self) -> &'static str {
match self {
Target::X86_64UnknownLinuxGnu => "x86_64-unknown-linux-gnu",
}
}
}

impl From<database::Target> for Target {
fn from(value: database::Target) -> Self {
match value {
Expand Down
69 changes: 69 additions & 0 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,10 +1054,12 @@ pub enum BenchmarkJobStatus {
Queued,
InProgress {
started_at: DateTime<Utc>,
collector_name: String,
},
Completed {
started_at: DateTime<Utc>,
completed_at: DateTime<Utc>,
collector_name: String,
success: bool,
},
}
Expand Down Expand Up @@ -1109,3 +1111,70 @@ pub struct BenchmarkJob {
status: BenchmarkJobStatus,
retry: u32,
}

impl BenchmarkJob {
pub fn target(&self) -> &Target {
&self.target
}

pub fn backend(&self) -> &CodegenBackend {
&self.backend
}

pub fn profile(&self) -> &Profile {
&self.profile
}

pub fn request_tag(&self) -> &str {
&self.request_tag
}

pub fn benchmark_set(&self) -> &BenchmarkSet {
&self.benchmark_set
}

pub fn collector_name(&self) -> Option<&str> {
match &self.status {
BenchmarkJobStatus::Queued => None,
BenchmarkJobStatus::InProgress { collector_name, .. }
| BenchmarkJobStatus::Completed { collector_name, .. } => Some(collector_name),
}
}
}

/// The configuration for a collector
#[derive(Debug, PartialEq)]
pub struct CollectorConfig {
name: String,
target: Target,
benchmark_set: BenchmarkSet,
is_active: bool,
last_heartbeat_at: DateTime<Utc>,
date_added: DateTime<Utc>,
}

impl CollectorConfig {
pub fn name(&self) -> &str {
&self.name
}

pub fn target(&self) -> &Target {
&self.target
}

pub fn benchmark_set(&self) -> &BenchmarkSet {
&self.benchmark_set
}

pub fn is_active(&self) -> bool {
self.is_active
}

pub fn last_heartbeat_at(&self) -> DateTime<Utc> {
self.last_heartbeat_at
}

pub fn date_added(&self) -> DateTime<Utc> {
self.date_added
}
}
Loading
Loading