diff --git a/backends/test/suite/reporting.py b/backends/test/suite/reporting.py
index a19c63dd474..6294ab9434f 100644
--- a/backends/test/suite/reporting.py
+++ b/backends/test/suite/reporting.py
@@ -1,7 +1,7 @@
 import csv
 
 from collections import Counter
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from datetime import timedelta
 from enum import IntEnum
 from functools import reduce
@@ -11,6 +11,40 @@
 from torch.export import ExportedProgram
 
 
+# The maximum number of model output tensors to log statistics for. Most model tests will
+# only have one output, but some may return more than one tensor. This upper bound is needed
+# upfront since the file is written progressively. Any outputs beyond this limit will not have stats logged.
+MAX_LOGGED_MODEL_OUTPUTS = 2
+
+
+# Field names for the CSV report.
+CSV_FIELD_NAMES = [
+    "Test ID",
+    "Test Case",
+    "Flow",
+    "Params",
+    "Result",
+    "Result Detail",
+    "Delegated",
+    "Quantize Time (s)",
+    "Lower Time (s)",
+    "Delegated Nodes",
+    "Undelegated Nodes",
+    "Delegated Ops",
+    "Undelegated Ops",
+    "PTE Size (Kb)",
+]
+
+for i in range(MAX_LOGGED_MODEL_OUTPUTS):
+    CSV_FIELD_NAMES.extend(
+        [
+            f"Output {i} Error Max",
+            f"Output {i} Error MAE",
+            f"Output {i} SNR",
+        ]
+    )
+
+
 # Operators that are excluded from the counts returned by count_ops. These are used to
 # exclude operatations that are not logically relevant or delegatable to backends.
 OP_COUNT_IGNORED_OPS = {
@@ -167,11 +201,15 @@ def is_delegated(self):
         )
 
 
+@dataclass
 class TestSessionState:
-    test_case_summaries: list[TestCaseSummary]
+    # True if the CSV header has been written to report_path.
+    has_written_report_header: bool = False
 
-    def __init__(self):
-        self.test_case_summaries = []
+    # The file path to write the detail report to, if enabled.
+    report_path: str | None = None
+
+    test_case_summaries: list[TestCaseSummary] = field(default_factory=list)
 
 
 @dataclass
@@ -249,11 +287,11 @@ def count_ops(program: dict[str, ExportedProgram] | ExportedProgram) -> Counter:
     )
 
 
-def begin_test_session():
+def begin_test_session(report_path: str | None):
     global _active_session
 
     assert _active_session is None, "A test session is already active."
-    _active_session = TestSessionState()
+    _active_session = TestSessionState(report_path=report_path)
 
 
 def log_test_summary(summary: TestCaseSummary):
@@ -262,6 +300,15 @@ def log_test_summary(summary: TestCaseSummary):
     if _active_session is not None:
         _active_session.test_case_summaries.append(summary)
 
+        if _active_session.report_path is not None:
+            file_mode = "a" if _active_session.has_written_report_header else "w"
+            with open(_active_session.report_path, file_mode) as f:
+                if not _active_session.has_written_report_header:
+                    write_csv_header(f)
+                    _active_session.has_written_report_header = True
+
+                write_csv_row(summary, f)
+
 
 def complete_test_session() -> RunSummary:
     global _active_session
@@ -280,6 +327,13 @@ def _sum_op_counts(counter: Counter | None) -> int | None:
     return sum(counter.values()) if counter is not None else None
 
 
+def _serialize_params(params: dict[str, Any] | None) -> str:
+    if params is not None:
+        return str(dict(sorted(params.items())))
+    else:
+        return ""
+
+
 def _serialize_op_counts(counter: Counter | None) -> str:
     """
     A utility function to serialize op counts to a string, for the purpose of including
@@ -291,91 +345,49 @@ def _serialize_op_counts(counter: Counter | None) -> str:
     return ""
 
 
-def generate_csv_report(summary: RunSummary, output: TextIO):
-    """Write a run summary report to a file in CSV format."""
-
-    field_names = [
-        "Test ID",
-        "Test Case",
-        "Flow",
-        "Result",
-        "Result Detail",
-        "Delegated",
-        "Quantize Time (s)",
-        "Lower Time (s)",
-    ]
-
-    # Tests can have custom parameters. We'll want to report them here, so we need
-    # a list of all unique parameter names.
-    param_names = reduce(
-        lambda a, b: a.union(b),
-        (
-            set(s.params.keys())
-            for s in summary.test_case_summaries
-            if s.params is not None
-        ),
-        set(),
-    )
-    field_names += (s.capitalize() for s in param_names)
-
-    # Add tensor error statistic field names for each output index.
-    max_outputs = max(
-        len(s.tensor_error_statistics) for s in summary.test_case_summaries
-    )
-    for i in range(max_outputs):
-        field_names.extend(
-            [
-                f"Output {i} Error Max",
-                f"Output {i} Error MAE",
-                f"Output {i} SNR",
-            ]
-        )
-    field_names.extend(
-        [
-            "Delegated Nodes",
-            "Undelegated Nodes",
-            "Delegated Ops",
-            "Undelegated Ops",
-            "PTE Size (Kb)",
-        ]
-    )
-
-    writer = csv.DictWriter(output, field_names)
+def write_csv_header(output: TextIO):
+    writer = csv.DictWriter(output, CSV_FIELD_NAMES)
     writer.writeheader()
 
 
-    for record in summary.test_case_summaries:
-        row = {
-            "Test ID": record.name,
-            "Test Case": record.base_name,
-            "Flow": record.flow,
-            "Result": record.result.to_short_str(),
-            "Result Detail": record.result.to_detail_str(),
-            "Delegated": "True" if record.is_delegated() else "False",
-            "Quantize Time (s)": (
-                f"{record.quantize_time.total_seconds():.3f}"
-                if record.quantize_time
-                else None
-            ),
-            "Lower Time (s)": (
-                f"{record.lower_time.total_seconds():.3f}"
-                if record.lower_time
-                else None
-            ),
-        }
-        if record.params is not None:
-            row.update({k.capitalize(): v for k, v in record.params.items()})
-
-        for output_idx, error_stats in enumerate(record.tensor_error_statistics):
-            row[f"Output {output_idx} Error Max"] = f"{error_stats.error_max:.3f}"
-            row[f"Output {output_idx} Error MAE"] = f"{error_stats.error_mae:.3f}"
-            row[f"Output {output_idx} SNR"] = f"{error_stats.sqnr:.3f}"
-
-        row["Delegated Nodes"] = _sum_op_counts(record.delegated_op_counts)
-        row["Undelegated Nodes"] = _sum_op_counts(record.undelegated_op_counts)
-        row["Delegated Ops"] = _serialize_op_counts(record.delegated_op_counts)
-        row["Undelegated Ops"] = _serialize_op_counts(record.undelegated_op_counts)
-        row["PTE Size (Kb)"] = (
-            f"{record.pte_size_bytes / 1000.0:.3f}" if record.pte_size_bytes else ""
-        )
-        writer.writerow(row)
+def write_csv_row(record: TestCaseSummary, output: TextIO):
+    writer = csv.DictWriter(output, CSV_FIELD_NAMES)
+
+    row = {
+        "Test ID": record.name,
+        "Test Case": record.base_name,
+        "Flow": record.flow,
+        "Params": _serialize_params(record.params),
+        "Result": record.result.to_short_str(),
+        "Result Detail": record.result.to_detail_str(),
+        "Delegated": "True" if record.is_delegated() else "False",
+        "Quantize Time (s)": (
+            f"{record.quantize_time.total_seconds():.3f}"
+            if record.quantize_time
+            else None
+        ),
+        "Lower Time (s)": (
+            f"{record.lower_time.total_seconds():.3f}" if record.lower_time else None
+        ),
+    }
+
+    for output_idx, error_stats in enumerate(record.tensor_error_statistics):
+        if output_idx >= MAX_LOGGED_MODEL_OUTPUTS:
+            print(
+                f"Model output stats are truncated as model has more than {MAX_LOGGED_MODEL_OUTPUTS} outputs. Consider increasing MAX_LOGGED_MODEL_OUTPUTS."
+            )
+            break
+
+        row[f"Output {output_idx} Error Max"] = f"{error_stats.error_max:.3f}"
+        row[f"Output {output_idx} Error MAE"] = f"{error_stats.error_mae:.3f}"
+        row[f"Output {output_idx} SNR"] = f"{error_stats.sqnr:.3f}"
+
+    row["Delegated Nodes"] = _sum_op_counts(record.delegated_op_counts)
+    row["Undelegated Nodes"] = _sum_op_counts(record.undelegated_op_counts)
+    row["Delegated Ops"] = _serialize_op_counts(record.delegated_op_counts)
+    row["Undelegated Ops"] = _serialize_op_counts(record.undelegated_op_counts)
+    row["PTE Size (Kb)"] = (
+        f"{record.pte_size_bytes / 1000.0:.3f}" if record.pte_size_bytes else ""
+    )
+
+    writer.writerow(row)
diff --git a/backends/test/suite/runner.py b/backends/test/suite/runner.py
index 101e168476b..b128d64eca2 100644
--- a/backends/test/suite/runner.py
+++ b/backends/test/suite/runner.py
@@ -25,7 +25,6 @@
     begin_test_session,
     complete_test_session,
     count_ops,
-    generate_csv_report,
     RunSummary,
     TestCaseSummary,
     TestResult,
@@ -248,7 +247,7 @@ def build_test_filter(args: argparse.Namespace) -> TestFilter:
 
 def runner_main():
     args = parse_args()
-    begin_test_session()
+    begin_test_session(args.report)
 
     if len(args.suite) > 1:
         raise NotImplementedError("TODO Support multiple suites.")
@@ -263,11 +262,6 @@ def runner_main():
     summary = complete_test_session()
     print_summary(summary)
 
-    if args.report is not None:
-        with open(args.report, "w") as f:
-            print(f"Writing CSV report to {args.report}.")
-            generate_csv_report(summary, f)
-
 
 if __name__ == "__main__":
     runner_main()
diff --git a/backends/test/suite/tests/test_reporting.py b/backends/test/suite/tests/test_reporting.py
index c3324b58332..6ab4817b44c 100644
--- a/backends/test/suite/tests/test_reporting.py
+++ b/backends/test/suite/tests/test_reporting.py
@@ -9,11 +9,12 @@
 
 from ..reporting import (
     count_ops,
-    generate_csv_report,
     RunSummary,
     TestCaseSummary,
     TestResult,
     TestSessionState,
+    write_csv_header,
+    write_csv_row,
 )
 
 # Test data for simulated test results.
@@ -69,7 +70,9 @@ def test_csv_report_simple(self):
         run_summary = RunSummary.from_session(session_state)
 
         strio = StringIO()
-        generate_csv_report(run_summary, strio)
+        write_csv_header(strio)
+        for case_summary in run_summary.test_case_summaries:
+            write_csv_row(case_summary, strio)
 
         # Attempt to deserialize and validate the CSV report.
         report = DictReader(StringIO(strio.getvalue()))
@@ -81,32 +84,28 @@ def test_csv_report_simple(self):
         self.assertEqual(records[0]["Test Case"], "test1")
         self.assertEqual(records[0]["Flow"], "flow1")
         self.assertEqual(records[0]["Result"], "Pass")
-        self.assertEqual(records[0]["Dtype"], "")
-        self.assertEqual(records[0]["Use_dynamic_shapes"], "")
+        self.assertEqual(records[0]["Params"], "")
 
         # Validate second record: test1, backend2, LOWER_FAIL
         self.assertEqual(records[1]["Test ID"], "test1_backend2_flow1")
         self.assertEqual(records[1]["Test Case"], "test1")
         self.assertEqual(records[1]["Flow"], "flow1")
         self.assertEqual(records[1]["Result"], "Fail")
-        self.assertEqual(records[1]["Dtype"], "")
-        self.assertEqual(records[1]["Use_dynamic_shapes"], "")
+        self.assertEqual(records[1]["Params"], "")
 
         # Validate third record: test2, backend1, SUCCESS_UNDELEGATED with dtype param
         self.assertEqual(records[2]["Test ID"], "test2_backend1_flow1")
         self.assertEqual(records[2]["Test Case"], "test2")
         self.assertEqual(records[2]["Flow"], "flow1")
         self.assertEqual(records[2]["Result"], "Pass")
-        self.assertEqual(records[2]["Dtype"], str(torch.float32))
-        self.assertEqual(records[2]["Use_dynamic_shapes"], "")
+        self.assertEqual(records[2]["Params"], str({"dtype": torch.float32}))
 
         # Validate fourth record: test2, backend2, EXPORT_FAIL with use_dynamic_shapes param
         self.assertEqual(records[3]["Test ID"], "test2_backend2_flow1")
         self.assertEqual(records[3]["Test Case"], "test2")
         self.assertEqual(records[3]["Flow"], "flow1")
         self.assertEqual(records[3]["Result"], "Skip")
-        self.assertEqual(records[3]["Dtype"], "")
-        self.assertEqual(records[3]["Use_dynamic_shapes"], "True")
+        self.assertEqual(records[3]["Params"], str({"use_dynamic_shapes": True}))
 
     def test_count_ops(self):
         """
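
Usage sketch (illustrative, not part of the patch): the change streams the CSV report row by row as summaries are logged instead of building it once at the end, which is what log_test_summary does above when a report path is set. A minimal standalone example mirroring the updated test is below; the summaries iterable and the import path are assumptions, not taken from the diff.

    from io import StringIO

    from executorch.backends.test.suite.reporting import write_csv_header, write_csv_row

    out = StringIO()
    write_csv_header(out)  # writes the fixed CSV_FIELD_NAMES header once
    for case_summary in summaries:  # hypothetical iterable of TestCaseSummary objects
        write_csv_row(case_summary, out)
    print(out.getvalue())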