Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 89 additions & 5 deletions collector/src/bin/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use collector::toolchain::{
};
use collector::utils::cachegrind::cachegrind_diff;
use collector::utils::{is_installed, wait_for_future};
use collector::{utils, CollectorCtx, CollectorStepBuilder};
use collector::{command_output, utils, CollectorCtx, CollectorStepBuilder};
use database::{
ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkJobConclusion, CollectorConfig, Commit,
CommitType, Connection, Pool,
Expand Down Expand Up @@ -708,6 +708,16 @@ enum Commands {
#[arg(long)]
collector_name: String,

/// Git SHA of the commit that the collector is currently on.
/// If not present, the collector will attempt to figure it out from git directly.
#[arg(long)]
git_sha: Option<String>,

/// Periodically check if the collector's commit SHA matches the commit SHA of the
/// rustc-perf repository.
#[arg(long)]
check_git_sha: bool,

#[command(flatten)]
db: DbOption,
},
Expand Down Expand Up @@ -1352,17 +1362,34 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
Ok(0)
}

Commands::BenchmarkJobQueue { collector_name, db } => {
Commands::BenchmarkJobQueue {
collector_name,
git_sha,
check_git_sha,
db,
} => {
log_db(&db);

let git_sha = match git_sha {
Some(sha) => sha,
None => {
let mut cmd = Command::new("git");
cmd.args(["rev-parse", "HEAD"]);
let stdout = command_output(&mut cmd)
.context("Cannot determine current commit SHA")?
.stdout;
String::from_utf8(stdout).unwrap().trim().to_string()
}
};

let pool = Pool::open(&db.db);
let rt = build_async_runtime();
let conn = rt.block_on(pool.connection());

// Obtain the configuration and validate that it matches the
// collector's host target
let collector_config = rt
.block_on(conn.get_collector_config(&collector_name))?
.block_on(conn.start_collector(&collector_name, &git_sha))?
.ok_or_else(|| {
anyhow::anyhow!("Collector with name `{collector_name}` not found")
})?;
Expand All @@ -1374,6 +1401,13 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
));
}

log::info!(
"Starting collector with target {}, benchmark set {} and commit {}",
collector_config.target(),
collector_config.benchmark_set().get_id(),
collector_config.commit_sha().expect("missing commit SHA")
);

let benchmarks =
get_compile_benchmarks(&compile_benchmark_dir, CompileBenchmarkFilter::All)?;

Expand All @@ -1382,6 +1416,7 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
conn,
&collector_config,
benchmarks,
check_git_sha,
))?;

Ok(0)
Expand All @@ -1397,10 +1432,10 @@ async fn run_job_queue_benchmarks(
mut conn: Box<dyn Connection>,
collector: &CollectorConfig,
all_compile_benchmarks: Vec<Benchmark>,
check_git_sha: bool,
) -> anyhow::Result<()> {
conn.update_collector_heartbeat(collector.name()).await?;
let mut last_request_tag = None;

// TODO: check collector SHA vs site SHA
while let Some((benchmark_job, artifact_id)) = conn
.dequeue_benchmark_job(
collector.name(),
Expand All @@ -1409,6 +1444,22 @@ async fn run_job_queue_benchmarks(
)
.await?
{
// Here we check if we should update our commit SHA, if rustc-perf has been updated.
// We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
// to avoid changing code in the middle of benchmarking the same request.
// Note that if an update happens, the job that we have just dequeued will have its dequeue
// counter increased. But since updates are relatively rare, that shouldn't be a big deal,
// it will be dequeued again when the collector starts again.
if check_git_sha
&& last_request_tag.is_some()
&& last_request_tag.as_deref() != Some(benchmark_job.request_tag())
&& needs_git_update(collector)
{
log::warn!("Exiting collector to update itself from git.");
return Ok(());
}
last_request_tag = Some(benchmark_job.request_tag().to_string());

log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
let result = run_benchmark_job(
conn.as_mut(),
Expand Down Expand Up @@ -1472,6 +1523,39 @@ async fn run_job_queue_benchmarks(
Ok(())
}

/// Returns true if the commit SHA of collector does not match the latest commit SHA of the master
/// branch of https://github.com/rust-lang/rustc-perf.
fn needs_git_update(collector: &CollectorConfig) -> bool {
let Some(commit_sha) = collector.commit_sha() else {
return false;
};

let mut cmd = Command::new("git");
cmd.arg("ls-remote")
.arg("https://github.com/rust-lang/rustc-perf")
.arg("HEAD");
let upstream_sha = match command_output(&mut cmd) {
Ok(output) => String::from_utf8(output.stdout)
.unwrap()
.split_whitespace()
.next()
.unwrap()
.to_string(),
Err(error) => {
log::error!("Cannot determine latest SHA of rustc-perf: {error:?}");
return false;
}
};
if commit_sha != upstream_sha {
log::warn!(
"Commit {commit_sha} of collector is outdated, latest commit is {upstream_sha}."
);
true
} else {
false
}
}

/// Error that happened during benchmarking of a job.
enum BenchmarkJobError {
/// The error is non-recoverable.
Expand Down
6 changes: 6 additions & 0 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,8 @@ pub struct CollectorConfig {
is_active: bool,
last_heartbeat_at: DateTime<Utc>,
date_added: DateTime<Utc>,
/// The commit SHA of `rustc-perf` that the collector currently has checked out.
commit_sha: Option<String>,
}

impl CollectorConfig {
Expand All @@ -1230,6 +1232,10 @@ impl CollectorConfig {
pub fn date_added(&self) -> DateTime<Utc> {
self.date_added
}

/// Returns the commit SHA stored in this collector config as a borrowed string slice,
/// or `None` if no commit SHA is set.
pub fn commit_sha(&self) -> Option<&str> {
self.commit_sha.as_deref()
}
}

/// The data that can be retrieved from the database directly to populate the
Expand Down
18 changes: 11 additions & 7 deletions database/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,13 @@ pub trait Connection: Send + Sync {
is_active: bool,
) -> anyhow::Result<CollectorConfig>;

/// Get the configuration for a collector by its name.
async fn get_collector_config(
/// Call this function when a job queue collector starts.
/// It ensures that a collector with the given name exists, updates its commit SHA and heartbeat
/// and returns its collector config.
async fn start_collector(
&self,
collector_name: &str,
commit_sha: &str,
) -> anyhow::Result<Option<CollectorConfig>>;

/// Dequeues a single job for the given collector, target and benchmark set.
Expand Down Expand Up @@ -788,7 +791,7 @@ mod tests {
run_postgres_test(|ctx| async {
let db = ctx.db_client().connection().await;

let collector_config_result = db.get_collector_config("collector-1").await.unwrap();
let collector_config_result = db.start_collector("collector-1", "foo").await.unwrap();

assert!(collector_config_result.is_none());

Expand All @@ -802,19 +805,20 @@ mod tests {
run_postgres_test(|ctx| async {
let db = ctx.db_client().connection().await;

let inserted_config = db
let mut inserted_config = db
.add_collector_config("collector-1", Target::X86_64UnknownLinuxGnu, 1, true)
.await
.unwrap();

let config = db
.get_collector_config("collector-1")
.start_collector("collector-1", "foo")
.await
.unwrap()
.expect("collector config not found");

// What we entered into the database should be identical to what is
// returned from the database
inserted_config.commit_sha = Some("foo".to_string());
inserted_config.last_heartbeat_at = config.last_heartbeat_at;

assert_eq!(inserted_config, config);
Ok(ctx)
})
Expand Down
56 changes: 35 additions & 21 deletions database/src/pool/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ static MIGRATIONS: &[&str] = &[
r#"
ALTER TABLE benchmark_request ADD COLUMN duration_ms INTEGER NULL;
"#,
r#"
ALTER TABLE collector_config ADD COLUMN commit_sha TEXT NULL;
"#,
];

#[async_trait::async_trait]
Expand Down Expand Up @@ -1732,28 +1735,33 @@ where
is_active,
last_heartbeat_at: row.get::<_, DateTime<Utc>>(0),
date_added: row.get::<_, DateTime<Utc>>(1),
commit_sha: None,
};
Ok(collector_config)
}

async fn get_collector_config(
async fn start_collector(
&self,
collector_name: &str,
commit_sha: &str,
) -> anyhow::Result<Option<CollectorConfig>> {
let row = self
.conn()
.query_opt(
"SELECT
"
UPDATE collector_config
SET
last_heartbeat_at = NOW(),
commit_sha = $2
WHERE
name = $1
RETURNING
target,
benchmark_set,
is_active,
last_heartbeat_at,
date_added
FROM
collector_config
WHERE
name = $1;",
&[&collector_name],
date_added",
&[&collector_name, &commit_sha],
)
.await?;

Expand All @@ -1767,6 +1775,7 @@ where
is_active: row.get::<_, bool>(2),
last_heartbeat_at: row.get::<_, DateTime<Utc>>(3),
date_added: row.get::<_, DateTime<Utc>>(4),
commit_sha: Some(commit_sha.to_string()),
})
})
.transpose()?)
Expand Down Expand Up @@ -1803,6 +1812,7 @@ where
WHEN status = $1 THEN 1
ELSE 2
END,
request_tag,
created_at
LIMIT 1
FOR UPDATE SKIP LOCKED
Expand Down Expand Up @@ -2179,25 +2189,29 @@ where
benchmark_set,
is_active,
last_heartbeat_at,
date_added
date_added,
commit_sha
FROM
collector_config;",
&[],
)
.await?;

let mut configs = vec![];
for row in rows {
let config = CollectorConfig {
name: row.get::<_, String>(0),
target: Target::from_str(row.get::<_, &str>(1)).map_err(|e| anyhow::anyhow!(e))?,
benchmark_set: BenchmarkSet(row.get::<_, i32>(2) as u32),
is_active: row.get::<_, bool>(3),
last_heartbeat_at: row.get::<_, DateTime<Utc>>(4),
date_added: row.get::<_, DateTime<Utc>>(5),
};
configs.push(config);
}
let configs = rows
.into_iter()
.map(|row| {
Ok(CollectorConfig {
name: row.get::<_, String>(0),
target: Target::from_str(row.get::<_, &str>(1))
.map_err(|e| anyhow::anyhow!(e))?,
benchmark_set: BenchmarkSet(row.get::<_, i32>(2) as u32),
is_active: row.get::<_, bool>(3),
last_heartbeat_at: row.get::<_, DateTime<Utc>>(4),
date_added: row.get::<_, DateTime<Utc>>(5),
commit_sha: row.get::<_, Option<String>>(6),
})
})
.collect::<anyhow::Result<Vec<_>>>()?;

Ok(configs)
}
Expand Down
3 changes: 2 additions & 1 deletion database/src/pool/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1332,9 +1332,10 @@ impl Connection for SqliteConnection {
.collect::<Result<_, _>>()?)
}

async fn get_collector_config(
async fn start_collector(
&self,
_collector_name: &str,
_commit_sha: &str,
) -> anyhow::Result<Option<CollectorConfig>> {
no_queue_implementation_abort!()
}
Expand Down
Loading