diff --git a/.github/scripts/monitor_slurm_job.sh b/.github/scripts/monitor_slurm_job.sh
new file mode 100755
index 000000000..7e6637795
--- /dev/null
+++ b/.github/scripts/monitor_slurm_job.sh
@@ -0,0 +1,139 @@
+#!/bin/bash
+# Monitor a SLURM job and stream its output in real-time
+# Usage: monitor_slurm_job.sh <job_id> <output_file>
+
+set -e
+
+if [ $# -ne 2 ]; then
+    echo "Usage: $0 <job_id> <output_file>"
+    exit 1
+fi
+
+job_id="$1"
+output_file="$2"
+
+echo "Submitted batch job $job_id"
+echo "Monitoring output file: $output_file"
+
+# Wait for the output file to appear, with retry logic for transient squeue failures
+echo "Waiting for job to start..."
+squeue_retries=0
+max_squeue_retries=5
+while [ ! -f "$output_file" ]; do
+    # Check if the job is still queued/running
+    if squeue -j "$job_id" &>/dev/null; then
+        squeue_retries=0 # Reset on success
+        sleep 5
+    else
+        squeue_retries=$((squeue_retries + 1))
+        if [ $squeue_retries -ge $max_squeue_retries ]; then
+            # Job not in queue and output file doesn't exist
+            if [ ! -f "$output_file" ]; then
+                echo "ERROR: Job $job_id not in queue and output file not created"
+                exit 1
+            fi
+            break
+        fi
+        # Exponential backoff
+        sleep_time=$((2 ** squeue_retries))
+        echo "Warning: squeue check failed, retrying in ${sleep_time}s..."
+        sleep $sleep_time
+    fi
+done
+
+echo "=== Streaming output for job $job_id ==="
+# Stream output while the job runs
+tail -f "$output_file" &
+tail_pid=$!
+
+# Wait for the job to complete, with retry logic for transient squeue failures
+squeue_failures=0
+while true; do
+    if squeue -j "$job_id" &>/dev/null; then
+        squeue_failures=0
+    else
+        squeue_failures=$((squeue_failures + 1))
+        # Check if the job actually completed using sacct (if available)
+        if [ $squeue_failures -ge 3 ]; then
+            if command -v sacct >/dev/null 2>&1; then
+                state=$(sacct -j "$job_id" --format=State --noheader 2>/dev/null | head -n1 | awk '{print $1}')
+                # Consider the job done only if it reached a terminal state
+                case "$state" in
+                    COMPLETED|FAILED|CANCELLED|TIMEOUT|OUT_OF_MEMORY)
+                        break
+                        ;;
+                    *)
+                        # Treat as a transient failure: reset the counter and continue polling
+                        squeue_failures=0
+                        ;;
+                esac
+            else
+                # No sacct: avoid a false positive by doing an extra check cycle
+                squeue_failures=2
+            fi
+        fi
+    fi
+    sleep 5
+done
+
+# Wait for the output file to stop growing (stabilize) before stopping tail
+if [ -f "$output_file" ]; then
+    last_size=-1
+    same_count=0
+    while true; do
+        size=$(stat -c%s "$output_file" 2>/dev/null || echo -1)
+        if [ "$size" -eq "$last_size" ] && [ "$size" -ge 0 ]; then
+            same_count=$((same_count + 1))
+        else
+            same_count=0
+            last_size=$size
+        fi
+        # Two consecutive stable checks (~10s) imply the file has likely been flushed
+        if [ $same_count -ge 2 ]; then
+            break
+        fi
+        sleep 5
+    done
+fi
+
+# Stop tailing
+kill $tail_pid 2>/dev/null || true
+
+echo ""
+echo "=== Final output ==="
+cat "$output_file"
+
+# Check exit status with sacct fallback
+exit_code=""
+
+# Try scontrol first (works for recent jobs)
+scontrol_output=$(scontrol show job "$job_id" 2>/dev/null || echo "")
+if [ -n "$scontrol_output" ]; then
+    exit_code=$(echo "$scontrol_output" | grep -oE 'ExitCode=[0-9]+:[0-9]+' | cut -d= -f2 || echo "")
+fi
+
+# If scontrol failed or returned an invalid job, try sacct (for completed/aged-out jobs)
+if [ -z "$exit_code" ]; then
+    echo "Warning: scontrol failed to get exit code, trying sacct..."
+ sacct_output=$(sacct -j "$job_id" --format=ExitCode --noheader --parsable2 2>/dev/null | head -n1 || echo "") + if [ -n "$sacct_output" ]; then + exit_code="$sacct_output" + fi +fi + +# If we still can't determine exit code, fail explicitly +if [ -z "$exit_code" ]; then + echo "ERROR: Unable to determine exit status for job $job_id" + echo "Both scontrol and sacct failed to return valid exit code" + exit 1 +fi + +# Check if job succeeded +if [ "$exit_code" != "0:0" ]; then + echo "ERROR: Job $job_id failed with exit code $exit_code" + exit 1 +fi + +echo "Job $job_id completed successfully" +exit 0 + diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 563566788..3d9c13f3c 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -23,7 +23,7 @@ jobs: filters: ".github/file-filter.yml" self: - name: "${{ matrix.name }} (${{ matrix.device }})" + name: "${{ matrix.name }} (${{ matrix.device }}${{ matrix.interface != 'none' && format('-{0}', matrix.interface) || '' }})" if: ${{ github.repository=='MFlowCode/MFC' && needs.file-changes.outputs.checkall=='true' && ((github.event_name=='pull_request_review' && github.event.review.state=='approved') || (github.event_name=='pull_request' && (github.event.pull_request.user.login=='sbryngelson' || github.event.pull_request.user.login=='wilfonba'))) }} needs: file-changes strategy: @@ -73,7 +73,7 @@ jobs: runs-on: group: ${{ matrix.group }} labels: ${{ matrix.labels }} - timeout-minutes: 1400 + timeout-minutes: 480 env: ACTIONS_RUNNER_FORCE_ACTIONS_NODE_VERSION: node16 ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true @@ -99,9 +99,54 @@ jobs: - name: Bench (Master v. PR) run: | - (cd pr && bash .github/workflows/${{ matrix.cluster }}/submit-bench.sh .github/workflows/${{ matrix.cluster }}/bench.sh ${{ matrix.device }} ${{ matrix.interface }}) & - (cd master && bash .github/workflows/${{ matrix.cluster }}/submit-bench.sh .github/workflows/${{ matrix.cluster }}/bench.sh ${{ matrix.device }} ${{ matrix.interface }}) & - wait %1 && wait %2 + set -e + + # Function to submit and monitor using extracted script + submit_and_monitor() { + local dir=$1 + local device=$2 + local interface=$3 + local cluster=$4 + + cd "$dir" + + # Submit job + submit_output=$(bash .github/workflows/$cluster/submit-bench.sh \ + .github/workflows/$cluster/bench.sh $device $interface 2>&1) + + job_id=$(echo "$submit_output" | sed -n 's/.*Submitted batch job \([0-9][0-9]*\).*/\1/p') + job_slug="bench-$device-$interface" + output_file="${job_slug}.out" + + if [ -z "$job_id" ]; then + echo "ERROR: Failed to submit job" + echo "$submit_output" + return 1 + fi + + # Use the monitoring script + bash .github/scripts/monitor_slurm_job.sh "$job_id" "$output_file" + } + + # Run both jobs with monitoring + (submit_and_monitor pr ${{ matrix.device }} ${{ matrix.interface }} ${{ matrix.cluster }}) & + pr_pid=$! + + (submit_and_monitor master ${{ matrix.device }} ${{ matrix.interface }} ${{ matrix.cluster }}) & + master_pid=$! + + # Wait and capture exit codes reliably + pr_exit=0 + master_exit=0 + + wait "$pr_pid" || pr_exit=$? + wait "$master_pid" || master_exit=$? 
+ + # Explicitly check and quote to avoid test errors + if [ "${pr_exit}" -ne 0 ] || [ "${master_exit}" -ne 0 ]; then + echo "One or both benchmark jobs failed: pr_exit=${pr_exit}, master_exit=${master_exit}" + exit 1 + fi - name: Generate & Post Comment run: | diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d7b4703a1..251b276ac 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -93,23 +93,40 @@ jobs: OPT2: ${{ matrix.debug == 'debug' && '-% 20' || '' }} self: - name: Self Hosted + name: "${{ matrix.cluster_name }} (${{ matrix.device }}${{ matrix.interface != 'none' && format('-{0}', matrix.interface) || '' }})" if: github.repository == 'MFlowCode/MFC' && needs.file-changes.outputs.checkall == 'true' needs: file-changes continue-on-error: false - timeout-minutes: 1400 + timeout-minutes: 480 strategy: matrix: - device: ['gpu'] - interface: ['acc', 'omp'] - lbl: ['gt', 'frontier'] include: - - device: 'cpu' + # Phoenix (GT) + - lbl: 'gt' + cluster_name: 'Georgia Tech | Phoenix' + device: 'gpu' + interface: 'acc' + - lbl: 'gt' + cluster_name: 'Georgia Tech | Phoenix' + device: 'gpu' + interface: 'omp' + - lbl: 'gt' + cluster_name: 'Georgia Tech | Phoenix' + device: 'cpu' interface: 'none' - lbl: 'gt' - - device: 'cpu' + # Frontier (ORNL) + - lbl: 'frontier' + cluster_name: 'Oak Ridge | Frontier' + device: 'gpu' + interface: 'acc' + - lbl: 'frontier' + cluster_name: 'Oak Ridge | Frontier' + device: 'gpu' + interface: 'omp' + - lbl: 'frontier' + cluster_name: 'Oak Ridge | Frontier' + device: 'cpu' interface: 'none' - lbl: 'frontier' runs-on: group: phoenix labels: ${{ matrix.lbl }} diff --git a/toolchain/mfc/bench.py b/toolchain/mfc/bench.py index eb1b12003..feb33198c 100644 --- a/toolchain/mfc/bench.py +++ b/toolchain/mfc/bench.py @@ -1,4 +1,4 @@ -import os, sys, uuid, subprocess, dataclasses, typing, math +import os, sys, uuid, subprocess, dataclasses, typing, math, traceback import rich.table @@ -16,6 +16,7 @@ class BenchCase: path: str args: typing.List[str] +# pylint: disable=too-many-locals, too-many-branches, too-many-statements def bench(targets = None): if targets is None: targets = ARG("targets") @@ -36,6 +37,10 @@ def bench(targets = None): case.args = case.args + ARG("--") case.path = os.path.abspath(case.path) + # Validate case file exists early + if not os.path.exists(case.path): + raise MFCException(f"Benchmark case file not found: {case.path}") + results = { "metadata": { "invocation": sys.argv[1:], @@ -44,6 +49,8 @@ def bench(targets = None): "cases": {}, } + failed_cases = [] + for i, case in enumerate(CASES): summary_filepath = os.path.join(bench_dirpath, f"{case.slug}.yaml") log_filepath = os.path.join(bench_dirpath, f"{case.slug}.out") @@ -54,21 +61,81 @@ def bench(targets = None): cons.print(f"> Log: [bold]{os.path.relpath(log_filepath)}[/bold]") cons.print(f"> Summary: [bold]{os.path.relpath(summary_filepath)}[/bold]") - with open(log_filepath, "w") as log_file: - system( - ["./mfc.sh", "run", case.path, "--case-optimization"] + - ["--targets"] + [t.name for t in targets] + - ["--output-summary", summary_filepath] + - case.args + - ["--", "--gbpp", ARG('mem')], - stdout=log_file, - stderr=subprocess.STDOUT) - - results["cases"][case.slug] = { - "description": dataclasses.asdict(case), - "output_summary": file_load_yaml(summary_filepath), - } + try: + with open(log_filepath, "w") as log_file: + result = system( + ["./mfc.sh", "run", case.path, "--case-optimization"] + + ["--targets"] + [t.name for t in targets] + + 
["--output-summary", summary_filepath] + + case.args + + ["--", "--gbpp", str(ARG('mem'))], + stdout=log_file, + stderr=subprocess.STDOUT) + + # Check return code (handle CompletedProcess or int defensively) + rc = result.returncode if hasattr(result, "returncode") else result + if rc != 0: + cons.print(f"[bold red]ERROR[/bold red]: Case {case.slug} failed with exit code {rc}") + cons.print(f"[bold red] Check log at: {log_filepath}[/bold red]") + failed_cases.append(case.slug) + continue + + # Validate summary file exists + if not os.path.exists(summary_filepath): + cons.print(f"[bold red]ERROR[/bold red]: Summary file not created for {case.slug}") + cons.print(f"[bold red] Expected: {summary_filepath}[/bold red]") + failed_cases.append(case.slug) + continue + + # Load summary + summary = file_load_yaml(summary_filepath) + + # Validate all targets have required data + validation_failed = False + for target in targets: + if target.name not in summary: + cons.print(f"[bold red]ERROR[/bold red]: Target {target.name} missing from summary for {case.slug}") + validation_failed = True + break + + if "exec" not in summary[target.name]: + cons.print(f"[bold red]ERROR[/bold red]: 'exec' time missing for {target.name} in {case.slug}") + validation_failed = True + break + + if target.name == "simulation" and "grind" not in summary[target.name]: + cons.print(f"[bold red]ERROR[/bold red]: 'grind' time missing for simulation in {case.slug}") + validation_failed = True + break + + if validation_failed: + failed_cases.append(case.slug) + continue + + # Add to results + results["cases"][case.slug] = { + "description": dataclasses.asdict(case), + "output_summary": summary, + } + cons.print(f"[bold green]✓[/bold green] Case {case.slug} completed successfully") + + except Exception as e: + cons.print(f"[bold red]ERROR[/bold red]: Unexpected error running {case.slug}: {e}") + cons.print(f"[dim]{traceback.format_exc()}[/dim]") + failed_cases.append(case.slug) + finally: + cons.unindent() + + # Report results + if failed_cases: + cons.print() + cons.print(f"[bold red]Failed cases ({len(failed_cases)}):[/bold red]") + for slug in failed_cases: + cons.print(f" - {slug}") + cons.print() + raise MFCException(f"Benchmarking failed: {len(failed_cases)}/{len(CASES)} cases failed") + # Write output file_dump_yaml(ARG("output"), results) cons.print(f"Wrote results to [bold magenta]{os.path.relpath(ARG('output'))}[/bold magenta].") @@ -137,11 +204,15 @@ def diff(): grind_time_value = lhs_summary[target.name]["grind"] / rhs_summary[target.name]["grind"] speedups[i] += f" & Grind: {grind_time_value:.2f}" - if grind_time_value <0.95: + if grind_time_value < 0.95: cons.print(f"[bold red]Error[/bold red]: Benchmarking failed since grind time speedup for {target.name} below acceptable threshold (<0.95) - Case: {slug}") err = 1 - except Exception as _: - pass + except Exception as e: + cons.print( + f"[bold red]ERROR[/bold red]: Failed to compute speedup for {target.name} in {slug}: {e}\n" + f"{traceback.format_exc()}" + ) + err = 1 table.add_row(f"[magenta]{slug}[/magenta]", *speedups) diff --git a/toolchain/mfc/test/test.py b/toolchain/mfc/test/test.py index 0ba6e160f..5f9d4d82c 100644 --- a/toolchain/mfc/test/test.py +++ b/toolchain/mfc/test/test.py @@ -1,4 +1,4 @@ -import os, typing, shutil, time, itertools +import os, typing, shutil, time, itertools, threading from random import sample, seed import rich, rich.table @@ -23,6 +23,22 @@ total_test_count = 0 errors = [] +# Early abort thresholds +MIN_CASES_BEFORE_ABORT = 20 
+FAILURE_RATE_THRESHOLD = 0.3 + +# Per-test timeout (1 hour) +TEST_TIMEOUT_SECONDS = 3600 + +# Global abort flag for thread-safe early termination +# This flag is set when the failure rate exceeds the threshold, signaling +# all worker threads to exit gracefully. This avoids raising exceptions +# from worker threads which could leave the scheduler in an undefined state. +abort_tests = threading.Event() + +class TestTimeoutError(MFCException): + pass + # pylint: disable=too-many-branches, trailing-whitespace def __filter(cases_) -> typing.List[TestCase]: cases = cases_[:] @@ -91,7 +107,7 @@ def __filter(cases_) -> typing.List[TestCase]: return selected_cases, skipped_cases def test(): - # pylint: disable=global-statement, global-variable-not-assigned + # pylint: disable=global-statement, global-variable-not-assigned, too-many-statements global nFAIL, nPASS, nSKIP, total_test_count global errors @@ -160,6 +176,20 @@ def test(): [ sched.Task(ppn=case.ppn, func=handle_case, args=[case], load=case.get_cell_count()) for case in cases ], ARG("jobs"), ARG("gpus")) + # Check if we aborted due to high failure rate + if abort_tests.is_set(): + total_completed = nFAIL + nPASS + cons.print() + cons.unindent() + if total_completed > 0: + raise MFCException( + f"Excessive test failures: {nFAIL}/{total_completed} " + f"failed ({nFAIL/total_completed*100:.1f}%)" + ) + raise MFCException( + f"Excessive test failures: {nFAIL} failed, but no tests were completed." + ) + nSKIP = len(skipped_cases) cons.print() cons.unindent() @@ -181,96 +211,241 @@ def test(): # pylint: disable=too-many-locals, too-many-branches, too-many-statements, trailing-whitespace +def _process_silo_file(silo_filepath: str, case: TestCase, out_filepath: str): + """Process a single silo file with h5dump and check for NaNs/Infinities.""" + # Check that silo file exists before attempting to process it + if not os.path.exists(silo_filepath): + raise MFCException( + f"Test {case}: Expected silo file missing: {silo_filepath}. " + f"Check the post-process log at: {out_filepath}" + ) + + h5dump = f"{HDF5.get_install_dirpath(case.to_input_file())}/bin/h5dump" + + if not os.path.exists(h5dump or ""): + if not does_command_exist("h5dump"): + raise MFCException("h5dump couldn't be found.") + h5dump = shutil.which("h5dump") + + output, err = get_program_output([h5dump, silo_filepath]) + + if err != 0: + raise MFCException( + f"Test {case}: Failed to run h5dump on {silo_filepath}. " + f"You can find the run's output in {out_filepath}, " + f"and the case dictionary in {case.get_filepath()}." + ) + + if "nan," in output: + raise MFCException( + f"Test {case}: Post Process has detected a NaN. You can find the run's output in {out_filepath}, " + f"and the case dictionary in {case.get_filepath()}." + ) + + if "inf," in output: + raise MFCException( + f"Test {case}: Post Process has detected an Infinity. You can find the run's output in {out_filepath}, " + f"and the case dictionary in {case.get_filepath()}." 
+ ) + + +def _print_multirank_debug_info(case: TestCase): + """Print debug information for multi-rank post-process failures.""" + case_dir = case.get_dirpath() + restart_dir = os.path.join(case_dir, "restart_data") + + cons.print("[bold yellow]Multi-rank debug (ppn >= 2): inspecting restart_data and post_process.inp[/bold yellow]") + cons.print(f"[bold yellow] Case directory:[/bold yellow] {case_dir}") + cons.print(f"[bold yellow] restart_data directory:[/bold yellow] {restart_dir}") + + # List restart_data contents + if os.path.isdir(restart_dir): + try: + entries = sorted(os.listdir(restart_dir)) + except OSError as exc: + cons.print(f"[bold yellow] Could not list restart_data contents: {exc}[/bold yellow]") + else: + cons.print(f"[bold yellow] restart_data entries ({len(entries)} total, showing up to 20):[/bold yellow]") + for name in entries[:20]: + cons.print(f" - {name}") + else: + cons.print("[bold yellow] restart_data directory does not exist[/bold yellow]") + + # Dump key case parameters relevant to restart/post-process + params = getattr(case, "params", {}) + def _param(name: str): + return params.get(name, "") + + cons.print("[bold yellow] Selected case parameters relevant to restart:[/bold yellow]") + for key in ( + "t_step_start", + "t_step_stop", + "t_step_save", + "n_start", + "t_save", + "parallel_io", + "file_per_process", + ): + cons.print(f" {key} = {_param(key)}") + + # Show the beginning of post_process.inp if present + ppi_path = os.path.join(case_dir, "post_process.inp") + if os.path.exists(ppi_path): + cons.print(f"[bold yellow] First lines of post_process.inp ({ppi_path}):[/bold yellow]") + try: + with open(ppi_path, "r", encoding="utf-8", errors="replace") as f: + for i, line in enumerate(f): + if i >= 40: + break + cons.print(" " + line.rstrip()) + except OSError as exc: + cons.print(f"[bold yellow] Could not read post_process.inp: {exc}[/bold yellow]") + else: + cons.print("[bold yellow] post_process.inp not found in case directory[/bold yellow]") + + def _handle_case(case: TestCase, devices: typing.Set[int]): # pylint: disable=global-statement, global-variable-not-assigned global current_test_number start_time = time.time() + # Set timeout using threading.Timer (works in worker threads) + # Note: we intentionally do not use signal.alarm() here because signals + # only work in the main thread; sched.sched runs tests in worker threads. + # threading.Timer works correctly in this threaded context. + timeout_flag = threading.Event() + timeout_timer = threading.Timer(TEST_TIMEOUT_SECONDS, timeout_flag.set) + timeout_timer.start() + tol = case.compute_tolerance() case.delete_output() case.create_directory() if ARG("dry_run"): cons.print(f" [bold magenta]{case.get_uuid()}[/bold magenta] SKIP {case.trace}") + timeout_timer.cancel() return - cmd = case.run([PRE_PROCESS, SIMULATION], gpus=devices) - - out_filepath = os.path.join(case.get_dirpath(), "out_pre_sim.txt") + try: + # Decide which targets to run in a single pipeline. + # - Default: PRE_PROCESS + SIMULATION (used for golden comparison) + # - With --test-all (-a): also include POST_PROCESS so that the configuration + # used by simulation and post_process is consistent (e.g. parallel_io, + # file_per_process, *_wrt flags). This ensures simulation writes the + # Lustre-style restart/grid files (e.g. restart_data/lustre_x_cb.dat) + # that post_process expects. 
+ targets = [PRE_PROCESS, SIMULATION, POST_PROCESS] if ARG("test_all") else [PRE_PROCESS, SIMULATION] - common.file_write(out_filepath, cmd.stdout) + # Check timeout before starting + if timeout_flag.is_set(): + raise TestTimeoutError("Test case exceeded 1 hour timeout") + cmd = case.run(targets, gpus=devices) - if cmd.returncode != 0: - cons.print(cmd.stdout) - raise MFCException(f"Test {case}: Failed to execute MFC.") + # Check timeout after simulation + if timeout_flag.is_set(): + raise TestTimeoutError("Test case exceeded 1 hour timeout") - pack, err = packer.pack(case.get_dirpath()) - if err is not None: - raise MFCException(f"Test {case}: {err}") + out_filepath = os.path.join(case.get_dirpath(), "out_pre_sim.txt") - if pack.has_NaNs(): - raise MFCException(f"Test {case}: NaNs detected in the case.") - - golden_filepath = os.path.join(case.get_dirpath(), "golden.txt") - if ARG("generate"): - common.delete_file(golden_filepath) - pack.save(golden_filepath) - else: - if not os.path.isfile(golden_filepath): - raise MFCException(f"Test {case}: The golden file does not exist! To generate golden files, use the '--generate' flag.") + common.file_write(out_filepath, cmd.stdout) - golden = packer.load(golden_filepath) + if cmd.returncode != 0: + cons.print(cmd.stdout) + # If test_all is enabled and the pipeline failed, provide extra debug info + if ARG("test_all") and getattr(case, "ppn", 1) >= 2: + _print_multirank_debug_info(case) + raise MFCException(f"Test {case}: Failed to execute MFC.") - if ARG("add_new_variables"): - for pfilepath, pentry in list(pack.entries.items()): - if golden.find(pfilepath) is None: - golden.set(pentry) + pack, err = packer.pack(case.get_dirpath()) + if err is not None: + raise MFCException(f"Test {case}: {err}") - for gfilepath, gentry in list(golden.entries.items()): - if pack.find(gfilepath) is None: - golden.remove(gentry) + if pack.has_NaNs(): + raise MFCException(f"Test {case}: NaNs detected in the case.") - golden.save(golden_filepath) + golden_filepath = os.path.join(case.get_dirpath(), "golden.txt") + if ARG("generate"): + common.delete_file(golden_filepath) + pack.save(golden_filepath) else: - err, msg = packtol.compare(pack, packer.load(golden_filepath), packtol.Tolerance(tol, tol)) - if msg is not None: - raise MFCException(f"Test {case}: {msg}") - - if ARG("test_all"): - case.delete_output() - cmd = case.run([PRE_PROCESS, SIMULATION, POST_PROCESS], gpus=devices) - out_filepath = os.path.join(case.get_dirpath(), "out_post.txt") - common.file_write(out_filepath, cmd.stdout) - - for silo_filepath in os.listdir(os.path.join(case.get_dirpath(), 'silo_hdf5', 'p0')): - silo_filepath = os.path.join(case.get_dirpath(), 'silo_hdf5', 'p0', silo_filepath) - h5dump = f"{HDF5.get_install_dirpath(case.to_input_file())}/bin/h5dump" + if not os.path.isfile(golden_filepath): + raise MFCException(f"Test {case}: The golden file does not exist! 
To generate golden files, use the '--generate' flag.") - if not os.path.exists(h5dump or ""): - if not does_command_exist("h5dump"): - raise MFCException("h5dump couldn't be found.") + golden = packer.load(golden_filepath) - h5dump = shutil.which("h5dump") + if ARG("add_new_variables"): + for pfilepath, pentry in list(pack.entries.items()): + if golden.find(pfilepath) is None: + golden.set(pentry) - output, err = get_program_output([h5dump, silo_filepath]) + for gfilepath, gentry in list(golden.entries.items()): + if pack.find(gfilepath) is None: + golden.remove(gentry) - if err != 0: - raise MFCException(f"Test {case}: Failed to run h5dump. You can find the run's output in {out_filepath}, and the case dictionary in {case.get_filepath()}.") - - if "nan," in output: - raise MFCException(f"Test {case}: Post Process has detected a NaN. You can find the run's output in {out_filepath}, and the case dictionary in {case.get_filepath()}.") + golden.save(golden_filepath) + else: + err, msg = packtol.compare(pack, packer.load(golden_filepath), packtol.Tolerance(tol, tol)) + if msg is not None: + raise MFCException(f"Test {case}: {msg}") + + if ARG("test_all"): + # We already ran PRE_PROCESS, SIMULATION, and POST_PROCESS together + # in the single pipeline above. At this point: + # - If cmd.returncode != 0, post_process (or an earlier stage) + # failed and we want to surface that with verbose diagnostics. + # - If cmd.returncode == 0, post_process completed and should + # have written its outputs (e.g., silo_hdf5) based on a + # configuration that had "post_process" in ARGS["mfc"]["targets"], + # so parallel_io and restart_data layout are consistent. + + out_post_filepath = os.path.join(case.get_dirpath(), "out_post.txt") + # Write the full pipeline output to an explicit post-process log too, + # even though it includes pre/sim messages. This is helpful for CI. + common.file_write(out_post_filepath, cmd.stdout) + + if cmd.returncode != 0: + cons.print(cmd.stdout) + + # Extra debug for multi-rank restart/post-process issues + if getattr(case, "ppn", 1) >= 2: + _print_multirank_debug_info(case) + + raise MFCException( + f"Test {case}: Failed to execute MFC (post-process). " + f"See log at: {out_post_filepath}" + ) + + # After a successful post_process run, inspect Silo/HDF5 outputs + silo_dir = os.path.join(case.get_dirpath(), 'silo_hdf5', 'p0') + if os.path.isdir(silo_dir): + for silo_filename in os.listdir(silo_dir): + silo_filepath = os.path.join(silo_dir, silo_filename) + _process_silo_file(silo_filepath, case, out_post_filepath) - if "inf," in output: - raise MFCException(f"Test {case}: Post Process has detected an Infinity. 
You can find the run's output in {out_filepath}, and the case dictionary in {case.get_filepath()}.") + case.delete_output() - case.delete_output() + end_time = time.time() + duration = end_time - start_time - end_time = time.time() - duration = end_time - start_time + current_test_number += 1 + progress_str = f"({current_test_number:3d}/{total_test_count:3d})" + cons.print(f" {progress_str} [bold magenta]{case.get_uuid()}[/bold magenta] {duration:6.2f} {case.trace}") - current_test_number += 1 - progress_str = f"({current_test_number:3d}/{total_test_count:3d})" - cons.print(f" {progress_str} [bold magenta]{case.get_uuid()}[/bold magenta] {duration:6.2f} {case.trace}") + except TestTimeoutError as exc: + log_path = os.path.join(case.get_dirpath(), 'out_pre_sim.txt') + if os.path.exists(log_path): + log_msg = f"Check the log at: {log_path}" + else: + log_msg = ( + f"Log file ({log_path}) may not exist if the timeout occurred early." + ) + raise MFCException( + f"Test {case} exceeded 1 hour timeout. " + f"This may indicate a hung simulation or misconfigured case. " + f"{log_msg}" + ) from exc + finally: + timeout_timer.cancel() # Cancel timeout timer def handle_case(case: TestCase, devices: typing.Set[int]): @@ -278,6 +453,10 @@ def handle_case(case: TestCase, devices: typing.Set[int]): global nFAIL, nPASS, nSKIP global errors + # Check if we should abort before processing this case + if abort_tests.is_set(): + return # Exit gracefully if abort was requested + nAttempts = 0 if ARG('single'): max_attempts = max(ARG('max_attempts'), 3) @@ -301,4 +480,18 @@ def handle_case(case: TestCase, devices: typing.Set[int]): errors.append(f"[bold red]Failed test {case} after {nAttempts} attempt(s).[/bold red]") errors.append(f"{exc}") + # Check if we should abort early due to high failure rate + # Skip this check during dry-run (only builds, doesn't run tests) + if not ARG("dry_run"): + total_completed = nFAIL + nPASS + if total_completed >= MIN_CASES_BEFORE_ABORT: + failure_rate = nFAIL / total_completed + if failure_rate >= FAILURE_RATE_THRESHOLD: + cons.print(f"\n[bold red]CRITICAL: {failure_rate*100:.1f}% failure rate detected after {total_completed} tests.[/bold red]") + cons.print("[bold red]This suggests a systemic issue (bad build, broken environment, etc.)[/bold red]") + cons.print("[bold red]Aborting remaining tests to fail fast.[/bold red]\n") + # Set abort flag instead of raising exception from worker thread + abort_tests.set() + return # Exit gracefully + return
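For reference, the new monitor_slurm_job.sh can also be exercised by hand on a login node when debugging the CI path. A minimal sketch, assuming access to a SLURM cluster; the batch script name and output filename below are placeholders and not part of this PR (in CI, submission goes through submit-bench.sh, whose "Submitted batch job <id>" line the workflow parses):

    # Hypothetical manual run; my_bench.sbatch and bench-gpu-acc.out are placeholder names.
    job_id=$(sbatch --parsable my_bench.sbatch)   # --parsable prints only the job ID
    bash .github/scripts/monitor_slurm_job.sh "$job_id" "bench-gpu-acc.out"
    echo "monitor exit status: $?"                # 0 only when the job's ExitCode is 0:0

The script exits non-zero if the job never appears in the queue, if neither scontrol nor sacct can report an exit code, or if the reported exit code is anything other than 0:0, so the workflow step fails exactly when the underlying SLURM job does.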