diff --git a/backends/arm/test/common.py b/backends/arm/test/common.py index 3f90c8c056c..3e3d89f4569 100644 --- a/backends/arm/test/common.py +++ b/backends/arm/test/common.py @@ -47,16 +47,15 @@ def maybe_get_tosa_collate_path() -> str | None: tosa_test_base = os.environ.get("TOSA_TESTCASES_BASE_PATH") if tosa_test_base: current_test = os.environ.get("PYTEST_CURRENT_TEST") - #'backends/arm/test/ops/test_mean_dim.py::TestMeanDim::test_meandim_tosa_BI_0_zeros (call)' - test_class = current_test.split("::")[1] # type: ignore[union-attr] - test_name = current_test.split("::")[-1].split(" ")[0] # type: ignore[union-attr] + # '::test_collate_tosa_BI_tests[randn] (call)' + test_name = current_test.split("::")[1].split(" ")[0] # type: ignore[union-attr] if "BI" in test_name: tosa_test_base = os.path.join(tosa_test_base, "tosa-bi") elif "MI" in test_name: tosa_test_base = os.path.join(tosa_test_base, "tosa-mi") else: tosa_test_base = os.path.join(tosa_test_base, "other") - return os.path.join(tosa_test_base, test_class, test_name) + return os.path.join(tosa_test_base, test_name) return None diff --git a/backends/arm/test/misc/test_custom_partition.py b/backends/arm/test/misc/test_custom_partition.py index 00bc4d306ae..c2889f17ce3 100644 --- a/backends/arm/test/misc/test_custom_partition.py +++ b/backends/arm/test/misc/test_custom_partition.py @@ -4,11 +4,11 @@ # LICENSE file in the root directory of this source tree. import logging +from typing import Tuple import torch from executorch.backends.arm.test import common -from executorch.backends.arm.test.tester.arm_tester import ArmTester -from executorch.backends.arm.tosa_partitioner import TOSAPartitioner +from executorch.backends.arm.test.tester.test_pipeline import TosaPipelineMI from executorch.exir.backend.operator_support import ( DontPartition, DontPartitionModule, @@ -16,9 +16,13 @@ ) from executorch.exir.dialects._ops import ops as exir_ops +input_t1 = Tuple[torch.Tensor, torch.Tensor] # Input x, y + class CustomPartitioning(torch.nn.Module): - inputs = (torch.randn(10, 4, 5), torch.randn(10, 4, 5)) + inputs = { + "randn": (torch.randn(10, 4, 5), torch.randn(10, 4, 5)), + } def forward(self, x: torch.Tensor, y: torch.Tensor): z = x + y @@ -27,7 +31,9 @@ def forward(self, x: torch.Tensor, y: torch.Tensor): class NestedModule(torch.nn.Module): - inputs = (torch.randn(10, 4, 5), torch.randn(10, 4, 5)) + inputs = { + "randn": (torch.randn(10, 4, 5), torch.randn(10, 4, 5)), + } def __init__(self): super().__init__() @@ -39,192 +45,139 @@ def forward(self, x: torch.Tensor, y: torch.Tensor): return self.nested(a, b) -def test_single_reject(caplog): +@common.parametrize("test_data", CustomPartitioning.inputs) +def test_single_reject(caplog, test_data: input_t1): caplog.set_level(logging.INFO) module = CustomPartitioning() - inputs = module.inputs - compile_spec = common.get_tosa_compile_spec("TOSA-0.80+MI") + pipeline = TosaPipelineMI[input_t1](module, test_data, [], exir_op=[]) check = DontPartition(exir_ops.edge.aten.sigmoid.default) - partitioner = TOSAPartitioner(compile_spec, additional_checks=[check]) - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=compile_spec, - ) - .export() - .to_edge_transform_and_lower(partitioners=[partitioner]) - .check(["executorch_exir_dialects_edge__ops_aten_sigmoid_default"]) - .check_count({"torch.ops.higher_order.executorch_call_delegate": 2}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) + pipeline.change_args("to_edge_transform_and_lower", additional_checks=[check]) + 
pipeline.change_args( + "check_count.exir", {"torch.ops.higher_order.executorch_call_delegate": 2} + ) + pipeline.change_args( + "check_count.exir", + {"executorch_exir_dialects_edge__ops_aten_sigmoid_default": 1}, ) + pipeline.run() assert check.has_rejected_node() assert "Rejected by DontPartition" in caplog.text -def test_multiple_reject(): +@common.parametrize("test_data", CustomPartitioning.inputs) +def test_multiple_reject(test_data: input_t1): module = CustomPartitioning() - inputs = module.inputs - compile_spec = common.get_tosa_compile_spec("TOSA-0.80+MI") + pipeline = TosaPipelineMI[input_t1](module, test_data, [], exir_op=[]) check = DontPartition( exir_ops.edge.aten.sigmoid.default, exir_ops.edge.aten.mul.Tensor ) - partitioner = TOSAPartitioner(compile_spec, additional_checks=[check]) - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=compile_spec, - ) - .export() - .to_edge_transform_and_lower(partitioners=[partitioner]) - .check(["executorch_exir_dialects_edge__ops_aten_sigmoid_default"]) - .check_count({"torch.ops.higher_order.executorch_call_delegate": 1}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) + pipeline.change_args("to_edge_transform_and_lower", additional_checks=[check]) + pipeline.change_args( + "check_count.exir", {"torch.ops.higher_order.executorch_call_delegate": 2} + ) + pipeline.change_args( + "check_count.exir", + {"executorch_exir_dialects_edge__ops_aten_sigmoid_default": 1}, ) + pipeline.run() assert check.has_rejected_node() -def test_torch_op_reject(caplog): +@common.parametrize("test_data", CustomPartitioning.inputs) +def test_torch_op_reject(caplog, test_data: input_t1): caplog.set_level(logging.INFO) module = CustomPartitioning() - inputs = module.inputs - compile_spec = common.get_tosa_compile_spec("TOSA-0.80+MI") check = DontPartition(torch.ops.aten.sigmoid.default) - partitioner = TOSAPartitioner(compile_spec, additional_checks=[check]) - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=compile_spec, - ) - .export() - .to_edge_transform_and_lower(partitioners=[partitioner]) - .check(["executorch_exir_dialects_edge__ops_aten_sigmoid_default"]) - .check_count({"torch.ops.higher_order.executorch_call_delegate": 2}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) + pipeline = TosaPipelineMI[input_t1](module, test_data, [], exir_op=[]) + pipeline.change_args("to_edge_transform_and_lower", additional_checks=[check]) + pipeline.change_args( + "check_count.exir", {"torch.ops.higher_order.executorch_call_delegate": 2} ) + pipeline.change_args( + "check_count.exir", + {"executorch_exir_dialects_edge__ops_aten_sigmoid_default": 1}, + ) + pipeline.run() assert check.has_rejected_node() assert "Rejected by DontPartition" in caplog.text -def test_string_op_reject(): +@common.parametrize("test_data", CustomPartitioning.inputs) +def test_string_op_reject(test_data: input_t1): module = CustomPartitioning() - inputs = module.inputs - compile_spec = common.get_tosa_compile_spec("TOSA-0.80+MI") check = DontPartition("aten.sigmoid.default") - partitioner = TOSAPartitioner(compile_spec, additional_checks=[check]) - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=compile_spec, - ) - .export() - .to_edge_transform_and_lower(partitioners=[partitioner]) - .check(["executorch_exir_dialects_edge__ops_aten_sigmoid_default"]) - .check_count({"torch.ops.higher_order.executorch_call_delegate": 2}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) + pipeline = 
TosaPipelineMI[input_t1](module, test_data, [], exir_op=[]) + pipeline.change_args("to_edge_transform_and_lower", additional_checks=[check]) + pipeline.change_args( + "check_count.exir", {"torch.ops.higher_order.executorch_call_delegate": 2} ) - + pipeline.change_args( + "check_count.exir", + {"executorch_exir_dialects_edge__ops_aten_sigmoid_default": 1}, + ) + pipeline.run() assert check.has_rejected_node() -def test_name_reject(caplog): +@common.parametrize("test_data", CustomPartitioning.inputs) +def test_name_reject(caplog, test_data: input_t1): caplog.set_level(logging.INFO) module = CustomPartitioning() - inputs = module.inputs - compile_spec = common.get_tosa_compile_spec("TOSA-0.80+MI") check = DontPartitionName("mul", "sigmoid", exact=False) - partitioner = TOSAPartitioner(compile_spec, additional_checks=[check]) - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=compile_spec, - ) - .export() - .to_edge_transform_and_lower(partitioners=[partitioner]) - .check(["executorch_exir_dialects_edge__ops_aten_sigmoid_default"]) - .check_count({"torch.ops.higher_order.executorch_call_delegate": 1}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) + pipeline = TosaPipelineMI[input_t1](module, test_data, [], exir_op=[]) + pipeline.change_args("to_edge_transform_and_lower", additional_checks=[check]) + pipeline.change_args( + "check_count.exir", + {"executorch_exir_dialects_edge__ops_aten_sigmoid_default": 1}, ) + pipeline.run() assert check.has_rejected_node() assert "Rejected by DontPartitionName" in caplog.text -def test_module_reject(): +@common.parametrize("test_data", CustomPartitioning.inputs) +def test_module_reject(test_data: input_t1): module = NestedModule() - inputs = module.inputs - compile_spec = common.get_tosa_compile_spec("TOSA-0.80+MI") check = DontPartitionModule(module_name="CustomPartitioning") - partitioner = TOSAPartitioner(compile_spec, additional_checks=[check]) - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=compile_spec, - ) - .export() - .to_edge_transform_and_lower(partitioners=[partitioner]) - .check(["executorch_exir_dialects_edge__ops_aten_sigmoid_default"]) - .check_count({"torch.ops.higher_order.executorch_call_delegate": 1}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) + pipeline = TosaPipelineMI[input_t1](module, test_data, [], exir_op=[]) + pipeline.change_args("to_edge_transform_and_lower", additional_checks=[check]) + pipeline.change_args( + "check_count.exir", + {"executorch_exir_dialects_edge__ops_aten_sigmoid_default": 1}, ) + pipeline.run() assert check.has_rejected_node() -def test_inexact_module_reject(caplog): +@common.parametrize("test_data", CustomPartitioning.inputs) +def test_inexact_module_reject(caplog, test_data: input_t1): caplog.set_level(logging.INFO) module = NestedModule() - inputs = module.inputs - compile_spec = common.get_tosa_compile_spec("TOSA-0.80+MI") check = DontPartitionModule(module_name="Custom", exact=False) - partitioner = TOSAPartitioner(compile_spec, additional_checks=[check]) - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=compile_spec, - ) - .export() - .to_edge_transform_and_lower(partitioners=[partitioner]) - .check(["executorch_exir_dialects_edge__ops_aten_sigmoid_default"]) - .check_count({"torch.ops.higher_order.executorch_call_delegate": 1}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) + pipeline = TosaPipelineMI[input_t1](module, test_data, [], exir_op=[]) + 
pipeline.change_args("to_edge_transform_and_lower", additional_checks=[check]) + pipeline.change_args( + "check_count.exir", + {"executorch_exir_dialects_edge__ops_aten_sigmoid_default": 1}, ) + pipeline.run() assert check.has_rejected_node() assert "Rejected by DontPartitionModule" in caplog.text -def test_module_instance_reject(): +@common.parametrize("test_data", CustomPartitioning.inputs) +def test_module_instance_reject(test_data: input_t1): module = NestedModule() - inputs = module.inputs - compile_spec = common.get_tosa_compile_spec("TOSA-0.80+MI") check = DontPartitionModule(instance_name="nested") - partitioner = TOSAPartitioner(compile_spec, additional_checks=[check]) - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=compile_spec, - ) - .export() - .to_edge_transform_and_lower(partitioners=[partitioner]) - .check(["executorch_exir_dialects_edge__ops_aten_sigmoid_default"]) - .check_count({"torch.ops.higher_order.executorch_call_delegate": 1}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) + pipeline = TosaPipelineMI[input_t1](module, test_data, [], exir_op=[]) + pipeline.change_args("to_edge_transform_and_lower", additional_checks=[check]) + pipeline.change_args( + "check_count.exir", + {"executorch_exir_dialects_edge__ops_aten_sigmoid_default": 1}, ) + pipeline.run() assert check.has_rejected_node() diff --git a/backends/arm/test/misc/test_debug_feats.py b/backends/arm/test/misc/test_debug_feats.py index 60bf89b6e17..e9305b3dc21 100644 --- a/backends/arm/test/misc/test_debug_feats.py +++ b/backends/arm/test/misc/test_debug_feats.py @@ -1,5 +1,4 @@ # Copyright 2024-2025 Arm Limited and/or its affiliates. -# All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. @@ -7,228 +6,218 @@ import os import shutil import tempfile -import unittest + +from typing import Tuple + +import pytest import torch -from executorch.backends.arm.test import common +from executorch.backends.arm.test import common, conftest +from executorch.backends.arm.test.tester.test_pipeline import ( + EthosU55PipelineBI, + TosaPipelineBI, + TosaPipelineMI, +) + -from executorch.backends.arm.test.tester.arm_tester import ArmTester +input_t1 = Tuple[torch.Tensor] # Input x class Linear(torch.nn.Module): + inputs = { + "randn": (torch.randn(5, 10, 25, 3),), + } + def __init__( self, - in_features: int, - out_features: int = 3, - bias: bool = True, ): super().__init__() - self.inputs = (torch.randn(5, 10, 25, in_features),) self.fc = torch.nn.Linear( - in_features=in_features, - out_features=out_features, - bias=bias, + in_features=3, + out_features=5, + bias=True, ) - def get_inputs(self): - return self.inputs - def forward(self, x): return self.fc(x) -class TestDumpPartitionedArtifact(unittest.TestCase): - """Tests dumping the partition artifact in ArmTester. Both to file and to stdout.""" - - def _tosa_MI_pipeline(self, module: torch.nn.Module, dump_file=None): - ( - ArmTester( - module, - example_inputs=module.get_inputs(), # type: ignore[operator] - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+MI"), - ) - .export() - .to_edge_transform_and_lower() - .dump_artifact(dump_file) - .dump_artifact() - ) +"""Tests dumping the partition artifact in ArmTester. 
Both to file and to stdout.""" - def _tosa_BI_pipeline(self, module: torch.nn.Module, dump_file=None): - ( - ArmTester( - module, - example_inputs=module.get_inputs(), # type: ignore[operator] - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .export() - .to_edge_transform_and_lower() - .dump_artifact(dump_file) - .dump_artifact() - ) - def _is_tosa_marker_in_file(self, tmp_file): - for line in open(tmp_file).readlines(): - if "'name': 'main'" in line: - return True - return False +def _tosa_MI_pipeline(module: torch.nn.Module, test_data: input_t1, dump_file=None): - def test_MI_artifact(self): - model = Linear(20, 30) - tmp_file = common.get_time_formatted_path( - tempfile.mkdtemp(), self._testMethodName - ) - self._tosa_MI_pipeline(model, dump_file=tmp_file) - assert os.path.exists(tmp_file), f"File {tmp_file} was not created" - if self._is_tosa_marker_in_file(tmp_file): - return # Implicit pass test - self.fail("File does not contain TOSA dump!") - - def test_BI_artifact(self): - model = Linear(20, 30) - tmp_file = common.get_time_formatted_path( - tempfile.mkdtemp(), self._testMethodName - ) - self._tosa_BI_pipeline(model, dump_file=tmp_file) - assert os.path.exists(tmp_file), f"File {tmp_file} was not created" - if self._is_tosa_marker_in_file(tmp_file): - return # Implicit pass test - self.fail("File does not contain TOSA dump!") - - -class TestNumericalDiffPrints(unittest.TestCase): - """Tests trigging the exception printout from the ArmTester's run and compare function.""" - - def test_numerical_diff_prints(self): - model = Linear(20, 30) - tester = ( - ArmTester( - model, - example_inputs=model.get_inputs(), - compile_spec=common.get_tosa_compile_spec( - "TOSA-0.80+MI", - custom_path=tempfile.mkdtemp("diff_print_test"), - ), - ) - .export() - .to_edge_transform_and_lower() - .to_executorch() - ) - # We expect an assertion error here. Any other issues will cause the - # test to fail. Likewise the test will fail if the assertion error is - # not present. 
- try: - # Tolerate 0 difference => we want to trigger a numerical diff - tester.run_method_and_compare_outputs(atol=0, rtol=0, qtol=0) - except AssertionError: - pass # Implicit pass test - else: - self.fail() - - -def test_dump_ops_and_dtypes(): - model = Linear(20, 30) - ( - ArmTester( - model, - example_inputs=model.get_inputs(), - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .dump_dtype_distribution() - .dump_operator_distribution() - .export() - .dump_dtype_distribution() - .dump_operator_distribution() - .to_edge_transform_and_lower() - .dump_dtype_distribution() - .dump_operator_distribution() + pipeline = TosaPipelineMI[input_t1](module, test_data, [], []) + pipeline.dump_artifact("to_edge_transform_and_lower") + pipeline.dump_artifact("to_edge_transform_and_lower", suffix=dump_file) + pipeline.pop_stage("run_method_and_compare_outputs") + pipeline.run() + + +def _tosa_BI_pipeline(module: torch.nn.Module, test_data: input_t1, dump_file=None): + + pipeline = TosaPipelineBI[input_t1](module, test_data, [], []) + pipeline.dump_artifact("to_edge_transform_and_lower") + pipeline.dump_artifact("to_edge_transform_and_lower", suffix=dump_file) + pipeline.pop_stage("run_method_and_compare_outputs") + pipeline.run() + + +def _is_tosa_marker_in_file(tmp_file): + for line in open(tmp_file).readlines(): + if "'name': 'main'" in line: + return True + return False + + +@common.parametrize("test_data", Linear.inputs) +def test_MI_artifact(test_data: input_t1): + model = Linear() + tmp_file = common.get_time_formatted_path( + tempfile.mkdtemp(), test_MI_artifact.__name__ ) - # Just test that there are no execptions. + _tosa_MI_pipeline(model, test_data, dump_file=tmp_file) + assert os.path.exists(tmp_file), f"File {tmp_file} was not created" + if _is_tosa_marker_in_file(tmp_file): + return # Implicit pass test + pytest.fail("File does not contain TOSA dump!") + + +@common.parametrize("test_data", Linear.inputs) +def test_BI_artifact(test_data: input_t1): + model = Linear() + tmp_file = common.get_time_formatted_path( + tempfile.mkdtemp(), test_BI_artifact.__name__ + ) + _tosa_BI_pipeline(model, test_data, dump_file=tmp_file) + assert os.path.exists(tmp_file), f"File {tmp_file} was not created" + if _is_tosa_marker_in_file(tmp_file): + return # Implicit pass test + pytest.fail("File does not contain TOSA dump!") -def test_dump_ops_and_dtypes_parseable(): - model = Linear(20, 30) - ( - ArmTester( - model, - example_inputs=model.get_inputs(), - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .dump_dtype_distribution(print_table=False) - .dump_operator_distribution(print_table=False) - .export() - .dump_dtype_distribution(print_table=False) - .dump_operator_distribution(print_table=False) - .to_edge_transform_and_lower() - .dump_dtype_distribution(print_table=False) - .dump_operator_distribution(print_table=False) +"""Tests trigging the exception printout from the ArmTester's run and compare function.""" + + +@common.parametrize("test_data", Linear.inputs) +def test_numerical_diff_print(test_data: input_t1): + pipeline = TosaPipelineMI[input_t1]( + Linear(), + test_data, + [], + [], + custom_path="diff_print_test", + ) + pipeline.pop_stage("run_method_and_compare_outputs") + pipeline.run() + tester = pipeline.tester + # We expect an assertion error here. Any other issues will cause the + # test to fail. Likewise the test will fail if the assertion error is + # not present. 
+ try: + # Tolerate 0 difference => we want to trigger a numerical diff + tester.run_method_and_compare_outputs(atol=0, rtol=0, qtol=0) + except AssertionError: + pass # Implicit pass test + else: + pytest.fail() + + +@common.parametrize("test_data", Linear.inputs) +def test_dump_ops_and_dtypes(test_data: input_t1): + pipeline = TosaPipelineBI[input_t1](Linear(), test_data, [], []) + pipeline.pop_stage("run_method_and_compare_outputs") + pipeline.add_stage_after("quantize", pipeline.tester.dump_dtype_distribution) + pipeline.add_stage_after("quantize", pipeline.tester.dump_operator_distribution) + pipeline.add_stage_after("export", pipeline.tester.dump_dtype_distribution) + pipeline.add_stage_after("export", pipeline.tester.dump_operator_distribution) + pipeline.add_stage_after( + "to_edge_transform_and_lower", pipeline.tester.dump_dtype_distribution ) + pipeline.add_stage_after( + "to_edge_transform_and_lower", pipeline.tester.dump_operator_distribution + ) + pipeline.run() # Just test that there are no execptions. -class TestCollateTosaTests(unittest.TestCase): - """Tests the collation of TOSA tests through setting the environment variable TOSA_TESTCASE_BASE_PATH.""" - - def test_collate_tosa_BI_tests(self): - # Set the environment variable to trigger the collation of TOSA tests - os.environ["TOSA_TESTCASES_BASE_PATH"] = "test_collate_tosa_tests" - # Clear out the directory - - model = Linear(20, 30) - ( - ArmTester( - model, - example_inputs=model.get_inputs(), - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .export() - .to_edge_transform_and_lower() - .to_executorch() - ) +@common.parametrize("test_data", Linear.inputs) +def test_dump_ops_and_dtypes_parseable(test_data: input_t1): + pipeline = TosaPipelineBI[input_t1](Linear(), test_data, [], []) + pipeline.pop_stage("run_method_and_compare_outputs") + pipeline.add_stage_after("quantize", pipeline.tester.dump_dtype_distribution, False) + pipeline.add_stage_after( + "quantize", pipeline.tester.dump_operator_distribution, False + ) + pipeline.add_stage_after("export", pipeline.tester.dump_dtype_distribution, False) + pipeline.add_stage_after( + "export", pipeline.tester.dump_operator_distribution, False + ) + pipeline.add_stage_after( + "to_edge_transform_and_lower", pipeline.tester.dump_dtype_distribution, False + ) + pipeline.add_stage_after( + "to_edge_transform_and_lower", pipeline.tester.dump_operator_distribution, False + ) + pipeline.run() + # Just test that there are no execptions. 
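# Illustrative note, not part of the patch: the trailing positional `False` handed to
# add_stage_after in the dump tests above is assumed to be forwarded to the dump
# helpers, mirroring the print_table=False keyword used by the chained ArmTester
# calls these tests replace:
#
#   old: .dump_operator_distribution(print_table=False)
#   new: pipeline.add_stage_after("export", pipeline.tester.dump_operator_distribution, False)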
- test_collate_dir = "test_collate_tosa_tests/tosa-bi/TestCollateTosaTests/test_collate_tosa_BI_tests" - # test that the output directory is created and contains the expected files - assert os.path.exists(test_collate_dir) - for file in os.listdir(test_collate_dir): - assert file.endswith(("TOSA-0.80+BI.json", "TOSA-0.80+BI.tosa")) +"""Tests the collation of TOSA tests through setting the environment variable TOSA_TESTCASE_BASE_PATH.""" - os.environ.pop("TOSA_TESTCASES_BASE_PATH") - shutil.rmtree("test_collate_tosa_tests", ignore_errors=True) +@common.parametrize("test_data", Linear.inputs) +def test_collate_tosa_BI_tests(test_data: input_t1): + # Set the environment variable to trigger the collation of TOSA tests + os.environ["TOSA_TESTCASES_BASE_PATH"] = "test_collate_tosa_tests" + # Clear out the directory + pipeline = TosaPipelineBI[input_t1](Linear(), test_data, [], []) + pipeline.pop_stage("run_method_and_compare_outputs") + pipeline.run() -def test_dump_tosa_ops(caplog): - model = Linear(20, 30) - ( - ArmTester( - model, - example_inputs=model.get_inputs(), - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .export() - .to_edge_transform_and_lower() - .dump_operator_distribution() + test_collate_dir = ( + "test_collate_tosa_tests/tosa-bi/test_collate_tosa_BI_tests[randn]" ) + # test that the output directory is created and contains the expected files + assert os.path.exists(test_collate_dir) + tosa_version = conftest.get_option("tosa_version") + for file in os.listdir(test_collate_dir): + file_name_prefix = f"TOSA-{tosa_version}+" + ( + "INT" if tosa_version == "1.0" else "BI" + ) + assert file.endswith((f"{file_name_prefix}.json", f"{file_name_prefix}.tosa")) + + os.environ.pop("TOSA_TESTCASES_BASE_PATH") + shutil.rmtree("test_collate_tosa_tests", ignore_errors=True) + + +@common.parametrize("test_data", Linear.inputs) +def test_dump_tosa_ops(caplog, test_data: input_t1): + pipeline = TosaPipelineBI[input_t1](Linear(), test_data, [], []) + pipeline.pop_stage("run_method_and_compare_outputs") + pipeline.dump_operator_distribution("to_edge_transform_and_lower") + pipeline.run() assert "TOSA operators:" in caplog.text -def test_fail_dump_tosa_ops(caplog): +class Add(torch.nn.Module): + inputs = { + "ones": (torch.ones(5),), + } + + def forward(self, x): + return x + x - class Add(torch.nn.Module): - def forward(self, x): - return x + x - model = Add() - compile_spec = common.get_u55_compile_spec() - ( - ArmTester(model, example_inputs=(torch.ones(5),), compile_spec=compile_spec) - .quantize() - .export() - .to_edge_transform_and_lower() - .dump_operator_distribution() +@common.parametrize("test_data", Add.inputs) +def test_fail_dump_tosa_ops(caplog, test_data: input_t1): + pipeline = EthosU55PipelineBI[input_t1]( + Add(), + test_data, + [], + [], + use_to_edge_transform_and_lower=True, ) + pipeline.dump_operator_distribution("to_edge_transform_and_lower") + pipeline.run() assert "Can not get operator distribution for Vela command stream." in caplog.text diff --git a/backends/arm/test/misc/test_dim_order_guards.py b/backends/arm/test/misc/test_dim_order_guards.py index 0698773e6f8..44c9e707324 100644 --- a/backends/arm/test/misc/test_dim_order_guards.py +++ b/backends/arm/test/misc/test_dim_order_guards.py @@ -1,19 +1,29 @@ -# Copyright 2024 Arm Limited and/or its affiliates. +# Copyright 2024-2025 Arm Limited and/or its affiliates. 
# # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. -import unittest + +from typing import Tuple import pytest import torch from executorch.backends.arm.test import common -from executorch.backends.arm.test.tester.arm_tester import ArmTester +from executorch.backends.arm.test.tester.test_pipeline import ( + TosaPipelineBI, + TosaPipelineMI, +) + + +input_t1 = Tuple[torch.Tensor] # Input x class Conv2D(torch.nn.Module): + inputs: dict[str, input_t1] = { + "randn": (torch.randn(1, 2, 20, 20),), + } def __init__(self): super().__init__() @@ -22,37 +32,36 @@ def __init__(self): def forward(self, x): return self.conv2d(x.to(memory_format=torch.channels_last)) - def get_inputs(self): - return (torch.randn(1, 2, 20, 20),) - - -class TestDimOrderGuards(unittest.TestCase): - - def test_tosa_MI_pipeline(self): - module = Conv2D() - tester = ( - ArmTester( - module, - example_inputs=module.get_inputs(), - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+MI"), - ) - .export() - .to_edge() - ) - with pytest.raises(RuntimeError): - tester.partition() - - def test_tosa_BI_pipeline(self): - module = Conv2D() - tester = ( - ArmTester( - module, - example_inputs=module.get_inputs(), - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .export() - .to_edge() - ) - with pytest.raises(RuntimeError): - tester.partition() + +@common.parametrize("test_data", Conv2D.inputs) +def test_tosa_MI_pipeline(test_data: input_t1): + module = Conv2D() + pipeline = TosaPipelineMI[input_t1]( + module, + test_data, + [], + [], + use_to_edge_transform_and_lower=False, + ) + pos = pipeline.find_pos("partition") + pipeline._stages = pipeline._stages[:pos] + pipeline.run() + with pytest.raises(RuntimeError): + pipeline.tester.partition() + + +@common.parametrize("test_data", Conv2D.inputs) +def test_tosa_BI_pipeline(test_data: input_t1): + module = Conv2D() + pipeline = TosaPipelineBI[input_t1]( + module, + test_data, + [], + [], + use_to_edge_transform_and_lower=False, + ) + pos = pipeline.find_pos("partition") + pipeline._stages = pipeline._stages[:pos] + pipeline.run() + with pytest.raises(RuntimeError): + pipeline.tester.partition() diff --git a/backends/arm/test/misc/test_lifted_tensor.py b/backends/arm/test/misc/test_lifted_tensor.py index 092483fd632..7f1a9938037 100644 --- a/backends/arm/test/misc/test_lifted_tensor.py +++ b/backends/arm/test/misc/test_lifted_tensor.py @@ -4,24 +4,29 @@ # LICENSE file in the root directory of this source tree. 
import operator -import unittest -from typing import Union +from typing import Tuple, Union import torch from executorch.backends.arm.test import common -from executorch.backends.arm.test.tester.arm_tester import ArmTester -from parameterized import parameterized # type: ignore[import-untyped] +from executorch.backends.arm.test.tester.test_pipeline import ( + TosaPipelineBI, + TosaPipelineMI, +) +from executorch.backends.xnnpack.test.tester import ToEdge + + +input_t1 = Tuple[torch.Tensor] class LiftedTensor(torch.nn.Module): - test_data = [ - # (operator, test_data, length) - (operator.add, (torch.randn(2, 2), 2)), - (operator.truediv, (torch.ones(2, 2), 2)), - (operator.mul, (torch.randn(2, 2), 2)), - (operator.sub, (torch.rand(2, 2), 2)), - ] + test_data = { + # test_name: (operator, test_data, length) + "add": (operator.add, (torch.randn(2, 2), 2)), + "truediv": (operator.truediv, (torch.ones(2, 2), 2)), + "mul": (operator.mul, (torch.randn(2, 2), 2)), + "sub": (operator.sub, (torch.rand(2, 2), 2)), + } def __init__(self, op: callable): # type: ignore[valid-type] super().__init__() @@ -34,13 +39,13 @@ def forward(self, x: torch.Tensor, length) -> torch.Tensor: class LiftedScalarTensor(torch.nn.Module): - test_data = [ - # (operator, test_data) - (operator.add, (torch.randn(2, 2),), 1.0), - (operator.truediv, (torch.randn(4, 2),), 1.0), - (operator.mul, (torch.randn(1, 2),), 2.0), - (operator.sub, (torch.randn(3),), 1.0), - ] + test_data = { + # test_name: (operator, test_data) + "add": (operator.add, (torch.randn(2, 2),), 1.0), + "truediv": (operator.truediv, (torch.randn(4, 2),), 1.0), + "mul": (operator.mul, (torch.randn(1, 2),), 2.0), + "sub": (operator.sub, (torch.randn(3),), 1.0), + } def __init__(self, op: callable, arg1: Union[int, float, torch.tensor]): # type: ignore[valid-type] super().__init__() @@ -51,71 +56,78 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: return self.op(x, self.arg1) # type: ignore[misc] -class TestLiftedTensor(unittest.TestCase): - """Tests the ArmPartitioner with a placeholder of type lifted tensor.""" - - @parameterized.expand(LiftedTensor.test_data) - def test_partition_lifted_tensor_tosa_MI(self, op, data): - tester = ( - ArmTester( - LiftedTensor(op), - example_inputs=data, - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+MI"), - ) - .export() - .to_edge() - ) - signature = tester.get_artifact().exported_program().graph_signature - assert len(signature.lifted_tensor_constants) > 0 - tester.partition() - tester.to_executorch() - tester.run_method_and_compare_outputs(data) - - @parameterized.expand(LiftedTensor.test_data) - def test_partition_lifted_tensor_tosa_BI(self, op, data): - tester = ( - ArmTester( - LiftedTensor(op), - example_inputs=data, - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .export() - .to_edge() - ) - signature = tester.get_artifact().exported_program().graph_signature - assert len(signature.lifted_tensor_constants) == 0 - tester.partition() - tester.to_executorch() - tester.run_method_and_compare_outputs(data) - - @parameterized.expand(LiftedScalarTensor.test_data) - def test_partition_lifted_scalar_tensor_tosa_MI(self, op, data, arg1): - ( - ArmTester( - LiftedScalarTensor(op, arg1), - example_inputs=(data), - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+MI"), - ) - .export() - .to_edge() - .partition() - .to_executorch() - .run_method_and_compare_outputs(data) - ) - - @parameterized.expand(LiftedScalarTensor.test_data) - def 
test_partition_lifted_scalar_tensor_tosa_BI(self, op, data, arg1): - ( - ArmTester( - LiftedScalarTensor(op, arg1), - example_inputs=(data), - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .export() - .to_edge() - .partition() - .to_executorch() - .run_method_and_compare_outputs(data) - ) +"""Tests the ArmPartitioner with a placeholder of type lifted tensor.""" + + +@common.parametrize("test_data", LiftedTensor.test_data) +def test_partition_lifted_tensor_tosa_MI(test_data: input_t1): + op = test_data[0] + data = test_data[1:] + module = LiftedTensor(op) + pipeline = TosaPipelineMI[input_t1]( + module, + *data, + [], + exir_op=[], + use_to_edge_transform_and_lower=False, + ) + pipeline.run() + to_edge_stage_name = pipeline.tester.stage_name(ToEdge) + signature = ( + pipeline.tester.stages[to_edge_stage_name] + .artifact.exported_program() + .graph_signature + ) + assert len(signature.lifted_tensor_constants) > 0 + + +@common.parametrize("test_data", LiftedTensor.test_data) +def test_partition_lifted_tensor_tosa_BI(test_data: input_t1): + op = test_data[0] + data = test_data[1:] + module = LiftedTensor(op) + pipeline = TosaPipelineBI[input_t1]( + module, + *data, + [], + exir_op=[], + use_to_edge_transform_and_lower=False, + ) + pipeline.run() + to_edge_stage_name = pipeline.tester.stage_name(ToEdge) + signature = ( + pipeline.tester.stages[to_edge_stage_name] + .artifact.exported_program() + .graph_signature + ) + assert len(signature.lifted_tensor_constants) == 0 + + +@common.parametrize("test_data", LiftedScalarTensor.test_data) +def test_partition_lifted_scalar_tensor_tosa_MI(test_data: input_t1): + op = test_data[0] + data = test_data[1:] + module = LiftedScalarTensor(op, data[-1]) + pipeline = TosaPipelineMI[input_t1]( + module, + data[0], + [], + exir_op=[], + use_to_edge_transform_and_lower=False, + ) + pipeline.run() + + +@common.parametrize("test_data", LiftedScalarTensor.test_data) +def test_partition_lifted_scalar_tensor_tosa_BI(test_data: input_t1): + op = test_data[0] + data = test_data[1:] + module = LiftedScalarTensor(op, data[-1]) + pipeline = TosaPipelineBI[input_t1]( + module, + data[0], + [], + exir_op=[], + use_to_edge_transform_and_lower=False, + ) + pipeline.run() diff --git a/backends/arm/test/misc/test_multiple_delegates.py b/backends/arm/test/misc/test_multiple_delegates.py index ab768d273c6..0b0122bf65e 100644 --- a/backends/arm/test/misc/test_multiple_delegates.py +++ b/backends/arm/test/misc/test_multiple_delegates.py @@ -1,57 +1,47 @@ # Copyright 2024-2025 Arm Limited and/or its affiliates. -# All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
-import unittest +from typing import Tuple import torch from executorch.backends.arm.test import common -from executorch.backends.arm.test.tester.arm_tester import ArmTester - - -class TestMultipleDelegates(unittest.TestCase): - class MultipleDelegatesModule(torch.nn.Module): - inputs = (torch.randn(10, 4, 5), torch.randn(10, 4, 5)) - - def get_inputs(self): - return self.inputs - - def forward(self, x: torch.Tensor, y: torch.Tensor): - z = x + y - s = torch.tan(z) - return s * z - - def test_tosa_MI(self): - module = self.MultipleDelegatesModule() - inputs = module.get_inputs() - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+MI"), - ) - .export() - .to_edge_transform_and_lower() - .check_count({"torch.ops.higher_order.executorch_call_delegate": 2}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) - ) - - def test_tosa_BI(self): - module = self.MultipleDelegatesModule() - inputs = module.get_inputs() - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .export() - .to_edge_transform_and_lower() - .check_count({"torch.ops.higher_order.executorch_call_delegate": 2}) - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs, qtol=1.0) - ) +from executorch.backends.arm.test.tester.test_pipeline import ( + TosaPipelineBI, + TosaPipelineMI, +) + + +input_t1 = Tuple[torch.Tensor, torch.Tensor] # Input x, y + + +class MultipleDelegatesModule(torch.nn.Module): + inputs = { + "randn": (torch.randn(10, 4, 5), torch.randn(10, 4, 5)), + } + + def forward(self, x: torch.Tensor, y: torch.Tensor): + z = x + y + s = torch.tan(z) + return s * z + + +@common.parametrize("test_data", MultipleDelegatesModule.inputs) +def test_tosa_MI_pipeline(test_data: input_t1): + pipeline = TosaPipelineMI[input_t1](MultipleDelegatesModule(), test_data, [], []) + pipeline.change_args( + "check_count.exir", {"torch.ops.higher_order.executorch_call_delegate": 2} + ) + pipeline.run() + + +@common.parametrize("test_data", MultipleDelegatesModule.inputs) +def test_tosa_BI_pipeline(test_data: input_t1): + pipeline = TosaPipelineBI[input_t1]( + MultipleDelegatesModule(), test_data, [], [], qtol=1 + ) + pipeline.change_args( + "check_count.exir", {"torch.ops.higher_order.executorch_call_delegate": 2} + ) + pipeline.run() diff --git a/backends/arm/test/misc/test_multiple_outputs.py b/backends/arm/test/misc/test_multiple_outputs.py index d3bea9a4005..abb6bb1bf30 100644 --- a/backends/arm/test/misc/test_multiple_outputs.py +++ b/backends/arm/test/misc/test_multiple_outputs.py @@ -1,96 +1,60 @@ # Copyright 2024-2025 Arm Limited and/or its affiliates. -# All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
-import unittest -import pytest +from typing import Tuple + import torch -from executorch.backends.arm.test import common, conftest -from executorch.backends.arm.test.tester.arm_tester import ArmTester -from executorch.exir.backend.compile_spec_schema import CompileSpec - - -class TestMultipleOutputs(unittest.TestCase): - class MultipleOutputsModule(torch.nn.Module): - inputs = (torch.randn(10, 4, 5), torch.randn(10, 4, 5)) - - def get_inputs(self): - return self.inputs - - def forward(self, x: torch.Tensor, y: torch.Tensor): - return (x * y, x.sum(dim=-1, keepdim=True)) - - def test_tosa_MI_pipeline(self): - module = self.MultipleOutputsModule() - inputs = module.get_inputs() - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+MI"), - ) - .export() - .to_edge_transform_and_lower() - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs) - ) - - def test_tosa_BI_pipeline(self): - module = self.MultipleOutputsModule() - inputs = module.get_inputs() - ( - ArmTester( - module, - example_inputs=inputs, - compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"), - ) - .quantize() - .export() - .to_edge_transform_and_lower() - .to_executorch() - .run_method_and_compare_outputs(inputs=inputs, qtol=1.0) - ) - - def _test_ethosu_BI_pipeline( - self, - module: torch.nn.Module, - test_data: tuple[torch.Tensor], - compile_spec: CompileSpec, - ): - tester = ( - ArmTester( - module, - example_inputs=test_data, - compile_spec=compile_spec, - ) - .quantize() - .export() - .to_edge_transform_and_lower() - .to_executorch() - .serialize() - ) - if conftest.is_option_enabled("corstone_fvp"): - tester.run_method_and_compare_outputs(qtol=1, inputs=test_data) - - @pytest.mark.corstone_fvp - def test_u55_BI(self): - module = self.MultipleOutputsModule() - test_data = module.get_inputs() - self._test_ethosu_BI_pipeline( - module, - test_data, - common.get_u55_compile_spec(), - ) - - @pytest.mark.corstone_fvp - def test_u85_BI(self): - module = self.MultipleOutputsModule() - test_data = module.get_inputs() - self._test_ethosu_BI_pipeline( - module, - test_data, - common.get_u85_compile_spec(), - ) +from executorch.backends.arm.test import common +from executorch.backends.arm.test.tester.test_pipeline import ( + EthosU55PipelineBI, + EthosU85PipelineBI, + TosaPipelineBI, + TosaPipelineMI, +) + + +input_t1 = Tuple[torch.Tensor, torch.Tensor] # Input x, y + + +class MultipleOutputsModule(torch.nn.Module): + inputs: dict[str, input_t1] = { + "randn": (torch.randn(10, 4, 5), torch.randn(10, 4, 5)), + } + + def forward(self, x: torch.Tensor, y: torch.Tensor): + return (x * y, x.sum(dim=-1, keepdim=True)) + + +@common.parametrize("test_data", MultipleOutputsModule.inputs) +def test_tosa_MI_pipeline(test_data: input_t1): + pipeline = TosaPipelineMI[input_t1](MultipleOutputsModule(), test_data, [], []) + pipeline.run() + + +@common.parametrize("test_data", MultipleOutputsModule.inputs) +def test_tosa_BI_pipeline(test_data: input_t1): + pipeline = TosaPipelineBI[input_t1]( + MultipleOutputsModule(), test_data, [], [], qtol=1 + ) + pipeline.run() + + +@common.parametrize("test_data", MultipleOutputsModule.inputs) +@common.XfailIfNoCorstone300 +def test_U55_pipeline(test_data: input_t1): + pipeline = EthosU55PipelineBI[input_t1]( + MultipleOutputsModule(), test_data, [], [], qtol=1 + ) + pipeline.run() + + +@common.parametrize("test_data", MultipleOutputsModule.inputs) +@common.XfailIfNoCorstone320 +def test_U85_pipeline(test_data: input_t1): + pipeline = 
EthosU85PipelineBI[input_t1]( + MultipleOutputsModule(), test_data, [], [], qtol=1 + ) + pipeline.run() diff --git a/backends/arm/test/misc/test_tosa_spec.py b/backends/arm/test/misc/test_tosa_spec.py index 44eee2236e2..19136c514fb 100644 --- a/backends/arm/test/misc/test_tosa_spec.py +++ b/backends/arm/test/misc/test_tosa_spec.py @@ -1,5 +1,4 @@ # Copyright 2024-2025 Arm Limited and/or its affiliates. -# All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. diff --git a/backends/arm/test/tester/arm_tester.py b/backends/arm/test/tester/arm_tester.py index 9e7e7450b7d..46d736a6688 100644 --- a/backends/arm/test/tester/arm_tester.py +++ b/backends/arm/test/tester/arm_tester.py @@ -62,6 +62,11 @@ ) from executorch.exir.backend.backend_api import validation_disabled from executorch.exir.backend.compile_spec_schema import CompileSpec +from executorch.exir.backend.operator_support import ( + DontPartition, + DontPartitionModule, + DontPartitionName, +) from executorch.exir.backend.partitioner import Partitioner from executorch.exir.lowered_backend_module import LoweredBackendModule from executorch.exir.pass_base import ExportPass @@ -331,14 +336,23 @@ def to_edge_transform_and_lower( to_edge_and_lower_stage: Optional[ToEdgeTransformAndLower] = None, partitioners: Optional[List[Partitioner]] = None, edge_compile_config: Optional[EdgeCompileConfig] = None, + additional_checks: Optional[ + List[Union[DontPartition | DontPartitionModule | DontPartitionName]] + ] = None, ): if to_edge_and_lower_stage is None: if partitioners is None: arm_partitioner = None if is_tosa(self.compile_spec): - arm_partitioner = TOSAPartitioner(compile_spec=self.compile_spec) + arm_partitioner = TOSAPartitioner( + compile_spec=self.compile_spec, + additional_checks=additional_checks, + ) elif is_ethosu(self.compile_spec): - arm_partitioner = EthosUPartitioner(compile_spec=self.compile_spec) + arm_partitioner = EthosUPartitioner( + compile_spec=self.compile_spec, + additional_checks=additional_checks, + ) else: raise ValueError("compile spec doesn't target any Arm Partitioner") partitioners = [arm_partitioner] diff --git a/backends/arm/test/tester/test_pipeline.py b/backends/arm/test/tester/test_pipeline.py index 13e2f80b5c5..58c7c657250 100644 --- a/backends/arm/test/tester/test_pipeline.py +++ b/backends/arm/test/tester/test_pipeline.py @@ -143,11 +143,13 @@ def add_stage(self, func: Callable, *args, **kwargs): f"Pos must be between [-{pipeline_length}, {pipeline_length}]" ) + stage_id = func.__name__ suffix = None if "suffix" in kwargs: suffix = kwargs.pop("suffix") + if stage_id == "dump_artifact": + args = (*args, suffix) - stage_id = func.__name__ unique_stages = [ "quantize", "export",
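For readers unfamiliar with the new test_pipeline API, the pattern this patch migrates the tests to is summarised below. This is a condensed sketch assembled from calls that already appear in the diff (pipeline construction, change_args, pop_stage, dump_artifact, run); the module and test names are illustrative, and the role of the two op-list arguments is inferred from how the converted tests pass them rather than from the pipeline's documentation.

    from typing import Tuple

    import torch

    from executorch.backends.arm.test import common
    from executorch.backends.arm.test.tester.test_pipeline import TosaPipelineMI
    from executorch.exir.backend.operator_support import DontPartition
    from executorch.exir.dialects._ops import ops as exir_ops

    input_t = Tuple[torch.Tensor, torch.Tensor]  # Input x, y


    class ExampleModule(torch.nn.Module):  # illustrative stand-in for the modules above
        inputs = {"randn": (torch.randn(10, 4, 5), torch.randn(10, 4, 5))}

        def forward(self, x: torch.Tensor, y: torch.Tensor):
            z = x + y
            s = torch.sigmoid(z)
            return s * z


    @common.parametrize("test_data", ExampleModule.inputs)
    def test_example_pipeline(test_data: input_t):
        # Construct the MI pipeline; the two list arguments mirror how the converted
        # tests pass their op-check lists (assumed: aten-op checks, then exir-op checks).
        pipeline = TosaPipelineMI[input_t](ExampleModule(), test_data, [], exir_op=[])

        # Extra partitioner checks are threaded through change_args, as in
        # test_single_reject above.
        check = DontPartition(exir_ops.edge.aten.sigmoid.default)
        pipeline.change_args("to_edge_transform_and_lower", additional_checks=[check])

        # Expected op counts after lowering are overridden the same way.
        pipeline.change_args(
            "check_count.exir", {"torch.ops.higher_order.executorch_call_delegate": 2}
        )

        # Stages can be dropped or made to dump their artifact, e.g.:
        #   pipeline.pop_stage("run_method_and_compare_outputs")
        #   pipeline.dump_artifact("to_edge_transform_and_lower")

        pipeline.run()
        assert check.has_rejected_node()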