Skip to content

Commit d4cf6a1

Browse files
committed
Most of the scaffolding for dequeuing a job from a collector
1 parent 7929852 commit d4cf6a1

File tree

8 files changed

+383
-10
lines changed

8 files changed

+383
-10
lines changed

collector/src/bin/collector.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use collector::artifact_stats::{
3535
use collector::codegen::{codegen_diff, CodegenType};
3636
use collector::compile::benchmark::category::Category;
3737
use collector::compile::benchmark::codegen_backend::CodegenBackend;
38+
use collector::compile::benchmark::collector_config::CollectorConfig;
3839
use collector::compile::benchmark::profile::Profile;
3940
use collector::compile::benchmark::scenario::Scenario;
4041
use collector::compile::benchmark::target::Target;
@@ -664,6 +665,19 @@ enum Commands {
664665
/// The name of the modified artifact to be compared.
665666
modified: Option<String>,
666667
},
668+
669+
/// Polls the job queue for work to benchmark
670+
DequeueJob {
671+
/// The unique identifier for the collector
672+
#[arg(long)]
673+
collector_name: String,
674+
675+
#[arg(long)]
676+
target: String,
677+
678+
#[command(flatten)]
679+
db: DbOption,
680+
},
667681
}
668682

669683
#[derive(Debug, clap::Parser)]
@@ -1264,6 +1278,44 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
12641278
rt.block_on(compare_artifacts(conn, metric, base, modified))?;
12651279
Ok(0)
12661280
}
1281+
1282+
Commands::DequeueJob {
1283+
collector_name,
1284+
db,
1285+
target,
1286+
} => {
1287+
let pool = Pool::open(&db.db);
1288+
let rt = build_async_runtime();
1289+
let conn = rt.block_on(pool.connection());
1290+
1291+
// Obtain the configuration and validate that it matches the
1292+
// collector's setup
1293+
let collector_config: CollectorConfig = rt
1294+
.block_on(conn.get_collector_config(&collector_name))?
1295+
.into();
1296+
1297+
let collector_target = collector_config.target();
1298+
if collector_target.as_str() != target {
1299+
panic!(
1300+
"Mismatching target for collector expected `{}` got `{}`",
1301+
collector_target, target
1302+
);
1303+
}
1304+
1305+
// Dequeue a job
1306+
let benchmark_job = rt.block_on(conn.dequeue_benchmark_job(
1307+
&collector_name,
1308+
&collector_config.target().to_db_target(),
1309+
collector_config.benchmark_set(),
1310+
))?;
1311+
1312+
if let Some(benchmark_job) = benchmark_job {
1313+
// TODO; process the job
1314+
println!("{:?}", benchmark_job);
1315+
}
1316+
1317+
Ok(0)
1318+
}
12671319
}
12681320
}
12691321

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use chrono::{DateTime, Utc};
2+
use database::BenchmarkSet;
3+
4+
use super::target::Target;
5+
6+
pub struct CollectorConfig {
7+
name: String,
8+
target: Target,
9+
benchmark_set: BenchmarkSet,
10+
is_active: bool,
11+
last_heartbeat_at: DateTime<Utc>,
12+
date_added: DateTime<Utc>,
13+
}
14+
15+
impl CollectorConfig {
16+
pub fn name(&self) -> &str {
17+
&self.name
18+
}
19+
20+
pub fn target(&self) -> &Target {
21+
&self.target
22+
}
23+
24+
pub fn benchmark_set(&self) -> &BenchmarkSet {
25+
&self.benchmark_set
26+
}
27+
28+
pub fn is_active(&self) -> bool {
29+
self.is_active
30+
}
31+
32+
pub fn last_heartbeat_at(&self) -> DateTime<Utc> {
33+
self.last_heartbeat_at
34+
}
35+
36+
pub fn date_added(&self) -> DateTime<Utc> {
37+
self.date_added
38+
}
39+
}
40+
41+
impl From<database::CollectorConfig> for CollectorConfig {
42+
fn from(value: database::CollectorConfig) -> Self {
43+
CollectorConfig {
44+
name: value.name().into(),
45+
target: Target::from_db_target(value.target()),
46+
benchmark_set: value.benchmark_set().clone(),
47+
is_active: value.is_active(),
48+
last_heartbeat_at: value.last_heartbeat_at(),
49+
date_added: value.date_added(),
50+
}
51+
}
52+
}

collector/src/compile/benchmark/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tempfile::TempDir;
1818

1919
pub mod category;
2020
pub mod codegen_backend;
21+
pub mod collector_config;
2122
pub(crate) mod patch;
2223
pub mod profile;
2324
pub mod scenario;

collector/src/compile/benchmark/target.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::{fmt, str::FromStr};
2+
13
/// Target representing an Rust target triple, for a full list of targets and
24
/// their support see;
35
/// https://doc.rust-lang.org/nightly/rustc/platform-support.html
@@ -15,14 +17,42 @@ impl Default for Target {
1517
}
1618
}
1719

20+
impl FromStr for Target {
21+
type Err = String;
22+
fn from_str(s: &str) -> Result<Self, Self::Err> {
23+
Ok(match s.to_ascii_lowercase().as_str() {
24+
"x86_64-unknown-linux-gnu" => Target::X86_64UnknownLinuxGnu,
25+
_ => return Err(format!("{} is not a valid target", s)),
26+
})
27+
}
28+
}
29+
30+
impl fmt::Display for Target {
31+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32+
write!(f, "{}", self.as_str())
33+
}
34+
}
35+
1836
impl Target {
1937
pub fn all() -> Vec<Self> {
2038
vec![Self::X86_64UnknownLinuxGnu]
2139
}
2240

41+
pub fn as_str(self) -> &'static str {
42+
match self {
43+
Target::X86_64UnknownLinuxGnu => "x86_64-unknown-linux-gnu",
44+
}
45+
}
46+
2347
pub fn from_db_target(target: &database::Target) -> Target {
2448
match target {
2549
database::Target::X86_64UnknownLinuxGnu => Self::X86_64UnknownLinuxGnu,
2650
}
2751
}
52+
53+
pub fn to_db_target(&self) -> database::Target {
54+
match self {
55+
Target::X86_64UnknownLinuxGnu => database::Target::X86_64UnknownLinuxGnu,
56+
}
57+
}
2858
}

database/src/lib.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,10 +1054,12 @@ pub enum BenchmarkJobStatus {
10541054
Queued,
10551055
InProgress {
10561056
started_at: DateTime<Utc>,
1057+
collector_name: String,
10571058
},
10581059
Completed {
10591060
started_at: DateTime<Utc>,
10601061
completed_at: DateTime<Utc>,
1062+
collector_name: String,
10611063
success: bool,
10621064
},
10631065
}
@@ -1109,3 +1111,39 @@ pub struct BenchmarkJob {
11091111
status: BenchmarkJobStatus,
11101112
retry: u32,
11111113
}
1114+
1115+
/// The configuration for a collector
1116+
pub struct CollectorConfig {
1117+
name: String,
1118+
target: Target,
1119+
benchmark_set: BenchmarkSet,
1120+
is_active: bool,
1121+
last_heartbeat_at: DateTime<Utc>,
1122+
date_added: DateTime<Utc>,
1123+
}
1124+
1125+
impl CollectorConfig {
1126+
pub fn name(&self) -> &str {
1127+
&self.name
1128+
}
1129+
1130+
pub fn target(&self) -> &Target {
1131+
&self.target
1132+
}
1133+
1134+
pub fn benchmark_set(&self) -> &BenchmarkSet {
1135+
&self.benchmark_set
1136+
}
1137+
1138+
pub fn is_active(&self) -> bool {
1139+
self.is_active
1140+
}
1141+
1142+
pub fn last_heartbeat_at(&self) -> DateTime<Utc> {
1143+
self.last_heartbeat_at
1144+
}
1145+
1146+
pub fn date_added(&self) -> DateTime<Utc> {
1147+
self.date_added
1148+
}
1149+
}

database/src/pool.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{
2-
ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, BenchmarkRequestIndex,
3-
BenchmarkRequestStatus, CodegenBackend, CompileBenchmark, Target,
2+
ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkRequest,
3+
BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, CollectorConfig,
4+
CompileBenchmark, Target,
45
};
56
use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
67
use chrono::{DateTime, Utc};
@@ -220,6 +221,17 @@ pub trait Connection: Send + Sync {
220221
profile: &Profile,
221222
benchmark_set: u32,
222223
) -> anyhow::Result<()>;
224+
225+
/// Get the confiuguration for a collector by the name of the collector
226+
async fn get_collector_config(&self, collector_name: &str) -> anyhow::Result<CollectorConfig>;
227+
228+
/// Get the confiuguration for a collector by the name of the collector
229+
async fn dequeue_benchmark_job(
230+
&self,
231+
collector_name: &str,
232+
target: &Target,
233+
benchmark_set: &BenchmarkSet,
234+
) -> anyhow::Result<Option<BenchmarkJob>>;
223235
}
224236

225237
#[async_trait::async_trait]
@@ -625,4 +637,39 @@ mod tests {
625637
})
626638
.await;
627639
}
640+
641+
#[tokio::test]
642+
async fn get_collector_config_error_if_not_exist() {
643+
run_postgres_test(|ctx| async {
644+
let db = ctx.db_client().connection().await;
645+
646+
let collector_config_result = db.get_collector_config("collector-1").await;
647+
648+
assert!(collector_config_result.is_err());
649+
650+
Ok(ctx)
651+
})
652+
.await;
653+
}
654+
655+
#[tokio::test]
656+
async fn dequeue_benchmark_job_empty_queue() {
657+
run_postgres_test(|ctx| async {
658+
let db = ctx.db_client().connection().await;
659+
660+
let benchmark_job_result = db
661+
.dequeue_benchmark_job(
662+
"collector-1",
663+
&Target::X86_64UnknownLinuxGnu,
664+
&BenchmarkSet(420),
665+
)
666+
.await;
667+
668+
assert!(benchmark_job_result.is_ok());
669+
assert!(benchmark_job_result.unwrap().is_none());
670+
671+
Ok(ctx)
672+
})
673+
.await;
674+
}
628675
}

0 commit comments

Comments
 (0)