[Backend Tester] Write report progressively #13308
@@ -1,7 +1,7 @@
 import csv
 
 from collections import Counter
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from datetime import timedelta
 from enum import IntEnum
 from functools import reduce

@@ -11,6 +11,40 @@
 from torch.export import ExportedProgram
 
 
+# The maximum number of model output tensors to log statistics for. Most model tests will
+# only have one output, but some may return more than one tensor. This upper bound is needed
+# upfront since the file is written progressively. Any outputs beyond this limit will not have stats logged.
+MAX_LOGGED_MODEL_OUTPUTS = 2
+
+
+# Field names for the CSV report.
+CSV_FIELD_NAMES = [
+    "Test ID",
+    "Test Case",
+    "Flow",
+    "Params",
+    "Result",
+    "Result Detail",
+    "Delegated",
+    "Quantize Time (s)",
+    "Lower Time (s)",
+    "Delegated Nodes",
+    "Undelegated Nodes",
+    "Delegated Ops",
+    "Undelegated Ops",
+    "PTE Size (Kb)",
+]
+
+for i in range(MAX_LOGGED_MODEL_OUTPUTS):
+    CSV_FIELD_NAMES.extend(
+        [
+            f"Output {i} Error Max",
+            f"Output {i} Error MAE",
+            f"Output {i} SNR",
+        ]
+    )
+
+
 # Operators that are excluded from the counts returned by count_ops. These are used to
 # exclude operations that are not logically relevant or delegatable to backends.
 OP_COUNT_IGNORED_OPS = {
@@ -58,6 +92,36 @@ def is_non_backend_failure(self):
     def is_backend_failure(self):
         return not self.is_success() and not self.is_non_backend_failure()
 
+    def to_short_str(self):
+        if self in {TestResult.SUCCESS, TestResult.SUCCESS_UNDELEGATED}:
+            return "Pass"
+        elif self == TestResult.SKIPPED:
+            return "Skip"
+        else:
+            return "Fail"
+
+    def to_detail_str(self):
+        if self == TestResult.SUCCESS:
+            return ""
+        elif self == TestResult.SUCCESS_UNDELEGATED:
+            return ""
+        elif self == TestResult.SKIPPED:
+            return ""
+        elif self == TestResult.QUANTIZE_FAIL:
+            return "Quantization Failed"
+        elif self == TestResult.LOWER_FAIL:
+            return "Lowering Failed"
+        elif self == TestResult.PTE_LOAD_FAIL:
+            return "PTE Load Failed"
+        elif self == TestResult.PTE_RUN_FAIL:
+            return "PTE Run Failed"
+        elif self == TestResult.OUTPUT_MISMATCH_FAIL:
+            return "Output Mismatch"
+        elif self == TestResult.UNKNOWN_FAIL:
+            return "Unknown Failure"
+        else:
+            raise ValueError(f"Invalid TestResult value: {self}.")
+
     def display_name(self):
         if self == TestResult.SUCCESS:
             return "Success (Delegated)"
@@ -129,12 +193,23 @@ class TestCaseSummary:
     pte_size_bytes: int | None = None
     """ The size of the PTE file in bytes. """
 
+    def is_delegated(self):
+        return (
+            any(v > 0 for v in self.delegated_op_counts.values())
+            if self.delegated_op_counts
+            else False
+        )
+
 
 @dataclass
 class TestSessionState:
-    test_case_summaries: list[TestCaseSummary]
+    # True if the CSV header has been written to report_path.
+    has_written_report_header: bool = False
 
-    def __init__(self):
-        self.test_case_summaries = []
+    # The file path to write the detail report to, if enabled.
+    report_path: str | None = None
+
+    test_case_summaries: list[TestCaseSummary] = field(default_factory=list)
 
 
 @dataclass
@@ -212,11 +287,11 @@ def count_ops(program: dict[str, ExportedProgram] | ExportedProgram) -> Counter:
     )
 
 
-def begin_test_session():
+def begin_test_session(report_path: str | None):
     global _active_session
 
     assert _active_session is None, "A test session is already active."
-    _active_session = TestSessionState()
+    _active_session = TestSessionState(report_path=report_path)
 
 
 def log_test_summary(summary: TestCaseSummary):
@@ -225,6 +300,15 @@ def log_test_summary(summary: TestCaseSummary):
     if _active_session is not None:
         _active_session.test_case_summaries.append(summary)
 
+        if _active_session.report_path is not None:
+            file_mode = "a" if _active_session.has_written_report_header else "w"
+            with open(_active_session.report_path, file_mode) as f:
+                if not _active_session.has_written_report_header:
+                    write_csv_header(f)
+                    _active_session.has_written_report_header = True
+
+                write_csv_row(summary, f)
Review comment: This implies we crash when we run into some failure? Can we try to catch and fail gracefully, instead of assuming that we can crash at any time?

Author reply: This is mainly for memory corruption, which happens in a few cases. It might be possible to catch the SIGSEGV or other native fault, but I don't know what state the process is in, so I'm not sure if it's recoverable. Open to suggestions.

Reviewer reply: Is this through pybinding? Does it crash the CPython process, or is this in a subprocess? If it's the main process, then I don't know if we can try/catch, but if it's a subprocess then maybe. OK with unblocking you now.
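For context on the isolation question above, one possible direction (not part of this change) would be to execute the PTE load/run step in a worker process, so a native fault only kills that worker while the progressively written CSV survives. This is a minimal sketch; run_pte and run_pte_in_subprocess are hypothetical names, not APIs from this PR:

import multiprocessing as mp
import queue

def _pte_worker(pte_path, result_queue):
    # Hypothetical worker: load and execute the PTE here so a SIGSEGV from
    # memory corruption terminates only this child process.
    result_queue.put(run_pte(pte_path))  # run_pte is a placeholder, not a real API

def run_pte_in_subprocess(pte_path, timeout_s=600):
    ctx = mp.get_context("spawn")
    result_queue = ctx.Queue()
    proc = ctx.Process(target=_pte_worker, args=(pte_path, result_queue))
    proc.start()
    try:
        # Read the result before joining so a large payload cannot deadlock the pipe.
        result = result_queue.get(timeout=timeout_s)
    except queue.Empty:
        result = None  # the child crashed or hung; record a run failure and move on
    proc.join(timeout=5)
    if proc.is_alive():
        proc.kill()
    return result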
+
 
 def complete_test_session() -> RunSummary:
     global _active_session
@@ -243,6 +327,13 @@ def _sum_op_counts(counter: Counter | None) -> int | None:
     return sum(counter.values()) if counter is not None else None
 
 
+def _serialize_params(params: dict[str, Any] | None) -> str:
+    if params is not None:
+        return str(dict(sorted(params.items())))
+    else:
+        return ""
+
+
 def _serialize_op_counts(counter: Counter | None) -> str:
     """
     A utility function to serialize op counts to a string, for the purpose of including
@@ -254,89 +345,49 @@ def _serialize_op_counts(counter: Counter | None) -> str:
     return ""
 
 
-def generate_csv_report(summary: RunSummary, output: TextIO):
-    """Write a run summary report to a file in CSV format."""
-
-    field_names = [
-        "Test ID",
-        "Test Case",
-        "Backend",
-        "Flow",
-        "Result",
-        "Quantize Time (s)",
-        "Lowering Time (s)",
-    ]
-
-    # Tests can have custom parameters. We'll want to report them here, so we need
-    # a list of all unique parameter names.
-    param_names = reduce(
-        lambda a, b: a.union(b),
-        (
-            set(s.params.keys())
-            for s in summary.test_case_summaries
-            if s.params is not None
-        ),
-        set(),
-    )
-    field_names += (s.capitalize() for s in param_names)
-
-    # Add tensor error statistic field names for each output index.
-    max_outputs = max(
-        len(s.tensor_error_statistics) for s in summary.test_case_summaries
-    )
-    for i in range(max_outputs):
-        field_names.extend(
-            [
-                f"Output {i} Error Max",
-                f"Output {i} Error MAE",
-                f"Output {i} Error MSD",
-                f"Output {i} Error L2",
-                f"Output {i} SQNR",
-            ]
-        )
-    field_names.extend(
-        [
-            "Delegated Nodes",
-            "Undelegated Nodes",
-            "Delegated Ops",
-            "Undelegated Ops",
-            "PTE Size (Kb)",
-        ]
-    )
-
-    writer = csv.DictWriter(output, field_names)
+def write_csv_header(output: TextIO):
+    writer = csv.DictWriter(output, CSV_FIELD_NAMES)
     writer.writeheader()
 
-    for record in summary.test_case_summaries:
-        row = {
-            "Test ID": record.name,
-            "Test Case": record.base_name,
-            "Backend": record.backend,
-            "Flow": record.flow,
-            "Result": record.result.display_name(),
-            "Quantize Time (s)": (
-                record.quantize_time.total_seconds() if record.quantize_time else None
-            ),
-            "Lowering Time (s)": (
-                record.lower_time.total_seconds() if record.lower_time else None
-            ),
-        }
-        if record.params is not None:
-            row.update({k.capitalize(): v for k, v in record.params.items()})
-
-        for output_idx, error_stats in enumerate(record.tensor_error_statistics):
-            row[f"Output {output_idx} Error Max"] = error_stats.error_max
-            row[f"Output {output_idx} Error MAE"] = error_stats.error_mae
-            row[f"Output {output_idx} Error MSD"] = error_stats.error_msd
-            row[f"Output {output_idx} Error L2"] = error_stats.error_l2_norm
-            row[f"Output {output_idx} SQNR"] = error_stats.sqnr
-
-        row["Delegated Nodes"] = _sum_op_counts(record.delegated_op_counts)
-        row["Undelegated Nodes"] = _sum_op_counts(record.undelegated_op_counts)
-        row["Delegated Ops"] = _serialize_op_counts(record.delegated_op_counts)
-        row["Undelegated Ops"] = _serialize_op_counts(record.undelegated_op_counts)
-        row["PTE Size (Kb)"] = (
-            record.pte_size_bytes / 1000.0 if record.pte_size_bytes else ""
-        )
-
-        writer.writerow(row)
+
+def write_csv_row(record: TestCaseSummary, output: TextIO):
+    writer = csv.DictWriter(output, CSV_FIELD_NAMES)
+
+    row = {
+        "Test ID": record.name,
+        "Test Case": record.base_name,
+        "Flow": record.flow,
+        "Params": _serialize_params(record.params),
+        "Result": record.result.to_short_str(),
+        "Result Detail": record.result.to_detail_str(),
+        "Delegated": "True" if record.is_delegated() else "False",
+        "Quantize Time (s)": (
+            f"{record.quantize_time.total_seconds():.3f}"
+            if record.quantize_time
+            else None
+        ),
+        "Lower Time (s)": (
+            f"{record.lower_time.total_seconds():.3f}" if record.lower_time else None
+        ),
+    }
+
+    for output_idx, error_stats in enumerate(record.tensor_error_statistics):
+        if output_idx >= MAX_LOGGED_MODEL_OUTPUTS:
+            print(
+                f"Model output stats are truncated as model has more than {MAX_LOGGED_MODEL_OUTPUTS} outputs. Consider increasing MAX_LOGGED_MODEL_OUTPUTS."
+            )
+            break
+
+        row[f"Output {output_idx} Error Max"] = f"{error_stats.error_max:.3f}"
+        row[f"Output {output_idx} Error MAE"] = f"{error_stats.error_mae:.3f}"
+        row[f"Output {output_idx} SNR"] = f"{error_stats.sqnr:.3f}"
+
+    row["Delegated Nodes"] = _sum_op_counts(record.delegated_op_counts)
+    row["Undelegated Nodes"] = _sum_op_counts(record.undelegated_op_counts)
+    row["Delegated Ops"] = _serialize_op_counts(record.delegated_op_counts)
+    row["Undelegated Ops"] = _serialize_op_counts(record.undelegated_op_counts)
+    row["PTE Size (Kb)"] = (
+        f"{record.pte_size_bytes / 1000.0:.3f}" if record.pte_size_bytes else ""
+    )
+
+    writer.writerow(row)
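For reference, the changed entry points above would be driven roughly like this. This is a sketch based only on the signatures in this diff; collect_test_cases and run_test_case are illustrative stand-ins for the harness's own test loop, not functions from this PR:

# Sketch of the progressive reporting flow, based on the functions in this diff.
begin_test_session(report_path="backend_test_report.csv")

for test_case in collect_test_cases():   # hypothetical discovery helper
    summary = run_test_case(test_case)   # hypothetical; returns a TestCaseSummary
    log_test_summary(summary)            # appends the CSV row immediately

run_summary = complete_test_session()    # final aggregate RunSummary

Because each row is flushed as the test finishes, results gathered before a crash remain on disk.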
Review comment: Can multiple subprocesses write to this simultaneously?

Author reply: Probably not safely. What is the use case? Are you thinking we parallelize tests between processes? That seems nice to have, though I'd be inclined to deal with concurrency issues if/when we add that feature.

Reviewer reply: Yeah, all popular test frameworks run tests in parallel, so perhaps in the future.
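If parallel workers are added later, one simple option (a sketch only; nothing here is in the PR) is to serialize appends with a multiprocessing lock created by the parent and handed to each worker; another is to have each worker write its own CSV and merge them when the session completes. The append_report_row helper below is hypothetical:

def append_report_row(report_path, summary, lock):
    # Hypothetical helper: "lock" is a multiprocessing.Lock shared across workers,
    # so two workers never interleave partial CSV rows in the same file.
    with lock:
        with open(report_path, "a", newline="") as f:
            write_csv_row(summary, f)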