Skip to content

Commit 29090d0

Browse files
authored
Merge pull request #2230 from Kobzol/collector-sha-check
Check latest commit SHA in the collector
2 parents bf2ee34 + a13a464 commit 29090d0

File tree

5 files changed

+143
-34
lines changed

5 files changed

+143
-34
lines changed

collector/src/bin/collector.rs

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use collector::toolchain::{
5959
};
6060
use collector::utils::cachegrind::cachegrind_diff;
6161
use collector::utils::{is_installed, wait_for_future};
62-
use collector::{utils, CollectorCtx, CollectorStepBuilder};
62+
use collector::{command_output, utils, CollectorCtx, CollectorStepBuilder};
6363
use database::{
6464
ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkJobConclusion, CollectorConfig, Commit,
6565
CommitType, Connection, Pool,
@@ -708,6 +708,16 @@ enum Commands {
708708
#[arg(long)]
709709
collector_name: String,
710710

711+
/// Git SHA of the commit that the collector is currently on.
712+
/// If not present, the collector will attempt to figure it out from git directly.
713+
#[arg(long)]
714+
git_sha: Option<String>,
715+
716+
/// Periodically check if the collector's commit SHA matches the commit SHA of the
717+
/// rustc-perf repository.
718+
#[arg(long)]
719+
check_git_sha: bool,
720+
711721
#[command(flatten)]
712722
db: DbOption,
713723
},
@@ -1352,17 +1362,34 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
13521362
Ok(0)
13531363
}
13541364

1355-
Commands::BenchmarkJobQueue { collector_name, db } => {
1365+
Commands::BenchmarkJobQueue {
1366+
collector_name,
1367+
git_sha,
1368+
check_git_sha,
1369+
db,
1370+
} => {
13561371
log_db(&db);
13571372

1373+
let git_sha = match git_sha {
1374+
Some(sha) => sha,
1375+
None => {
1376+
let mut cmd = Command::new("git");
1377+
cmd.args(["rev-parse", "HEAD"]);
1378+
let stdout = command_output(&mut cmd)
1379+
.context("Cannot determine current commit SHA")?
1380+
.stdout;
1381+
String::from_utf8(stdout).unwrap().trim().to_string()
1382+
}
1383+
};
1384+
13581385
let pool = Pool::open(&db.db);
13591386
let rt = build_async_runtime();
13601387
let conn = rt.block_on(pool.connection());
13611388

13621389
// Obtain the configuration and validate that it matches the
13631390
// collector's host target
13641391
let collector_config = rt
1365-
.block_on(conn.get_collector_config(&collector_name))?
1392+
.block_on(conn.start_collector(&collector_name, &git_sha))?
13661393
.ok_or_else(|| {
13671394
anyhow::anyhow!("Collector with name `{collector_name}` not found")
13681395
})?;
@@ -1374,6 +1401,13 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
13741401
));
13751402
}
13761403

1404+
log::info!(
1405+
"Starting collector with target {}, benchmark set {} and commit {}",
1406+
collector_config.target(),
1407+
collector_config.benchmark_set().get_id(),
1408+
collector_config.commit_sha().expect("missing commit SHA")
1409+
);
1410+
13771411
let benchmarks =
13781412
get_compile_benchmarks(&compile_benchmark_dir, CompileBenchmarkFilter::All)?;
13791413

@@ -1382,6 +1416,7 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
13821416
conn,
13831417
&collector_config,
13841418
benchmarks,
1419+
check_git_sha,
13851420
))?;
13861421

13871422
Ok(0)
@@ -1397,10 +1432,10 @@ async fn run_job_queue_benchmarks(
13971432
mut conn: Box<dyn Connection>,
13981433
collector: &CollectorConfig,
13991434
all_compile_benchmarks: Vec<Benchmark>,
1435+
check_git_sha: bool,
14001436
) -> anyhow::Result<()> {
1401-
conn.update_collector_heartbeat(collector.name()).await?;
1437+
let mut last_request_tag = None;
14021438

1403-
// TODO: check collector SHA vs site SHA
14041439
while let Some((benchmark_job, artifact_id)) = conn
14051440
.dequeue_benchmark_job(
14061441
collector.name(),
@@ -1409,6 +1444,22 @@ async fn run_job_queue_benchmarks(
14091444
)
14101445
.await?
14111446
{
1447+
// Here we check if we should update our commit SHA, if rustc-perf has been updated.
1448+
// We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
1449+
// to avoid changing code in the middle of benchmarking the same request.
1450+
// Note that if an update happens, the job that we have just dequeued will have its deque
1451+
// counter increased. But since updates are relatively rare, that shouldn't be a big deal,
1452+
// it will be dequeued again when the collector starts again.
1453+
if check_git_sha
1454+
&& last_request_tag.is_some()
1455+
&& last_request_tag.as_deref() != Some(benchmark_job.request_tag())
1456+
&& needs_git_update(collector)
1457+
{
1458+
log::warn!("Exiting collector to update itself from git.");
1459+
return Ok(());
1460+
}
1461+
last_request_tag = Some(benchmark_job.request_tag().to_string());
1462+
14121463
log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
14131464
let result = run_benchmark_job(
14141465
conn.as_mut(),
@@ -1472,6 +1523,39 @@ async fn run_job_queue_benchmarks(
14721523
Ok(())
14731524
}
14741525

1526+
/// Returns true if the commit SHA of collector does not match the latest commit SHA of the master
1527+
/// branch of https://github.com/rust-lang/rustc-perf.
1528+
fn needs_git_update(collector: &CollectorConfig) -> bool {
1529+
let Some(commit_sha) = collector.commit_sha() else {
1530+
return false;
1531+
};
1532+
1533+
let mut cmd = Command::new("git");
1534+
cmd.arg("ls-remote")
1535+
.arg("https://github.com/rust-lang/rustc-perf")
1536+
.arg("HEAD");
1537+
let upstream_sha = match command_output(&mut cmd) {
1538+
Ok(output) => String::from_utf8(output.stdout)
1539+
.unwrap()
1540+
.split_whitespace()
1541+
.next()
1542+
.unwrap()
1543+
.to_string(),
1544+
Err(error) => {
1545+
log::error!("Cannot determine latest SHA of rustc-perf: {error:?}");
1546+
return false;
1547+
}
1548+
};
1549+
if commit_sha != upstream_sha {
1550+
log::warn!(
1551+
"Commit {commit_sha} of collector is outdated, latest commit is {upstream_sha}."
1552+
);
1553+
true
1554+
} else {
1555+
false
1556+
}
1557+
}
1558+
14751559
/// Error that happened during benchmarking of a job.
14761560
enum BenchmarkJobError {
14771561
/// The error is non-recoverable.

database/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,6 +1204,8 @@ pub struct CollectorConfig {
12041204
is_active: bool,
12051205
last_heartbeat_at: DateTime<Utc>,
12061206
date_added: DateTime<Utc>,
1207+
/// The commit SHA of `rustc-perf` that the collector currently has checked out.
1208+
commit_sha: Option<String>,
12071209
}
12081210

12091211
impl CollectorConfig {
@@ -1230,6 +1232,10 @@ impl CollectorConfig {
12301232
pub fn date_added(&self) -> DateTime<Utc> {
12311233
self.date_added
12321234
}
1235+
1236+
pub fn commit_sha(&self) -> Option<&str> {
1237+
self.commit_sha.as_deref()
1238+
}
12331239
}
12341240

12351241
/// The data that can be retrived from the database directly to populate the

database/src/pool.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,13 @@ pub trait Connection: Send + Sync {
244244
is_active: bool,
245245
) -> anyhow::Result<CollectorConfig>;
246246

247-
/// Get the configuration for a collector by its name.
248-
async fn get_collector_config(
247+
/// Call this function when a job queue collector starts.
248+
/// It ensures that a collector with the given name exists, updates its commit SHA and heartbeat
249+
/// and returns its collector config.
250+
async fn start_collector(
249251
&self,
250252
collector_name: &str,
253+
commit_sha: &str,
251254
) -> anyhow::Result<Option<CollectorConfig>>;
252255

253256
/// Dequeues a single job for the given collector, target and benchmark set.
@@ -788,7 +791,7 @@ mod tests {
788791
run_postgres_test(|ctx| async {
789792
let db = ctx.db_client().connection().await;
790793

791-
let collector_config_result = db.get_collector_config("collector-1").await.unwrap();
794+
let collector_config_result = db.start_collector("collector-1", "foo").await.unwrap();
792795

793796
assert!(collector_config_result.is_none());
794797

@@ -802,19 +805,20 @@ mod tests {
802805
run_postgres_test(|ctx| async {
803806
let db = ctx.db_client().connection().await;
804807

805-
let inserted_config = db
808+
let mut inserted_config = db
806809
.add_collector_config("collector-1", Target::X86_64UnknownLinuxGnu, 1, true)
807810
.await
808811
.unwrap();
809812

810813
let config = db
811-
.get_collector_config("collector-1")
814+
.start_collector("collector-1", "foo")
812815
.await
813816
.unwrap()
814817
.expect("collector config not found");
815818

816-
// What we entered into the database should be identical to what is
817-
// returned from the database
819+
inserted_config.commit_sha = Some("foo".to_string());
820+
inserted_config.last_heartbeat_at = config.last_heartbeat_at;
821+
818822
assert_eq!(inserted_config, config);
819823
Ok(ctx)
820824
})

database/src/pool/postgres.rs

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,9 @@ static MIGRATIONS: &[&str] = &[
384384
r#"
385385
ALTER TABLE benchmark_request ADD COLUMN duration_ms INTEGER NULL;
386386
"#,
387+
r#"
388+
ALTER TABLE collector_config ADD COLUMN commit_sha TEXT NULL;
389+
"#,
387390
];
388391

389392
#[async_trait::async_trait]
@@ -1732,28 +1735,33 @@ where
17321735
is_active,
17331736
last_heartbeat_at: row.get::<_, DateTime<Utc>>(0),
17341737
date_added: row.get::<_, DateTime<Utc>>(1),
1738+
commit_sha: None,
17351739
};
17361740
Ok(collector_config)
17371741
}
17381742

1739-
async fn get_collector_config(
1743+
async fn start_collector(
17401744
&self,
17411745
collector_name: &str,
1746+
commit_sha: &str,
17421747
) -> anyhow::Result<Option<CollectorConfig>> {
17431748
let row = self
17441749
.conn()
17451750
.query_opt(
1746-
"SELECT
1751+
"
1752+
UPDATE collector_config
1753+
SET
1754+
last_heartbeat_at = NOW(),
1755+
commit_sha = $2
1756+
WHERE
1757+
name = $1
1758+
RETURNING
17471759
target,
17481760
benchmark_set,
17491761
is_active,
17501762
last_heartbeat_at,
1751-
date_added
1752-
FROM
1753-
collector_config
1754-
WHERE
1755-
name = $1;",
1756-
&[&collector_name],
1763+
date_added",
1764+
&[&collector_name, &commit_sha],
17571765
)
17581766
.await?;
17591767

@@ -1767,6 +1775,7 @@ where
17671775
is_active: row.get::<_, bool>(2),
17681776
last_heartbeat_at: row.get::<_, DateTime<Utc>>(3),
17691777
date_added: row.get::<_, DateTime<Utc>>(4),
1778+
commit_sha: Some(commit_sha.to_string()),
17701779
})
17711780
})
17721781
.transpose()?)
@@ -1803,6 +1812,7 @@ where
18031812
WHEN status = $1 THEN 1
18041813
ELSE 2
18051814
END,
1815+
request_tag,
18061816
created_at
18071817
LIMIT 1
18081818
FOR UPDATE SKIP LOCKED
@@ -2179,25 +2189,29 @@ where
21792189
benchmark_set,
21802190
is_active,
21812191
last_heartbeat_at,
2182-
date_added
2192+
date_added,
2193+
commit_sha
21832194
FROM
21842195
collector_config;",
21852196
&[],
21862197
)
21872198
.await?;
21882199

2189-
let mut configs = vec![];
2190-
for row in rows {
2191-
let config = CollectorConfig {
2192-
name: row.get::<_, String>(0),
2193-
target: Target::from_str(row.get::<_, &str>(1)).map_err(|e| anyhow::anyhow!(e))?,
2194-
benchmark_set: BenchmarkSet(row.get::<_, i32>(2) as u32),
2195-
is_active: row.get::<_, bool>(3),
2196-
last_heartbeat_at: row.get::<_, DateTime<Utc>>(4),
2197-
date_added: row.get::<_, DateTime<Utc>>(5),
2198-
};
2199-
configs.push(config);
2200-
}
2200+
let configs = rows
2201+
.into_iter()
2202+
.map(|row| {
2203+
Ok(CollectorConfig {
2204+
name: row.get::<_, String>(0),
2205+
target: Target::from_str(row.get::<_, &str>(1))
2206+
.map_err(|e| anyhow::anyhow!(e))?,
2207+
benchmark_set: BenchmarkSet(row.get::<_, i32>(2) as u32),
2208+
is_active: row.get::<_, bool>(3),
2209+
last_heartbeat_at: row.get::<_, DateTime<Utc>>(4),
2210+
date_added: row.get::<_, DateTime<Utc>>(5),
2211+
commit_sha: row.get::<_, Option<String>>(6),
2212+
})
2213+
})
2214+
.collect::<anyhow::Result<Vec<_>>>()?;
22012215

22022216
Ok(configs)
22032217
}

database/src/pool/sqlite.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1332,9 +1332,10 @@ impl Connection for SqliteConnection {
13321332
.collect::<Result<_, _>>()?)
13331333
}
13341334

1335-
async fn get_collector_config(
1335+
async fn start_collector(
13361336
&self,
13371337
_collector_name: &str,
1338+
_commit_sha: &str,
13381339
) -> anyhow::Result<Option<CollectorConfig>> {
13391340
no_queue_implementation_abort!()
13401341
}

0 commit comments

Comments
 (0)