Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 7 additions & 36 deletions collector/src/bin/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,19 +444,11 @@ struct BenchRustcOption {
bench_rustc: bool,
}

#[derive(Clone, Debug, clap::ValueEnum)]
enum PurgeMode {
/// Purge all old data associated with the artifact
Old,
/// Purge old data of failed benchmarks associated with the artifact
Failed,
}

#[derive(Debug, clap::Args)]
struct PurgeOption {
/// Removes old data for the specified artifact prior to running the benchmarks.
#[arg(long = "purge")]
purge: Option<PurgeMode>,
purge: bool,
}

#[derive(Debug, clap::Args)]
Expand Down Expand Up @@ -872,7 +864,9 @@ fn main_result() -> anyhow::Result<i32> {
r#type: CommitType::Master,
});

rt.block_on(purge_old_data(conn.as_mut(), &artifact_id, purge.purge));
if purge.purge {
rt.block_on(conn.purge_artifact(&artifact_id));
}

let runtime_suite = rt.block_on(load_runtime_benchmarks(
conn.as_mut(),
Expand Down Expand Up @@ -1028,7 +1022,9 @@ fn main_result() -> anyhow::Result<i32> {

let rt = build_async_runtime();
let mut conn = rt.block_on(pool.connection());
rt.block_on(purge_old_data(conn.as_mut(), &artifact_id, purge.purge));
if purge.purge {
rt.block_on(conn.purge_artifact(&artifact_id));
}

let shared = SharedBenchmarkConfig {
toolchain,
Expand Down Expand Up @@ -2045,28 +2041,6 @@ fn log_db(db_option: &DbOption) {
println!("Using database `{}`", db_option.db);
}

async fn purge_old_data(
conn: &mut dyn Connection,
artifact_id: &ArtifactId,
purge_mode: Option<PurgeMode>,
) {
match purge_mode {
Some(PurgeMode::Old) => {
// Delete everything associated with the artifact
conn.purge_artifact(artifact_id).await;
}
Some(PurgeMode::Failed) => {
// Delete all benchmarks that have an error for the given artifact
let artifact_row_id = conn.artifact_id(artifact_id).await;
let errors = conn.get_error(artifact_row_id).await;
for krate in errors.keys() {
conn.collector_remove_step(artifact_row_id, krate).await;
}
}
None => {}
}
}

/// Record a collection entry into the database, specifying which benchmark steps will be executed.
async fn init_collection(
connection: &mut dyn Connection,
Expand Down Expand Up @@ -2231,8 +2205,6 @@ async fn bench_compile(
print_intro: &dyn Fn(),
measure: F,
) {
collector.start_compile_step(conn, benchmark_name).await;

let mut tx = conn.transaction().await;
let (supports_stable, category) = category.db_representation();
tx.conn()
Expand All @@ -2259,7 +2231,6 @@ async fn bench_compile(
)
.await;
};
collector.end_compile_step(tx.conn(), benchmark_name).await;
tx.commit().await.expect("committed");
}

Expand Down
82 changes: 28 additions & 54 deletions collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ mod self_profile;
pub mod toolchain;
pub mod utils;

use crate::compile::benchmark::{Benchmark, BenchmarkName};
use crate::compile::benchmark::Benchmark;
use crate::runtime::{BenchmarkGroup, BenchmarkSuite};
pub use crate::self_profile::{
LocalSelfProfileStorage, S3SelfProfileStorage, SelfProfileId, SelfProfileStorage,
};
use database::selector::CompileTestCase;
use database::selector::{CompileTestCase, RuntimeTestCase};
use database::{ArtifactId, ArtifactIdNumber, Connection};
use hashbrown::HashSet;
use process::Stdio;
Expand Down Expand Up @@ -342,27 +342,22 @@ impl CollectorStepBuilder {
) -> CollectorCtx {
// Make sure there is no observable time when the artifact ID is available
// but the in-progress steps are not.
let artifact_row_id = {
let mut tx = conn.transaction().await;
let artifact_row_id = tx.conn().artifact_id(artifact_id).await;
if self.job_id.is_none() {
tx.conn()
.collector_start(artifact_row_id, &self.steps)
.await;
}
tx.commit().await.unwrap();
artifact_row_id
};
let artifact_row_id = conn.artifact_id(artifact_id).await;
// Find out which tests cases were already computed
let measured_compile_test_cases = conn
.get_compile_test_cases_with_measurements(&artifact_row_id)
.await
.expect("cannot fetch measured compile test cases from DB");
let measured_runtime_benchmarks = conn
.get_runtime_benchmarks_with_measurements(&artifact_row_id)
.await
.expect("cannot fetch measured runtime benchmarks from DB");

CollectorCtx {
artifact_row_id,
artifact_id: artifact_id.clone(),
measured_compile_test_cases,
measured_runtime_benchmarks,
job_id: self.job_id,
}
}
Expand All @@ -372,53 +367,32 @@ impl CollectorStepBuilder {
pub struct CollectorCtx {
pub artifact_row_id: ArtifactIdNumber,
pub artifact_id: ArtifactId,
/// Which tests cases were already computed **before** this collection began?
/// Which compile test cases were already computed **before** this collection began?
pub measured_compile_test_cases: HashSet<CompileTestCase>,
/// Which runtime benchmarks were already computed **before** this collection began?
pub measured_runtime_benchmarks: HashSet<RuntimeTestCase>,
pub job_id: Option<u32>,
}

impl CollectorCtx {
pub fn is_from_job_queue(&self) -> bool {
self.job_id.is_some()
}

pub async fn start_compile_step(&self, conn: &dyn Connection, benchmark_name: &BenchmarkName) {
if !self.is_from_job_queue() {
conn.collector_start_step(self.artifact_row_id, &benchmark_name.0)
.await;
}
}

pub async fn end_compile_step(&self, conn: &dyn Connection, benchmark_name: &BenchmarkName) {
if !self.is_from_job_queue() {
conn.collector_end_step(self.artifact_row_id, &benchmark_name.0)
.await;
}
}

/// Starts a new runtime benchmark collector step.
/// If this step was already computed, returns None.
/// Otherwise returns Some(<name of step>).
pub async fn start_runtime_step(
/// Returns the names of benchmarks in the group that have not yet been measured
/// for the given target.
pub fn get_unmeasured_runtime_benchmarks<'a>(
&self,
conn: &dyn Connection,
group: &BenchmarkGroup,
) -> Option<String> {
let step_name = runtime_group_step_name(&group.name);
if self.is_from_job_queue() {
Some(step_name)
} else {
conn.collector_start_step(self.artifact_row_id, &step_name)
.await
.then_some(step_name)
}
}

pub async fn end_runtime_step(&self, conn: &dyn Connection, group: &BenchmarkGroup) {
if !self.is_from_job_queue() {
conn.collector_end_step(self.artifact_row_id, &runtime_group_step_name(&group.name))
.await;
}
group: &'a BenchmarkGroup,
target: database::Target,
) -> Vec<&'a str> {
group
.benchmark_names
.iter()
.filter(|name| {
!self.measured_runtime_benchmarks.contains(&RuntimeTestCase {
benchmark: database::Benchmark::from(name.as_str()),
target,
})
})
.map(|s| s.as_str())
.collect()
}
}

Expand Down
43 changes: 38 additions & 5 deletions collector/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::process::{Command, Stdio};
use anyhow::Context;
use thousands::Separable;

use benchlib::benchmark::passes_filter;
use benchlib::comm::messages::{BenchmarkMessage, BenchmarkResult, BenchmarkStats};
pub use benchmark::{
get_runtime_benchmark_groups, prepare_runtime_benchmark_suite, runtime_benchmark_dir,
Expand All @@ -14,6 +15,7 @@ pub use benchmark::{
};
use database::{ArtifactIdNumber, CollectionId, Connection};

use crate::runtime_group_step_name;
use crate::utils::git::get_rustc_perf_commit;
use crate::{command_output, CollectorCtx};

Expand All @@ -38,24 +40,56 @@ pub async fn bench_runtime(
iterations: u32,
target: Target,
) -> anyhow::Result<()> {
let filtered = suite.filtered_benchmark_count(&filter);
let db_target: database::Target = target.into();

// Compute the total number of benchmarks to run, excluding already-measured ones.
let filtered: u64 = suite
.groups
.iter()
.flat_map(|group| collector.get_unmeasured_runtime_benchmarks(group, db_target))
.filter(|name| passes_filter(name, &filter.exclude, &filter.include))
.count() as u64;
println!("Executing {filtered} benchmarks\n");

let rustc_perf_version = get_rustc_perf_commit();
let mut benchmark_index = 0;
for group in suite.groups {
let Some(step_name) = collector.start_runtime_step(conn, &group).await else {
let unmeasured = collector.get_unmeasured_runtime_benchmarks(&group, db_target);
if unmeasured.is_empty() {
eprintln!("skipping {} -- already benchmarked", group.name);
continue;
};
}

// Construct a filter that excludes already-measured benchmarks, combined with the
// user-provided filter.
let mut group_filter =
RuntimeBenchmarkFilter::new(filter.exclude.clone(), filter.include.clone());
for name in &group.benchmark_names {
if !unmeasured.contains(&name.as_str()) {
group_filter.exclude.push(name.clone());
}
}

// Check if any benchmarks remain after filtering
let has_benchmarks = group
.benchmark_names
.iter()
.any(|name| passes_filter(name, &group_filter.exclude, &group_filter.include));
if !has_benchmarks {
eprintln!("skipping {} -- already benchmarked", group.name);
continue;
}

let step_name = runtime_group_step_name(&group.name);

let mut tx = conn.transaction().await;

// Async block is used to easily capture all results, it basically simulates a `try` block.
// Extracting this into a separate function would be annoying, as there would be many
// parameters.
let result = async {
let messages = execute_runtime_benchmark_binary(&group.binary, &filter, iterations)?;
let messages =
execute_runtime_benchmark_binary(&group.binary, &group_filter, iterations)?;
for message in messages {
let message = message.map_err(|err| {
anyhow::anyhow!(
Expand Down Expand Up @@ -101,7 +135,6 @@ pub async fn bench_runtime(
.await;
};

collector.end_runtime_step(tx.conn(), &group).await;
tx.commit()
.await
.expect("Cannot commit runtime benchmark group results");
Expand Down
23 changes: 8 additions & 15 deletions database/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::selector::CompileTestCase;
use crate::selector::{CompileTestCase, RuntimeTestCase};
use crate::{
ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkJobConclusion, BenchmarkJobKind,
BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestInsertResult, BenchmarkRequestStatus,
Expand Down Expand Up @@ -124,20 +124,6 @@ pub trait Connection: Send + Sync {
) -> Vec<Vec<Option<f64>>>;
async fn get_error(&self, artifact_row_id: ArtifactIdNumber) -> HashMap<String, String>;

// Collector status API

// TODO: these functions should be removed. The only useful user of them currently are runtime
// benchmarks, which should switch to something similar to
// `get_compile_test_cases_with_measurements`.
async fn collector_start(&self, aid: ArtifactIdNumber, steps: &[String]);

// Returns `true` if the step was started, i.e., it did not previously have
// an end. Otherwise returns false, indicating that we can skip it.
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) -> bool;
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str);

async fn collector_remove_step(&self, aid: ArtifactIdNumber, step: &str);

/// Returns the SHA of the parent of the given SHA commit, if available.
async fn parent_of(&self, sha: &str) -> Option<String>;

Expand Down Expand Up @@ -222,6 +208,13 @@ pub trait Connection: Send + Sync {
artifact_row_id: &ArtifactIdNumber,
) -> anyhow::Result<HashSet<CompileTestCase>>;

/// Returns a set of runtime benchmark names that already have measurements
/// for the given artifact in the runtime_pstat table.
async fn get_runtime_benchmarks_with_measurements(
&self,
artifact_row_id: &ArtifactIdNumber,
) -> anyhow::Result<HashSet<RuntimeTestCase>>;

/// Add the confiuguration for a collector
async fn add_collector_config(
&self,
Expand Down
Loading
Loading