[Backend Tester] Write report progressively #13308

Open · wants to merge 3 commits into base: gh/GregoryComer/119/head
194 changes: 103 additions & 91 deletions backends/test/suite/reporting.py
@@ -1,7 +1,7 @@
import csv

from collections import Counter
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import timedelta
from enum import IntEnum
from functools import reduce
@@ -11,6 +11,40 @@
from torch.export import ExportedProgram


# The maximum number of model output tensors to log statistics for. Most model tests
# have only one output, but some return more than one tensor. This upper bound is
# needed upfront since the file is written progressively; outputs beyond it will not
# have stats logged.
MAX_LOGGED_MODEL_OUTPUTS = 2


# Field names for the CSV report.
CSV_FIELD_NAMES = [
"Test ID",
"Test Case",
"Flow",
"Params",
"Result",
"Result Detail",
"Delegated",
"Quantize Time (s)",
"Lower Time (s)",
"Delegated Nodes",
"Undelegated Nodes",
"Delegated Ops",
"Undelegated Ops",
"PTE Size (Kb)",
]

for i in range(MAX_LOGGED_MODEL_OUTPUTS):
CSV_FIELD_NAMES.extend(
[
f"Output {i} Error Max",
f"Output {i} Error MAE",
f"Output {i} SNR",
]
)
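For illustration, with MAX_LOGGED_MODEL_OUTPUTS = 2 the loop above appends six columns to the header: Output 0 Error Max, Output 0 Error MAE, Output 0 SNR, Output 1 Error Max, Output 1 Error MAE, and Output 1 SNR.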


# Operators that are excluded from the counts returned by count_ops. These are used to
# exclude operations that are not logically relevant or delegatable to backends.
OP_COUNT_IGNORED_OPS = {
@@ -167,11 +201,15 @@ def is_delegated(self):
)


@dataclass
class TestSessionState:
test_case_summaries: list[TestCaseSummary]
# True if the CSV header has been written to report_path.
has_written_report_header: bool = False

def __init__(self):
self.test_case_summaries = []
# The file path to write the detail report to, if enabled.
report_path: str | None = None

test_case_summaries: list[TestCaseSummary] = field(default_factory=list)


@dataclass
@@ -249,11 +287,11 @@ def count_ops(program: dict[str, ExportedProgram] | ExportedProgram) -> Counter:
)


def begin_test_session():
def begin_test_session(report_path: str | None):
global _active_session

assert _active_session is None, "A test session is already active."
_active_session = TestSessionState()
_active_session = TestSessionState(report_path=report_path)


def log_test_summary(summary: TestCaseSummary):
@@ -262,6 +300,15 @@ def log_test_summary(summary: TestCaseSummary):
if _active_session is not None:
_active_session.test_case_summaries.append(summary)

if _active_session.report_path is not None:
@digantdesai (Contributor) commented on Aug 12, 2025:

can multiple subprocesses write to this simultaneously?
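If multiple subprocesses do write the report simultaneously, plain appends can interleave partial rows. A minimal sketch of one possible guard, assuming a POSIX system (fcntl is unavailable on Windows) and reusing CSV_FIELD_NAMES from this module; append_row_locked is a hypothetical helper, not part of this PR:

import csv
import fcntl

def append_row_locked(report_path: str, row: dict) -> None:
    # Hypothetical helper: hold an exclusive advisory lock while appending so
    # that concurrent subprocesses cannot interleave partial rows.
    with open(report_path, "a", newline="") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            csv.DictWriter(f, CSV_FIELD_NAMES).writerow(row)
            f.flush()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)

Single short appends are often atomic on local filesystems in practice, but the lock makes the guarantee explicit.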

file_mode = "a" if _active_session.has_written_report_header else "w"
with open(_active_session.report_path, file_mode) as f:
if not _active_session.has_written_report_header:
write_csv_header(f)
_active_session.has_written_report_header = True

write_csv_row(summary, f)
@digantdesai (Contributor) commented on Aug 12, 2025:

This implies we crash when we run into some failure? Can we try to catch and fail gracefully, instead of assuming that we can crash at any time?
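One way to address this is to wrap the file I/O so that a reporting failure is logged rather than fatal. A minimal sketch of the same block under that change (the logging setup is illustrative, not part of this PR):

import logging

logger = logging.getLogger(__name__)

try:
    file_mode = "a" if _active_session.has_written_report_header else "w"
    with open(_active_session.report_path, file_mode) as f:
        if not _active_session.has_written_report_header:
            write_csv_header(f)
            _active_session.has_written_report_header = True
        write_csv_row(summary, f)
except OSError as e:
    # A broken report file should not abort the remaining tests.
    logger.warning("Failed to write report row to %s: %s", _active_session.report_path, e)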



def complete_test_session() -> RunSummary:
global _active_session
@@ -280,6 +327,13 @@ def _sum_op_counts(counter: Counter | None) -> int | None:
return sum(counter.values()) if counter is not None else None


def _serialize_params(params: dict[str, Any] | None) -> str:
if params is not None:
return str(dict(sorted(params.items())))
else:
return ""


def _serialize_op_counts(counter: Counter | None) -> str:
"""
A utility function to serialize op counts to a string, for the purpose of including
@@ -291,91 +345,49 @@ def _serialize_op_counts(counter: Counter | None) -> str:
return ""


def generate_csv_report(summary: RunSummary, output: TextIO):
"""Write a run summary report to a file in CSV format."""

field_names = [
"Test ID",
"Test Case",
"Flow",
"Result",
"Result Detail",
"Delegated",
"Quantize Time (s)",
"Lower Time (s)",
]

# Tests can have custom parameters. We'll want to report them here, so we need
# a list of all unique parameter names.
param_names = reduce(
lambda a, b: a.union(b),
(
set(s.params.keys())
for s in summary.test_case_summaries
if s.params is not None
),
set(),
)
field_names += (s.capitalize() for s in param_names)

# Add tensor error statistic field names for each output index.
max_outputs = max(
len(s.tensor_error_statistics) for s in summary.test_case_summaries
)
for i in range(max_outputs):
field_names.extend(
[
f"Output {i} Error Max",
f"Output {i} Error MAE",
f"Output {i} SNR",
]
)
field_names.extend(
[
"Delegated Nodes",
"Undelegated Nodes",
"Delegated Ops",
"Undelegated Ops",
"PTE Size (Kb)",
]
)

writer = csv.DictWriter(output, field_names)
def write_csv_header(output: TextIO):
writer = csv.DictWriter(output, CSV_FIELD_NAMES)
writer.writeheader()

for record in summary.test_case_summaries:
row = {
"Test ID": record.name,
"Test Case": record.base_name,
"Flow": record.flow,
"Result": record.result.to_short_str(),
"Result Detail": record.result.to_detail_str(),
"Delegated": "True" if record.is_delegated() else "False",
"Quantize Time (s)": (
f"{record.quantize_time.total_seconds():.3f}"
if record.quantize_time
else None
),
"Lower Time (s)": (
f"{record.lower_time.total_seconds():.3f}"
if record.lower_time
else None
),
}
if record.params is not None:
row.update({k.capitalize(): v for k, v in record.params.items()})

for output_idx, error_stats in enumerate(record.tensor_error_statistics):
row[f"Output {output_idx} Error Max"] = f"{error_stats.error_max:.3f}"
row[f"Output {output_idx} Error MAE"] = f"{error_stats.error_mae:.3f}"
row[f"Output {output_idx} SNR"] = f"{error_stats.sqnr:.3f}"

row["Delegated Nodes"] = _sum_op_counts(record.delegated_op_counts)
row["Undelegated Nodes"] = _sum_op_counts(record.undelegated_op_counts)
row["Delegated Ops"] = _serialize_op_counts(record.delegated_op_counts)
row["Undelegated Ops"] = _serialize_op_counts(record.undelegated_op_counts)
row["PTE Size (Kb)"] = (
f"{record.pte_size_bytes / 1000.0:.3f}" if record.pte_size_bytes else ""
)

writer.writerow(row)
def write_csv_row(record: TestCaseSummary, output: TextIO):
writer = csv.DictWriter(output, CSV_FIELD_NAMES)

row = {
"Test ID": record.name,
"Test Case": record.base_name,
"Flow": record.flow,
"Params": _serialize_params(record.params),
"Result": record.result.to_short_str(),
"Result Detail": record.result.to_detail_str(),
"Delegated": "True" if record.is_delegated() else "False",
"Quantize Time (s)": (
f"{record.quantize_time.total_seconds():.3f}"
if record.quantize_time
else None
),
"Lower Time (s)": (
f"{record.lower_time.total_seconds():.3f}" if record.lower_time else None
),
}

for output_idx, error_stats in enumerate(record.tensor_error_statistics):
if output_idx >= MAX_LOGGED_MODEL_OUTPUTS:
print(
f"Model output stats are truncated because the model has more than {MAX_LOGGED_MODEL_OUTPUTS} outputs. Consider increasing MAX_LOGGED_MODEL_OUTPUTS."
)
break

row[f"Output {output_idx} Error Max"] = f"{error_stats.error_max:.3f}"
row[f"Output {output_idx} Error MAE"] = f"{error_stats.error_mae:.3f}"
row[f"Output {output_idx} SNR"] = f"{error_stats.sqnr:.3f}"

row["Delegated Nodes"] = _sum_op_counts(record.delegated_op_counts)
row["Undelegated Nodes"] = _sum_op_counts(record.undelegated_op_counts)
row["Delegated Ops"] = _serialize_op_counts(record.delegated_op_counts)
row["Undelegated Ops"] = _serialize_op_counts(record.undelegated_op_counts)
row["PTE Size (Kb)"] = (
f"{record.pte_size_bytes / 1000.0:.3f}" if record.pte_size_bytes else ""
)

writer.writerow(row)
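Since rows now reach disk as each test completes, a partially written report can be consumed while the suite is still running. A minimal sketch of such a consumer, assuming the report was written to a hypothetical path report.csv via args.report:

import csv

# Surface failures from whatever rows have been flushed so far.
with open("report.csv", newline="") as f:
    for row in csv.DictReader(f):
        if row["Result"] != "Pass":
            print(row["Test ID"], row["Result Detail"])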
8 changes: 1 addition & 7 deletions backends/test/suite/runner.py
@@ -25,7 +25,6 @@
begin_test_session,
complete_test_session,
count_ops,
generate_csv_report,
RunSummary,
TestCaseSummary,
TestResult,
@@ -248,7 +247,7 @@ def build_test_filter(args: argparse.Namespace) -> TestFilter:
def runner_main():
args = parse_args()

begin_test_session()
begin_test_session(args.report)

if len(args.suite) > 1:
raise NotImplementedError("TODO Support multiple suites.")
@@ -263,11 +262,6 @@ def runner_main():
summary = complete_test_session()
print_summary(summary)

if args.report is not None:
with open(args.report, "w") as f:
print(f"Writing CSV report to {args.report}.")
generate_csv_report(summary, f)


if __name__ == "__main__":
runner_main()
19 changes: 9 additions & 10 deletions backends/test/suite/tests/test_reporting.py
@@ -9,11 +9,12 @@

from ..reporting import (
count_ops,
generate_csv_report,
RunSummary,
TestCaseSummary,
TestResult,
TestSessionState,
write_csv_header,
write_csv_row,
)

# Test data for simulated test results.
@@ -69,7 +70,9 @@ def test_csv_report_simple(self):
run_summary = RunSummary.from_session(session_state)

strio = StringIO()
generate_csv_report(run_summary, strio)
write_csv_header(strio)
for case_summary in run_summary.test_case_summaries:
write_csv_row(case_summary, strio)

# Attempt to deserialize and validate the CSV report.
report = DictReader(StringIO(strio.getvalue()))
@@ -81,32 +84,28 @@
self.assertEqual(records[0]["Test Case"], "test1")
self.assertEqual(records[0]["Flow"], "flow1")
self.assertEqual(records[0]["Result"], "Pass")
self.assertEqual(records[0]["Dtype"], "")
self.assertEqual(records[0]["Use_dynamic_shapes"], "")
self.assertEqual(records[0]["Params"], "")

# Validate second record: test1, backend2, LOWER_FAIL
self.assertEqual(records[1]["Test ID"], "test1_backend2_flow1")
self.assertEqual(records[1]["Test Case"], "test1")
self.assertEqual(records[1]["Flow"], "flow1")
self.assertEqual(records[1]["Result"], "Fail")
self.assertEqual(records[1]["Dtype"], "")
self.assertEqual(records[1]["Use_dynamic_shapes"], "")
self.assertEqual(records[1]["Params"], "")

# Validate third record: test2, backend1, SUCCESS_UNDELEGATED with dtype param
self.assertEqual(records[2]["Test ID"], "test2_backend1_flow1")
self.assertEqual(records[2]["Test Case"], "test2")
self.assertEqual(records[2]["Flow"], "flow1")
self.assertEqual(records[2]["Result"], "Pass")
self.assertEqual(records[2]["Dtype"], str(torch.float32))
self.assertEqual(records[2]["Use_dynamic_shapes"], "")
self.assertEqual(records[2]["Params"], str({"dtype": torch.float32}))

# Validate fourth record: test2, backend2, EXPORT_FAIL with use_dynamic_shapes param
self.assertEqual(records[3]["Test ID"], "test2_backend2_flow1")
self.assertEqual(records[3]["Test Case"], "test2")
self.assertEqual(records[3]["Flow"], "flow1")
self.assertEqual(records[3]["Result"], "Skip")
self.assertEqual(records[3]["Dtype"], "")
self.assertEqual(records[3]["Use_dynamic_shapes"], "True")
self.assertEqual(records[3]["Params"], str({"use_dynamic_shapes": True}))

def test_count_ops(self):
"""