def _is_infra_failure(flag: str, findings: str = "") -> bool:
    """Return True if the failure is infrastructure-related.

    The explicit flag is checked first. When it is not set, the findings
    text is searched case-insensitively for the phrase
    "infrastructure failure", which covers cases where the composite
    action output does not propagate correctly.

    Args:
        flag: Raw flag value. "true", "1" and "yes" count as set,
            regardless of case or surrounding whitespace.
        findings: Findings text produced by the check; may be empty.

    Returns:
        True when either the flag or the findings text indicates an
        infrastructure failure, otherwise False.
    """
    # Values relayed through environment variables can pick up stray
    # whitespace or a trailing newline; normalise before comparing so a
    # set flag is not silently treated as unset.
    if flag.strip().lower() in ("true", "1", "yes"):
        return True
    return bool(findings) and "infrastructure failure" in findings.lower()
+ + aggregate-skip: + name: Aggregate Results + runs-on: ubuntu-24.04-arm + needs: [detect-changes] + if: always() && needs.detect-changes.result == 'success' && needs.detect-changes.outputs.has_sessions != 'true' + steps: + - run: echo "Skipped - no session file changes detected" + # Aggregate results and post comment aggregate: diff --git a/.github/workflows/ai-spec-validation.yml b/.github/workflows/ai-spec-validation.yml index 21046628f..91a7f82a9 100644 --- a/.github/workflows/ai-spec-validation.yml +++ b/.github/workflows/ai-spec-validation.yml @@ -320,4 +320,6 @@ jobs: COMPLETENESS_VERDICT: ${{ steps.completeness.outputs.verdict }} TRACE_INFRA_FAILURE: ${{ steps.trace.outputs['infrastructure-failure'] }} COMPLETENESS_INFRA_FAILURE: ${{ steps.completeness.outputs['infrastructure-failure'] }} + TRACE_FINDINGS: ${{ steps.trace.outputs.findings }} + COMPLETENESS_FINDINGS: ${{ steps.completeness.outputs.findings }} run: python3 .github/scripts/check_spec_failures.py diff --git a/.serena/project.yml b/.serena/project.yml index 9dd4adabd..345b00ecc 100644 --- a/.serena/project.yml +++ b/.serena/project.yml @@ -107,3 +107,16 @@ default_modes: # fixed set of tools to use as the base tool set (if non-empty), replacing Serena's default set of tools. # This cannot be combined with non-empty excluded_tools or included_optional_tools. fixed_tools: [] + +# time budget (seconds) per tool call for the retrieval of additional symbol information +# such as docstrings or parameter information. +# This overrides the corresponding setting in the global configuration; see the documentation there. +# If null or missing, use the setting from the global configuration. +symbol_info_budget: + +# The language backend to use for this project. +# If not set, the global setting from serena_config.yml is used. +# Valid values: LSP, JetBrains +# Note: the backend is fixed at startup. If a project with a different backend +# is activated post-init, an error will be returned. 
+language_backend: diff --git a/scripts/detect_hook_bypass.py b/scripts/detect_hook_bypass.py index bb2efbb7f..d748d8b67 100644 --- a/scripts/detect_hook_bypass.py +++ b/scripts/detect_hook_bypass.py @@ -22,12 +22,18 @@ import argparse import json +import re import subprocess import sys from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from pathlib import Path +# Matches squashed merge-resolution commits (single parent, merge-like subject) +_MERGE_SUBJECT_RE = re.compile( + r"^Merge (branch|remote-tracking branch) '.+' into .+" +) + @dataclass class BypassIndicator: @@ -68,7 +74,13 @@ def get_current_branch() -> str: def get_pr_commits(base_ref: str) -> list[tuple[str, str]]: - """Get commits in the PR (since diverging from base). + """Get non-merge commits in the PR (since diverging from base). + + Skips merge commits because they integrate changes already validated + on their source branches. Also skips squashed merge-resolution commits + (single-parent commits with merge-like subjects) since they only + bring in base-branch changes. Only authored commits are checked for + hook bypass indicators. Returns list of (sha, subject) tuples. """ @@ -76,6 +88,7 @@ def get_pr_commits(base_ref: str) -> list[tuple[str, str]]: [ "git", "log", + "--no-merges", f"{base_ref}..HEAD", "--format=%H %s", ], @@ -91,6 +104,8 @@ def get_pr_commits(base_ref: str) -> list[tuple[str, str]]: if not line.strip(): continue sha, _, subject = line.partition(" ") + if _MERGE_SUBJECT_RE.match(subject): + continue commits.append((sha, subject)) return commits diff --git a/scripts/workflow/__init__.py b/scripts/workflow/__init__.py index 47442ba3c..2c71f5a3c 100644 --- a/scripts/workflow/__init__.py +++ b/scripts/workflow/__init__.py @@ -1,4 +1,7 @@ -"""Workflow execution and chaining for agent pipelines.""" +"""Workflow execution and chaining for agent pipelines. + +Supports sequential chaining, parallel execution, and refinement loops. 
+""" from scripts.workflow.coordinator import ( CentralizedStrategy, @@ -10,6 +13,15 @@ find_ready_steps, get_strategy, ) +from scripts.workflow.executor import WorkflowExecutor +from scripts.workflow.parallel import ( + AggregationStrategy, + ParallelGroup, + ParallelStepExecutor, + can_parallelize, + identify_parallel_groups, + mark_parallel_steps, +) from scripts.workflow.schema import ( CoordinationMode, StepKind, @@ -22,20 +34,27 @@ ) __all__ = [ + "AggregationStrategy", + "CentralizedStrategy", "CoordinationMode", "CoordinationStrategy", - "CentralizedStrategy", "HierarchicalStrategy", "MeshStrategy", + "ParallelGroup", + "ParallelStepExecutor", "StepKind", "StepRef", "StepResult", "WorkflowDefinition", + "WorkflowExecutor", "WorkflowResult", "WorkflowStatus", "WorkflowStep", "aggregate_subordinate_outputs", "build_execution_plan", + "can_parallelize", "find_ready_steps", "get_strategy", + "identify_parallel_groups", + "mark_parallel_steps", ] diff --git a/scripts/workflow/parallel.py b/scripts/workflow/parallel.py new file mode 100644 index 000000000..229cb5fac --- /dev/null +++ b/scripts/workflow/parallel.py @@ -0,0 +1,364 @@ +"""Parallel execution support for agent workflow pipelines. + +Provides concurrent execution of independent workflow steps and batch +spawning patterns for multi-agent coordination. Implements ADR-009 +parallel-safe multi-agent design patterns. + +Exit Codes (ADR-035): + 0 - Success + 1 - Logic error (parallel execution failed) + 2 - Config error (invalid parallelization) +""" + +from __future__ import annotations + +import concurrent.futures +import logging +from collections.abc import Callable +from dataclasses import dataclass, field +from enum import Enum + +from scripts.workflow.schema import ( + StepKind, + StepResult, + WorkflowDefinition, + WorkflowStatus, + WorkflowStep, +) + +logger = logging.getLogger(__name__) + + +class AggregationStrategy(Enum): + """How to combine outputs from parallel steps. 
@dataclass
class ParallelGroup:
    """A set of steps that can execute concurrently.

    Groups are identified by analyzing step dependencies. Steps with
    no unsatisfied dependencies can run in the same group.
    """

    # Names of the steps in this group, in intended submission order.
    step_names: list[str] = field(default_factory=list)

    def __len__(self) -> int:
        """Return the number of steps in the group."""
        return len(self.step_names)


@dataclass
class ParallelResult:
    """Result from parallel step execution."""

    # One entry per executed step (successful or failed).
    step_results: list[StepResult] = field(default_factory=list)
    # False as soon as any step in the group has failed.
    succeeded: bool = True
    # Names of the steps that raised during execution.
    failed_steps: list[str] = field(default_factory=list)

    def outputs(self) -> dict[str, str]:
        """Return mapping of step names to outputs for succeeded steps only."""
        return {r.step_name: r.output for r in self.step_results if r.succeeded}


def identify_parallel_groups(workflow: WorkflowDefinition) -> list[ParallelGroup]:
    """Analyze a workflow to find steps that can run in parallel.

    Steps are grouped by their "level" in the dependency graph: a step's
    level is one more than the deepest level among its dependencies, so
    steps at the same level do not depend on each other.

    Within a group, steps are ordered by priority (highest first). Steps
    with equal priority keep their declaration order, so the result is
    deterministic from run to run.

    Args:
        workflow: Workflow whose ``steps`` expose ``name``, ``priority``
            and ``depends_on()``.

    Returns:
        List of ParallelGroup, ordered by execution sequence. Steps in
        the same group can run concurrently. Empty for an empty workflow.

    Raises:
        ValueError: If a step depends on a name that is not a step of the
            workflow, or if the dependencies form a cycle.
    """
    if not workflow.steps:
        return []

    # Dict insertion order doubles as the declaration order of the steps.
    deps: dict[str, set[str]] = {
        step.name: set(step.depends_on()) for step in workflow.steps
    }
    priority_of: dict[str, int] = {
        step.name: step.priority for step in workflow.steps
    }

    # A dependency on an undefined step can never be satisfied. Report it
    # for what it is instead of letting it surface as a bogus "cycle".
    known = set(deps)
    dangling = [
        f"{name} -> {', '.join(sorted(wanted.difference(known)))}"
        for name, wanted in deps.items()
        if not wanted.issubset(known)
    ]
    if dangling:
        msg = f"Unknown dependency in workflow steps: {'; '.join(dangling)}"
        raise ValueError(msg)

    groups: list[ParallelGroup] = []
    placed: set[str] = set()
    pending = list(deps)

    while pending:
        # Readiness is judged against the levels completed so far, so two
        # steps that depend on each other never share a group.
        ready = [name for name in pending if deps[name].issubset(placed)]

        if not ready:
            # Circular dependency is a critical error
            remaining_steps = ", ".join(sorted(pending))
            msg = f"Circular dependency detected in workflow steps: {remaining_steps}"
            raise ValueError(msg)

        # sort() is stable (also with reverse=True), so equal-priority
        # steps stay in declaration order.
        ready.sort(key=priority_of.__getitem__, reverse=True)
        groups.append(ParallelGroup(step_names=ready))

        placed.update(ready)
        pending = [name for name in pending if name not in placed]

    return groups


def can_parallelize(workflow: WorkflowDefinition) -> bool:
    """Check if a workflow has opportunities for parallel execution.

    Returns True if any group has more than one step.

    Raises:
        ValueError: Propagated from identify_parallel_groups for unknown
            or circular dependencies.
    """
    return any(len(group) > 1 for group in identify_parallel_groups(workflow))
+ + This implements the batch spawning pattern from Issue #168: + - Launch multiple agents simultaneously + - Independent work streams with no blocking dependencies + - Aggregate results after completion + """ + + def __init__( + self, + runner: StepExecutor, + max_workers: int | None = None, + aggregation: AggregationStrategy = AggregationStrategy.MERGE, + ) -> None: + """Initialize parallel executor. + + Args: + runner: Function to execute a single step + max_workers: Maximum concurrent executions (None = CPU count) + aggregation: Strategy for combining parallel outputs + """ + self._runner = runner + self._max_workers = max_workers + self._aggregation = aggregation + + def execute_parallel( + self, + steps: list[WorkflowStep], + inputs: dict[str, str], + iteration: int = 1, + ) -> ParallelResult: + """Execute a group of steps concurrently. + + Args: + steps: Steps to execute in parallel + inputs: Mapping of step name to input string + iteration: Current refinement loop iteration + + Returns: + ParallelResult with outputs from all steps + """ + if not steps: + return ParallelResult() + + # Single step, no need for threading overhead + if len(steps) == 1: + step = steps[0] + step_input = inputs.get(step.name, "") + return self._execute_single(step, step_input, iteration) + + # Parallel execution with thread pool + result = ParallelResult() + + # Submit higher-priority steps first for earlier scheduling + sorted_steps = sorted(steps, key=lambda s: s.priority, reverse=True) + + with concurrent.futures.ThreadPoolExecutor( + max_workers=self._max_workers + ) as pool: + futures: dict[concurrent.futures.Future[str], WorkflowStep] = {} + + for step in sorted_steps: + step_input = inputs.get(step.name, "") + future = pool.submit(self._runner, step, step_input, iteration) + futures[future] = step + + for future in concurrent.futures.as_completed(futures): + step = futures[future] + try: + output = future.result() + result.step_results.append( + StepResult( + 
step_name=step.name, + status=WorkflowStatus.COMPLETED, + output=output, + iteration=iteration, + ) + ) + except Exception as exc: + logger.warning( + "Parallel step '%s' failed: %s", + step.name, + exc, + ) + result.step_results.append( + StepResult( + step_name=step.name, + status=WorkflowStatus.FAILED, + error=str(exc), + iteration=iteration, + ) + ) + result.failed_steps.append(step.name) + result.succeeded = False + + return result + + def _execute_single( + self, + step: WorkflowStep, + step_input: str, + iteration: int, + ) -> ParallelResult: + """Execute a single step without threading.""" + result = ParallelResult() + try: + output = self._runner(step, step_input, iteration) + result.step_results.append( + StepResult( + step_name=step.name, + status=WorkflowStatus.COMPLETED, + output=output, + iteration=iteration, + ) + ) + except Exception as exc: + logger.warning("Step '%s' failed: %s", step.name, exc) + result.step_results.append( + StepResult( + step_name=step.name, + status=WorkflowStatus.FAILED, + error=str(exc), + iteration=iteration, + ) + ) + result.failed_steps.append(step.name) + result.succeeded = False + + return result + + def aggregate_outputs( + self, + outputs: dict[str, str], + strategy: AggregationStrategy | None = None, + ) -> str: + """Combine outputs from parallel steps. 
def mark_parallel_steps(workflow: WorkflowDefinition) -> WorkflowDefinition:
    """Annotate workflow steps with parallel execution markers.

    Steps that share a dependency level with at least one other step are
    copied with ``kind`` set to ``StepKind.PARALLEL``. Every other field of
    the step (including ``is_coordinator`` and ``subordinates``) is carried
    over unchanged. Steps that cannot run in parallel are reused as-is.

    Args:
        workflow: The workflow to annotate. It is not modified.

    Returns:
        A new WorkflowDefinition with updated step kinds.

    Raises:
        ValueError: Propagated from identify_parallel_groups when the
            step dependencies cannot be resolved.
    """
    # Imported locally because the module header only brings in
    # ``dataclass`` and ``field``.
    from dataclasses import is_dataclass, replace

    # Only groups with more than one member offer real concurrency.
    parallel_names: set[str] = {
        name
        for group in identify_parallel_groups(workflow)
        if len(group) > 1
        for name in group.step_names
    }

    # replace() copies every field, so attributes added to WorkflowStep
    # later are not silently dropped the way a hand-written constructor
    # call would drop them.
    new_steps = [
        replace(step, kind=StepKind.PARALLEL) if step.name in parallel_names else step
        for step in workflow.steps
    ]

    # NOTE(review): WorkflowDefinition is assumed to be a dataclass like
    # WorkflowStep - confirm. If it is, copy it wholesale so no field is
    # lost; otherwise fall back to the explicit four-field constructor call.
    if is_dataclass(workflow):
        return replace(workflow, steps=new_steps)
    return WorkflowDefinition(
        name=workflow.name,
        steps=new_steps,
        max_iterations=workflow.max_iterations,
        metadata=workflow.metadata,
    )
test_infra_detected_from_one_finding(self): + """One finding with infra text, other PASS, returns 0.""" + rc = main([ + "--trace-verdict", "CRITICAL_FAIL", + "--completeness-verdict", "PASS", + "--trace-findings", + "Copilot CLI infrastructure failure after 3 attempts", + ]) + assert rc == 0 + + def test_findings_without_infra_keyword_still_fails(self): + """Findings without infrastructure keyword do not suppress failure.""" + rc = main([ + "--trace-verdict", "CRITICAL_FAIL", + "--completeness-verdict", "PASS", + "--trace-findings", "Some other error message", + ]) + assert rc == 1 diff --git a/tests/test_workflow_parallel.py b/tests/test_workflow_parallel.py new file mode 100644 index 000000000..9ca3f2e73 --- /dev/null +++ b/tests/test_workflow_parallel.py @@ -0,0 +1,333 @@ +"""Tests for parallel workflow execution. + +Covers parallel group identification, concurrent step execution, +and output aggregation strategies per ADR-009. +""" + +from __future__ import annotations + +import threading +import time +from unittest.mock import MagicMock + +import pytest + +from scripts.workflow.parallel import ( + AggregationStrategy, + ParallelGroup, + ParallelStepExecutor, + can_parallelize, + identify_parallel_groups, + mark_parallel_steps, +) +from scripts.workflow.schema import ( + StepKind, + StepRef, + WorkflowDefinition, + WorkflowStep, +) + + +class TestIdentifyParallelGroups: + def test_sequential_steps_in_separate_groups(self) -> None: + """Each dependent step gets its own group.""" + steps = [ + WorkflowStep(name="a", agent="analyst"), + WorkflowStep(name="b", agent="critic", inputs_from=[StepRef(name="a")]), + WorkflowStep(name="c", agent="qa", inputs_from=[StepRef(name="b")]), + ] + wd = WorkflowDefinition(name="seq", steps=steps) + + groups = identify_parallel_groups(wd) + + assert len(groups) == 3 + assert groups[0].step_names == ["a"] + assert groups[1].step_names == ["b"] + assert groups[2].step_names == ["c"] + + def 
test_independent_steps_in_same_group(self) -> None: + """Steps with no dependencies can run together.""" + steps = [ + WorkflowStep(name="research", agent="analyst"), + WorkflowStep(name="security", agent="security"), + WorkflowStep(name="devops", agent="devops"), + ] + wd = WorkflowDefinition(name="parallel", steps=steps) + + groups = identify_parallel_groups(wd) + + assert len(groups) == 1 + assert set(groups[0].step_names) == {"research", "security", "devops"} + + def test_diamond_dependency(self) -> None: + """Diamond pattern: A -> B,C -> D.""" + steps = [ + WorkflowStep(name="a", agent="analyst"), + WorkflowStep(name="b", agent="critic", inputs_from=[StepRef(name="a")]), + WorkflowStep(name="c", agent="security", inputs_from=[StepRef(name="a")]), + WorkflowStep( + name="d", + agent="orchestrator", + inputs_from=[StepRef(name="b"), StepRef(name="c")], + ), + ] + wd = WorkflowDefinition(name="diamond", steps=steps) + + groups = identify_parallel_groups(wd) + + assert len(groups) == 3 + assert groups[0].step_names == ["a"] + assert set(groups[1].step_names) == {"b", "c"} + assert groups[2].step_names == ["d"] + + def test_empty_workflow(self) -> None: + """Empty workflow returns no groups.""" + wd = WorkflowDefinition(name="empty", steps=[]) + groups = identify_parallel_groups(wd) + assert groups == [] + + def test_priority_ordering_within_group(self) -> None: + """Steps in the same group are ordered by priority (highest first).""" + steps = [ + WorkflowStep(name="low", agent="analyst", priority=1), + WorkflowStep(name="high", agent="security", priority=10), + WorkflowStep(name="mid", agent="devops", priority=5), + ] + wd = WorkflowDefinition(name="priority", steps=steps) + + groups = identify_parallel_groups(wd) + + assert len(groups) == 1 + assert groups[0].step_names == ["high", "mid", "low"] + + def test_circular_dependency_raises_error(self) -> None: + """Circular dependency raises ValueError.""" + # Create A -> B -> A cycle + steps = [ + 
WorkflowStep(name="a", agent="analyst", inputs_from=[StepRef(name="b")]), + WorkflowStep(name="b", agent="critic", inputs_from=[StepRef(name="a")]), + ] + wd = WorkflowDefinition(name="circular", steps=steps) + + with pytest.raises(ValueError, match="Circular dependency detected"): + identify_parallel_groups(wd) + + +class TestCanParallelize: + def test_true_for_independent_steps(self) -> None: + steps = [ + WorkflowStep(name="a", agent="analyst"), + WorkflowStep(name="b", agent="security"), + ] + wd = WorkflowDefinition(name="test", steps=steps) + assert can_parallelize(wd) is True + + def test_false_for_sequential_chain(self) -> None: + steps = [ + WorkflowStep(name="a", agent="analyst"), + WorkflowStep(name="b", agent="critic", inputs_from=[StepRef(name="a")]), + ] + wd = WorkflowDefinition(name="test", steps=steps) + assert can_parallelize(wd) is False + + def test_false_for_single_step(self) -> None: + steps = [WorkflowStep(name="a", agent="analyst")] + wd = WorkflowDefinition(name="test", steps=steps) + assert can_parallelize(wd) is False + + +class TestParallelStepExecutor: + def test_single_step_no_threading(self) -> None: + """Single step executes without thread pool overhead.""" + runner = MagicMock(return_value="output") + executor = ParallelStepExecutor(runner=runner) + step = WorkflowStep(name="single", agent="analyst") + + result = executor.execute_parallel([step], {"single": "input"}) + + assert result.succeeded + assert len(result.step_results) == 1 + assert result.step_results[0].output == "output" + runner.assert_called_once() + + def test_parallel_execution_runs_concurrently(self) -> None: + """Multiple steps execute in parallel.""" + execution_times: dict[str, float] = {} + lock = threading.Lock() + + def slow_runner(step: WorkflowStep, inp: str, iteration: int) -> str: + with lock: + execution_times[step.name] = time.time() + time.sleep(0.1) + return f"done-{step.name}" + + executor = ParallelStepExecutor(runner=slow_runner, max_workers=3) + 
steps = [ + WorkflowStep(name="a", agent="analyst"), + WorkflowStep(name="b", agent="security"), + WorkflowStep(name="c", agent="devops"), + ] + + start = time.time() + result = executor.execute_parallel(steps, {}) + elapsed = time.time() - start + + assert result.succeeded + assert len(result.step_results) == 3 + # Parallel execution should take ~0.1s, not ~0.3s + assert elapsed < 0.25 + + def test_failed_step_marks_result_failed(self) -> None: + """A failing step sets succeeded=False.""" + + def failing_runner(step: WorkflowStep, inp: str, iteration: int) -> str: + if step.name == "fail": + raise RuntimeError("intentional failure") + return "ok" + + executor = ParallelStepExecutor(runner=failing_runner) + steps = [ + WorkflowStep(name="ok", agent="analyst"), + WorkflowStep(name="fail", agent="security"), + ] + + result = executor.execute_parallel(steps, {}) + + assert not result.succeeded + assert "fail" in result.failed_steps + assert result.outputs() == {"ok": "ok"} + + def test_priority_ordering_in_execution(self) -> None: + """Higher-priority steps are submitted first to the thread pool.""" + submission_order: list[str] = [] + lock = threading.Lock() + + def tracking_runner(step: WorkflowStep, inp: str, iteration: int) -> str: + with lock: + submission_order.append(step.name) + return "ok" + + executor = ParallelStepExecutor(runner=tracking_runner, max_workers=1) + steps = [ + WorkflowStep(name="low", agent="analyst", priority=1), + WorkflowStep(name="high", agent="security", priority=10), + ] + + executor.execute_parallel(steps, {}) + + # With max_workers=1, execution is serial in submission order + assert submission_order == ["high", "low"] + + def test_outputs_method(self) -> None: + """outputs() returns completed step outputs.""" + runner = MagicMock(return_value="result") + executor = ParallelStepExecutor(runner=runner) + steps = [ + WorkflowStep(name="a", agent="analyst"), + WorkflowStep(name="b", agent="critic"), + ] + + result = 
executor.execute_parallel(steps, {}) + + assert result.outputs() == {"a": "result", "b": "result"} + + +class TestAggregationStrategies: + def test_merge_combines_outputs(self) -> None: + executor = ParallelStepExecutor( + runner=MagicMock(), + aggregation=AggregationStrategy.MERGE, + ) + outputs = {"analyst": "analysis", "security": "findings"} + + merged = executor.aggregate_outputs(outputs) + + assert "## analyst" in merged + assert "analysis" in merged + assert "## security" in merged + assert "findings" in merged + + def test_vote_returns_majority(self) -> None: + executor = ParallelStepExecutor( + runner=MagicMock(), + aggregation=AggregationStrategy.VOTE, + ) + outputs = {"a": "yes", "b": "yes", "c": "no"} + + result = executor.aggregate_outputs(outputs) + + assert result == "yes" + + def test_escalate_marks_conflict(self) -> None: + executor = ParallelStepExecutor( + runner=MagicMock(), + aggregation=AggregationStrategy.ESCALATE, + ) + outputs = {"a": "option1", "b": "option2"} + + result = executor.aggregate_outputs(outputs) + + assert "CONFLICT DETECTED" in result + assert "high-level-advisor" in result + assert "option1" in result + assert "option2" in result + + def test_escalate_no_conflict(self) -> None: + executor = ParallelStepExecutor( + runner=MagicMock(), + aggregation=AggregationStrategy.ESCALATE, + ) + outputs = {"a": "same", "b": "same"} + + result = executor.aggregate_outputs(outputs) + + assert "CONFLICT" not in result + assert result == "same" + + def test_empty_outputs(self) -> None: + executor = ParallelStepExecutor(runner=MagicMock()) + assert executor.aggregate_outputs({}) == "" + + +class TestMarkParallelSteps: + def test_marks_concurrent_steps(self) -> None: + """Steps that can run in parallel get PARALLEL kind.""" + steps = [ + WorkflowStep(name="a", agent="analyst"), + WorkflowStep(name="b", agent="security"), + WorkflowStep( + name="c", + agent="orchestrator", + inputs_from=[StepRef(name="a"), StepRef(name="b")], + ), + ] + wd = 
WorkflowDefinition(name="test", steps=steps) + + marked = mark_parallel_steps(wd) + + assert marked.get_step("a").kind == StepKind.PARALLEL + assert marked.get_step("b").kind == StepKind.PARALLEL + assert marked.get_step("c").kind == StepKind.AGENT + + def test_sequential_steps_not_marked(self) -> None: + """Dependent steps keep AGENT kind.""" + steps = [ + WorkflowStep(name="a", agent="analyst"), + WorkflowStep(name="b", agent="critic", inputs_from=[StepRef(name="a")]), + ] + wd = WorkflowDefinition(name="test", steps=steps) + + marked = mark_parallel_steps(wd) + + assert marked.get_step("a").kind == StepKind.AGENT + assert marked.get_step("b").kind == StepKind.AGENT + + +class TestParallelGroup: + def test_len(self) -> None: + group = ParallelGroup(step_names=["a", "b", "c"]) + assert len(group) == 3 + + def test_empty(self) -> None: + group = ParallelGroup() + assert len(group) == 0