Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
371 changes: 315 additions & 56 deletions crates/prek/src/cli/run/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,67 +619,92 @@ async fn run_hooks(
filter.len()
);

// Sort hooks by priority (lower number means higher priority).
// If two hooks have the same priority, preserve their original order from the config.
hooks.sort_by(|a, b| a.priority.cmp(&b.priority).then(a.idx.cmp(&b.idx)));

if projects_len > 1 || !project.is_root() {
reporter.suspend(|| {
writeln!(
status_printer.printer().stdout(),
"{}{}",
if first { "" } else { "\n" },
format!("Running hooks for `{}`:", project.to_string().cyan()).bold()
)
})?;
first = false;
}
let mut prev_diff = git::get_diff(project.path()).await?;

let project_fail_fast = fail_fast || project.config().fail_fast.unwrap_or(false);

for group_range in PriorityGroupRanges::new(&hooks) {
let group_hooks = hooks[group_range].to_vec();
let mut group_results =
run_priority_group(group_hooks, &filter, store, dry_run, &reporter).await?;

// Print results in a stable order (same order as config within the project).
group_results.sort_unstable_by(|a, b| a.hook.idx.cmp(&b.hook.idx));

// Check if any files were modified by this group of hooks.
let all_skipped = group_results.iter().all(|r| r.status.is_skipped());
let group_modified_files = if !all_skipped {
let curr_diff = git::get_diff(project.path()).await?;
let group_modified_files = curr_diff != prev_diff;
prev_diff = curr_diff;
group_modified_files
} else {
false
};

if group_modified_files {
file_modified = true;
// Determine if any hooks use DAG scheduling (group/after fields).
let has_dag_hooks = hooks
.iter()
.any(|h| h.group.is_some() || !h.after.is_empty());

if has_dag_hooks {
// When any hook uses group/after, all hooks in this project are scheduled
// via the DAG. Hooks without `after` have zero dependencies and run in the
// first wave.
if projects_len > 1 || !project.is_root() {
reporter.suspend(|| {
writeln!(
status_printer.printer().stdout(),
"{}{}",
if first { "" } else { "\n" },
format!("Running hooks for `{}`:", project.to_string().cyan()).bold()
)
})?;
first = false;
}

reporter.suspend(|| {
render_priority_group(
let mut prev_diff = git::get_diff(project.path()).await?;
let project_fail_fast = fail_fast || project.config().fail_fast.unwrap_or(false);

let dag_result = run_dag_hooks(
hooks,
&filter,
store,
dry_run,
&reporter,
printer,
&status_printer,
verbose,
project.path(),
&mut prev_diff,
&mut file_modified,
&mut success,
&mut has_unimplemented,
project_fail_fast,
)
.await?;
if dag_result {
break 'outer;
}
} else {
// Legacy priority-based scheduling path.
// Sort hooks by priority (lower number means higher priority).
// If two hooks have the same priority, preserve their original order from the config.
hooks.sort_by(|a, b| a.priority.cmp(&b.priority).then(a.idx.cmp(&b.idx)));

if projects_len > 1 || !project.is_root() {
reporter.suspend(|| {
writeln!(
status_printer.printer().stdout(),
"{}{}",
if first { "" } else { "\n" },
format!("Running hooks for `{}`:", project.to_string().cyan()).bold()
)
})?;
first = false;
}
let mut prev_diff = git::get_diff(project.path()).await?;

let project_fail_fast = fail_fast || project.config().fail_fast.unwrap_or(false);

for group_range in PriorityGroupRanges::new(&hooks) {
let group_hooks = hooks[group_range].to_vec();
let (should_break, _modified) = run_and_render_group(
group_hooks,
&filter,
store,
dry_run,
&reporter,
printer,
&status_printer,
&group_results,
verbose,
group_modified_files,
project.path(),
&mut prev_diff,
&mut file_modified,
&mut success,
&mut has_unimplemented,
project_fail_fast,
)
})?;

let hook_fail_fast = apply_group_outcome(
&group_results,
group_modified_files,
&mut success,
&mut has_unimplemented,
);

if !success && (project_fail_fast || hook_fail_fast) {
break 'outer;
.await?;
if should_break {
break 'outer;
}
}
}
}
Expand Down Expand Up @@ -917,6 +942,240 @@ fn render_priority_group(
Ok(())
}

/// Execute one batch of hooks concurrently, print their results, and fold the
/// outcome into the caller's running state.
///
/// Steps, in order:
/// 1. Run the batch via `run_priority_group` and sort the results by `hook.idx`
///    so output order is stable.
/// 2. Unless every result is skipped, re-read the git diff for `project_path`,
///    compare it with `prev_diff`, and store the new diff back into `prev_diff`.
/// 3. Render the results while the reporter is suspended.
/// 4. Update `success` / `has_unimplemented` through `apply_group_outcome`.
///
/// `file_modified` is set to `true` when this batch changed the diff; it is
/// never reset to `false` here.
///
/// Returns `(should_break_outer, group_modified_files)`, where the first flag
/// is `true` when the run is no longer successful and either the project or
/// one of the hooks requested fail-fast.
#[allow(clippy::too_many_arguments)]
async fn run_and_render_group(
    group_hooks: Vec<InstalledHook>,
    filter: &FileFilter<'_>,
    store: &Store,
    dry_run: bool,
    reporter: &HookRunReporter,
    printer: Printer,
    status_printer: &StatusPrinter,
    verbose: bool,
    project_path: &std::path::Path,
    prev_diff: &mut Vec<u8>,
    file_modified: &mut bool,
    success: &mut bool,
    has_unimplemented: &mut bool,
    project_fail_fast: bool,
) -> Result<(bool, bool)> {
    let mut outcomes = run_priority_group(group_hooks, filter, store, dry_run, reporter).await?;

    // Keep output in config order regardless of completion order.
    outcomes.sort_unstable_by(|lhs, rhs| lhs.hook.idx.cmp(&rhs.hook.idx));

    // Only pay for a diff when at least one hook actually ran.
    let any_ran = outcomes.iter().any(|outcome| !outcome.status.is_skipped());
    let mut group_modified_files = false;
    if any_ran {
        let latest_diff = git::get_diff(project_path).await?;
        group_modified_files = latest_diff != *prev_diff;
        *prev_diff = latest_diff;
    }
    *file_modified |= group_modified_files;

    reporter.suspend(|| {
        render_priority_group(
            printer,
            status_printer,
            &outcomes,
            verbose,
            group_modified_files,
        )
    })?;

    let hook_fail_fast =
        apply_group_outcome(&outcomes, group_modified_files, success, has_unimplemented);

    let stop_outer = !*success && (project_fail_fast || hook_fail_fast);
    Ok((stop_outer, group_modified_files))
}

/// Schedule `hooks` according to their `group`/`after` relationships.
///
/// The dependency graph is built once with `build_dag` (which rejects cycles
/// and unknown references). Hooks are then executed in waves: each wave is the
/// set of not-yet-run hooks whose dependencies have all finished, and the
/// whole wave is handed to `run_and_render_group` to run concurrently.
///
/// Returns `true` if the outer loop should break (fail-fast), `false` once no
/// runnable hook remains.
#[allow(clippy::too_many_arguments)]
async fn run_dag_hooks(
    hooks: Vec<InstalledHook>,
    filter: &FileFilter<'_>,
    store: &Store,
    dry_run: bool,
    reporter: &HookRunReporter,
    printer: Printer,
    status_printer: &StatusPrinter,
    verbose: bool,
    project_path: &std::path::Path,
    prev_diff: &mut Vec<u8>,
    file_modified: &mut bool,
    success: &mut bool,
    has_unimplemented: &mut bool,
    project_fail_fast: bool,
) -> Result<bool> {
    let schedule = build_dag(&hooks)?;

    // Per-hook bookkeeping, indexed by position in `hooks`.
    let mut finished = vec![false; hooks.len()];
    // pending[i] = how many dependencies of hook i have not finished yet.
    let mut pending: Vec<usize> = schedule.deps.iter().map(Vec::len).collect();

    loop {
        // Next wave: every unfinished hook with nothing left to wait for.
        let wave: Vec<usize> = finished
            .iter()
            .zip(&pending)
            .enumerate()
            .filter(|&(_, (&done, &waiting))| !done && waiting == 0)
            .map(|(idx, _)| idx)
            .collect();

        // Nothing runnable: all hooks are done (cycles were rejected by `build_dag`).
        if wave.is_empty() {
            return Ok(false);
        }

        let wave_hooks: Vec<InstalledHook> =
            wave.iter().map(|&idx| hooks[idx].clone()).collect();

        let (should_break, _modified) = run_and_render_group(
            wave_hooks,
            filter,
            store,
            dry_run,
            reporter,
            printer,
            status_printer,
            verbose,
            project_path,
            prev_diff,
            file_modified,
            success,
            has_unimplemented,
            project_fail_fast,
        )
        .await?;

        // Retire the wave and release whatever was waiting on it.
        for &idx in &wave {
            finished[idx] = true;
            for &waiter in &schedule.dependents[idx] {
                pending[waiter] -= 1;
            }
        }

        if should_break {
            return Ok(true);
        }
    }
}

/// The resolved DAG structure for hook scheduling.
///
/// Produced by `build_dag`. Both fields have one entry per hook and are indexed
/// by the hook's position in the slice given to `build_dag`. The two lists
/// mirror each other: every edge is recorded once in `deps` and once in
/// `dependents`. They are plain lists, not deduplicated sets, so an edge that
/// is declared twice appears twice in both.
struct DagSchedule {
/// For each hook index, the hook indices it depends on (must finish first).
/// The length of each inner list is used as the initial count of unfinished
/// dependencies by `run_dag_hooks`.
deps: Vec<Vec<usize>>,
/// For each hook index, the hook indices that depend on it (unblocked once it finishes).
dependents: Vec<Vec<usize>>,
}

/// Build a DAG from hooks' `after` fields.
///
/// Each entry in a hook's `after` list is either:
/// - `group:<name>`: resolved to every hook whose `group` equals `<name>`, or
/// - a plain hook id: resolved to every hook with that `id`.
///
/// Several hooks may share the same id. A plain id reference therefore waits
/// for *all* hooks carrying that id, rather than only the last one seen.
///
/// Indices in the returned [`DagSchedule`] are positions in `hooks`.
///
/// # Errors
///
/// Returns an error if an `after` entry names a group or hook id that does not
/// exist, or if the dependencies contain a cycle (including a hook that
/// depends on itself).
fn build_dag(hooks: &[InstalledHook]) -> Result<DagSchedule> {
    let num_hooks = hooks.len();

    // Build lookup maps: hook id -> indices, group name -> indices.
    // Ids map to a list because the same id can occur more than once.
    let mut id_to_indices: FxHashMap<&str, Vec<usize>> = FxHashMap::default();
    let mut group_to_indices: FxHashMap<&str, Vec<usize>> = FxHashMap::default();

    for (i, hook) in hooks.iter().enumerate() {
        id_to_indices.entry(hook.id.as_str()).or_default().push(i);
        if let Some(group) = &hook.group {
            group_to_indices.entry(group.as_str()).or_default().push(i);
        }
    }

    // Build dependency edges. Every edge is pushed to both lists so that
    // `deps[i].len()` always equals the number of decrements hook `i` will
    // receive, even when the same dependency is named more than once.
    let mut deps: Vec<Vec<usize>> = vec![Vec::new(); num_hooks];
    let mut dependents: Vec<Vec<usize>> = vec![Vec::new(); num_hooks];

    for (i, hook) in hooks.iter().enumerate() {
        for dep_ref in &hook.after {
            let targets = if let Some(group_name) = dep_ref.strip_prefix("group:") {
                group_to_indices.get(group_name).ok_or_else(|| {
                    anyhow::anyhow!(
                        "Hook `{}` has `after: [group:{}]` but no hook belongs to group `{}`",
                        hook.id,
                        group_name,
                        group_name
                    )
                })?
            } else {
                id_to_indices.get(dep_ref.as_str()).ok_or_else(|| {
                    anyhow::anyhow!(
                        "Hook `{}` has `after: [{}]` but no hook with id `{}` exists",
                        hook.id,
                        dep_ref,
                        dep_ref
                    )
                })?
            };

            for &dep_idx in targets {
                deps[i].push(dep_idx);
                dependents[dep_idx].push(i);
            }
        }
    }

    // Detect cycles using Kahn's algorithm: repeatedly remove nodes with no
    // unresolved dependencies; anything left over is blocked by a cycle.
    let mut in_degree: Vec<usize> = deps.iter().map(Vec::len).collect();
    let mut queue: Vec<usize> = in_degree
        .iter()
        .enumerate()
        .filter(|&(_, &d)| d == 0)
        .map(|(i, _)| i)
        .collect();
    let mut visited = 0;

    while let Some(node) = queue.pop() {
        visited += 1;
        for &dependent in &dependents[node] {
            in_degree[dependent] -= 1;
            if in_degree[dependent] == 0 {
                queue.push(dependent);
            }
        }
    }

    if visited != num_hooks {
        // List every hook that could never be scheduled. This covers the hooks
        // on a cycle as well as hooks that merely depend on one.
        let cycle_hooks: Vec<&str> = in_degree
            .iter()
            .enumerate()
            .filter(|&(_, &d)| d > 0)
            .map(|(i, _)| hooks[i].id.as_str())
            .collect();
        anyhow::bail!(
            "Cycle detected in hook dependencies: {}",
            cycle_hooks.join(", ")
        );
    }

    Ok(DagSchedule { deps, dependents })
}

fn apply_group_outcome(
group_results: &[RunResult],
group_modified_files: bool,
Expand Down
Loading
Loading