Skip to content

Commit 57fb0f5

Browse files
committed
Most of the scaffolding for dequeuing a job from a collector
1 parent 98ca3ce commit 57fb0f5

File tree

8 files changed

+389
-10
lines changed

8 files changed

+389
-10
lines changed

collector/src/bin/collector.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use collector::artifact_stats::{
3535
use collector::codegen::{codegen_diff, CodegenType};
3636
use collector::compile::benchmark::category::Category;
3737
use collector::compile::benchmark::codegen_backend::CodegenBackend;
38+
use collector::compile::benchmark::collector_config::CollectorConfig;
3839
use collector::compile::benchmark::profile::Profile;
3940
use collector::compile::benchmark::scenario::Scenario;
4041
use collector::compile::benchmark::target::Target;
@@ -666,6 +667,19 @@ enum Commands {
666667
/// The name of the modified artifact to be compared.
667668
modified: Option<String>,
668669
},
670+
671+
/// Polls the job queue for work to benchmark
672+
DequeueJob {
673+
/// The unique identifier for the collector
674+
#[arg(long)]
675+
collector_name: String,
676+
677+
#[arg(long)]
678+
target: String,
679+
680+
#[command(flatten)]
681+
db: DbOption,
682+
},
669683
}
670684

671685
#[derive(Debug, clap::Parser)]
@@ -1266,6 +1280,44 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
12661280
rt.block_on(compare_artifacts(conn, metric, base, modified))?;
12671281
Ok(0)
12681282
}
1283+
1284+
Commands::DequeueJob {
1285+
collector_name,
1286+
db,
1287+
target,
1288+
} => {
1289+
let pool = Pool::open(&db.db);
1290+
let rt = build_async_runtime();
1291+
let conn = rt.block_on(pool.connection());
1292+
1293+
// Obtain the configuration and validate that it matches the
1294+
// collector's setup
1295+
let collector_config: CollectorConfig = rt
1296+
.block_on(conn.get_collector_config(&collector_name))?
1297+
.into();
1298+
1299+
let collector_target = collector_config.target();
1300+
if collector_target.as_str() != target {
1301+
panic!(
1302+
"Mismatching target for collector expected `{}` got `{}`",
1303+
collector_target, target
1304+
);
1305+
}
1306+
1307+
// Dequeue a job
1308+
let benchmark_job = rt.block_on(conn.dequeue_benchmark_job(
1309+
&collector_name,
1310+
&collector_config.target().to_db_target(),
1311+
collector_config.benchmark_set(),
1312+
))?;
1313+
1314+
if let Some(benchmark_job) = benchmark_job {
1315+
// TODO; process the job
1316+
println!("{:?}", benchmark_job);
1317+
}
1318+
1319+
Ok(0)
1320+
}
12691321
}
12701322
}
12711323

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use chrono::{DateTime, Utc};
2+
use database::BenchmarkSet;
3+
4+
use super::target::Target;
5+
6+
pub struct CollectorConfig {
7+
name: String,
8+
target: Target,
9+
benchmark_set: BenchmarkSet,
10+
is_active: bool,
11+
last_heartbeat_at: DateTime<Utc>,
12+
date_added: DateTime<Utc>,
13+
}
14+
15+
impl CollectorConfig {
16+
pub fn name(&self) -> &str {
17+
&self.name
18+
}
19+
20+
pub fn target(&self) -> &Target {
21+
&self.target
22+
}
23+
24+
pub fn benchmark_set(&self) -> &BenchmarkSet {
25+
&self.benchmark_set
26+
}
27+
28+
pub fn is_active(&self) -> bool {
29+
self.is_active
30+
}
31+
32+
pub fn last_heartbeat_at(&self) -> DateTime<Utc> {
33+
self.last_heartbeat_at
34+
}
35+
36+
pub fn date_added(&self) -> DateTime<Utc> {
37+
self.date_added
38+
}
39+
}
40+
41+
impl From<database::CollectorConfig> for CollectorConfig {
42+
fn from(value: database::CollectorConfig) -> Self {
43+
CollectorConfig {
44+
name: value.name().into(),
45+
target: Target::from_db_target(value.target()),
46+
benchmark_set: value.benchmark_set().clone(),
47+
is_active: value.is_active(),
48+
last_heartbeat_at: value.last_heartbeat_at(),
49+
date_added: value.date_added(),
50+
}
51+
}
52+
}

collector/src/compile/benchmark/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use tempfile::TempDir;
1919

2020
pub mod category;
2121
pub mod codegen_backend;
22+
pub mod collector_config;
2223
pub(crate) mod patch;
2324
pub mod profile;
2425
pub mod scenario;

collector/src/compile/benchmark/target.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::{fmt, str::FromStr};
2+
13
/// Target representing an Rust target triple, for a full list of targets and
24
/// their support see;
35
/// https://doc.rust-lang.org/nightly/rustc/platform-support.html
@@ -15,6 +17,22 @@ impl Default for Target {
1517
}
1618
}
1719

20+
impl FromStr for Target {
21+
type Err = String;
22+
fn from_str(s: &str) -> Result<Self, Self::Err> {
23+
Ok(match s.to_ascii_lowercase().as_str() {
24+
"x86_64-unknown-linux-gnu" => Target::X86_64UnknownLinuxGnu,
25+
_ => return Err(format!("{} is not a valid target", s)),
26+
})
27+
}
28+
}
29+
30+
impl fmt::Display for Target {
31+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32+
write!(f, "{}", self.as_str())
33+
}
34+
}
35+
1836
impl Target {
1937
pub fn all() -> Vec<Self> {
2038
vec![Self::X86_64UnknownLinuxGnu]
@@ -29,6 +47,26 @@ impl From<database::Target> for Target {
2947
}
3048
}
3149

50+
impl Target {
51+
pub fn as_str(self) -> &'static str {
52+
match self {
53+
Target::X86_64UnknownLinuxGnu => "x86_64-unknown-linux-gnu",
54+
}
55+
}
56+
57+
pub fn from_db_target(target: &database::Target) -> Target {
58+
match target {
59+
database::Target::X86_64UnknownLinuxGnu => Self::X86_64UnknownLinuxGnu,
60+
}
61+
}
62+
63+
pub fn to_db_target(&self) -> database::Target {
64+
match self {
65+
Target::X86_64UnknownLinuxGnu => database::Target::X86_64UnknownLinuxGnu,
66+
}
67+
}
68+
}
69+
3270
impl From<Target> for database::Target {
3371
fn from(value: Target) -> Self {
3472
match value {

database/src/lib.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,10 +1054,12 @@ pub enum BenchmarkJobStatus {
10541054
Queued,
10551055
InProgress {
10561056
started_at: DateTime<Utc>,
1057+
collector_name: String,
10571058
},
10581059
Completed {
10591060
started_at: DateTime<Utc>,
10601061
completed_at: DateTime<Utc>,
1062+
collector_name: String,
10611063
success: bool,
10621064
},
10631065
}
@@ -1109,3 +1111,39 @@ pub struct BenchmarkJob {
11091111
status: BenchmarkJobStatus,
11101112
retry: u32,
11111113
}
1114+
1115+
/// The configuration for a collector
1116+
pub struct CollectorConfig {
1117+
name: String,
1118+
target: Target,
1119+
benchmark_set: BenchmarkSet,
1120+
is_active: bool,
1121+
last_heartbeat_at: DateTime<Utc>,
1122+
date_added: DateTime<Utc>,
1123+
}
1124+
1125+
impl CollectorConfig {
1126+
pub fn name(&self) -> &str {
1127+
&self.name
1128+
}
1129+
1130+
pub fn target(&self) -> &Target {
1131+
&self.target
1132+
}
1133+
1134+
pub fn benchmark_set(&self) -> &BenchmarkSet {
1135+
&self.benchmark_set
1136+
}
1137+
1138+
pub fn is_active(&self) -> bool {
1139+
self.is_active
1140+
}
1141+
1142+
pub fn last_heartbeat_at(&self) -> DateTime<Utc> {
1143+
self.last_heartbeat_at
1144+
}
1145+
1146+
pub fn date_added(&self) -> DateTime<Utc> {
1147+
self.date_added
1148+
}
1149+
}

database/src/pool.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::selector::CompileTestCase;
22
use crate::{
3-
ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, BenchmarkRequestIndex,
4-
BenchmarkRequestStatus, CodegenBackend, CompileBenchmark, Target,
3+
ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkRequest,
4+
BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkSet, CodegenBackend, CollectorConfig,
5+
CompileBenchmark, Target,
56
};
67
use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
78
use chrono::{DateTime, Utc};
@@ -232,6 +233,17 @@ pub trait Connection: Send + Sync {
232233
&self,
233234
artifact_row_id: &ArtifactIdNumber,
234235
) -> anyhow::Result<HashSet<CompileTestCase>>;
236+
237+
/// Get the confiuguration for a collector by the name of the collector
238+
async fn get_collector_config(&self, collector_name: &str) -> anyhow::Result<CollectorConfig>;
239+
240+
/// Get the confiuguration for a collector by the name of the collector
241+
async fn dequeue_benchmark_job(
242+
&self,
243+
collector_name: &str,
244+
target: &Target,
245+
benchmark_set: &BenchmarkSet,
246+
) -> anyhow::Result<Option<BenchmarkJob>>;
235247
}
236248

237249
#[async_trait::async_trait]
@@ -690,6 +702,39 @@ mod tests {
690702
.await
691703
.unwrap()
692704
.is_empty());
705+
Ok(ctx)
706+
})
707+
.await;
708+
}
709+
710+
async fn get_collector_config_error_if_not_exist() {
711+
run_postgres_test(|ctx| async {
712+
let db = ctx.db_client().connection().await;
713+
714+
let collector_config_result = db.get_collector_config("collector-1").await;
715+
716+
assert!(collector_config_result.is_err());
717+
718+
Ok(ctx)
719+
})
720+
.await;
721+
}
722+
723+
#[tokio::test]
724+
async fn dequeue_benchmark_job_empty_queue() {
725+
run_postgres_test(|ctx| async {
726+
let db = ctx.db_client().connection().await;
727+
728+
let benchmark_job_result = db
729+
.dequeue_benchmark_job(
730+
"collector-1",
731+
&Target::X86_64UnknownLinuxGnu,
732+
&BenchmarkSet(420),
733+
)
734+
.await;
735+
736+
assert!(benchmark_job_result.is_ok());
737+
assert!(benchmark_job_result.unwrap().is_none());
693738

694739
Ok(ctx)
695740
})

0 commit comments

Comments
 (0)