Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions backends/arm/test/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from datetime import datetime

from pathlib import Path
from typing import Any, Optional
from typing import Any, Callable, Optional, ParamSpec, TypeVar

import pytest
from executorch.backends.arm.ethosu import EthosUCompileSpec
Expand Down Expand Up @@ -205,7 +205,7 @@ def get_vgf_compile_spec(
)
"""Xfails a test if Corstone-320 FVP is not installed, or if the executor runner is not built"""

SkipIfNoModelConverter = pytest.mark.skipif(
SkipIfNoModelConverter = pytest.mark.skipif( # type: ignore[call-arg]
condition=not (model_converter_installed()),
raises=FileNotFoundError,
reason="Did not find model-converter on path",
Expand All @@ -221,14 +221,18 @@ def get_vgf_compile_spec(

xfail_type = str | tuple[str, type[Exception]]

_P = ParamSpec("_P")
_R = TypeVar("_R")
Decorator = Callable[[Callable[_P, _R]], Callable[_P, _R]]


def parametrize(
arg_name: str,
test_data: dict[str, Any],
xfails: dict[str, xfail_type] | None = None,
strict: bool = True,
flakies: dict[str, int] | None = None,
):
) -> Decorator:
"""
Custom version of pytest.mark.parametrize with some syntactic sugar and added xfail functionality
- test_data is expected as a dict of (id, test_data) pairs
Expand All @@ -241,7 +245,7 @@ def parametrize(
if flakies is None:
flakies = {}

def decorator_func(func):
def decorator_func(func: Callable[_P, _R]) -> Callable[_P, _R]:
"""Test data is transformed from a dict of (id, data) pairs to a list of pytest params to work with the native pytests parametrize function"""
pytest_testsuite = []
for id, test_parameters in test_data.items():
Expand All @@ -261,14 +265,16 @@ def decorator_func(func):
"xfail info needs to be str, or tuple[str, type[Exception]]"
)
# Set up our fail marker
marker: tuple[pytest.MarkDecorator, ...] # type: ignore[no-redef]
marker = (
pytest.mark.xfail(reason=reason, raises=raises, strict=strict),
)
else:
marker = ()
marker = () # type: ignore[assignment]

pytest_param = pytest.param(test_parameters, id=id, marks=marker)
pytest_testsuite.append(pytest_param)
return pytest.mark.parametrize(arg_name, pytest_testsuite)(func)
decorator = pytest.mark.parametrize(arg_name, pytest_testsuite)
return decorator(func)

return decorator_func
66 changes: 34 additions & 32 deletions backends/arm/test/runner_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pathlib import Path

from types import NoneType
from typing import Any, cast, Dict, List, Literal, Optional, Tuple
from typing import Any, cast, Dict, List, Optional, Tuple

import numpy as np
import torch
Expand All @@ -37,7 +37,7 @@
from torch.fx.node import Node

from torch.overrides import TorchFunctionMode
from tosa.TosaGraph import TosaGraph
from tosa.TosaGraph import TosaGraph # type: ignore[import-untyped]

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -149,25 +149,28 @@ def get_output_quantization_params(
Raises:
RuntimeError if no output quantization parameters are found.
"""
quant_params = {}
for node in output_node.args[0]:
if node.target == torch.ops.quantized_decomposed.dequantize_per_tensor.default:
quant_params[node] = QuantizationParams(
node_name=node.args[0].name,
scale=node.args[1],
zp=node.args[2],
qmin=node.args[3],
qmax=node.args[4],
dtype=node.args[5],
quant_params: dict[Node, QuantizationParams | None] = {}
for node in output_node.args[0]: # type: ignore[union-attr]
if (
node.target # type: ignore[union-attr]
== torch.ops.quantized_decomposed.dequantize_per_tensor.default
):
quant_params[node] = QuantizationParams( # type: ignore[index]
node_name=node.args[0].name, # type: ignore[arg-type, union-attr]
scale=node.args[1], # type: ignore[arg-type, union-attr]
zp=node.args[2], # type: ignore[arg-type, union-attr]
qmin=node.args[3], # type: ignore[arg-type, union-attr]
qmax=node.args[4], # type: ignore[arg-type, union-attr]
dtype=node.args[5], # type: ignore[arg-type, union-attr]
)
else:
quant_params[node] = None
quant_params[node] = None # type: ignore[index]
return quant_params


def torch_tensor_to_numpy(tensor: torch.Tensor) -> np.ndarray:
dtype = _torch_to_numpy_dtype_dict[tensor.dtype]
array = tensor.detach().numpy().astype(dtype)
array = tensor.detach().numpy().astype(dtype) # type: ignore[var-annotated]
dim_order = tensor.dim_order()
if dim_order == NHWC_ORDER:
a = array.transpose(NHWC_ORDER)
Expand Down Expand Up @@ -252,40 +255,39 @@ def run_target(
executorch_program_manager: ExecutorchProgramManager,
inputs: Tuple[torch.Tensor],
intermediate_path: str | Path,
target_board: Literal["corestone-300", "corestone-320", "vkml_emulation_layer"],
target_board: str,
elf_path: str | Path,
timeout: int = 120, # s
):
if target_board not in VALID_TARGET:
raise ValueError(f"Unsupported target: {target_board}")

if target_board in ("corstone-300", "corstone-320"):
return run_corstone(
executorch_program_manager,
inputs,
intermediate_path,
target_board,
elf_path,
timeout,
)
elif target_board == "vkml_emulation_layer":
if target_board == "vkml_emulation_layer":
return run_vkml_emulation_layer(
executorch_program_manager,
inputs,
intermediate_path,
elf_path,
)
return run_corstone(
executorch_program_manager,
inputs,
intermediate_path,
target_board,
elf_path,
timeout,
)


def save_inputs_to_file(
exported_program: ExportedProgram,
inputs: Tuple[torch.Tensor],
intermediate_path: str | Path,
):
input_file_paths = []
input_file_paths: list[str] = []
input_names = get_input_names(exported_program)
for input_name, input_ in zip(input_names, inputs):
input_path = save_bytes(intermediate_path, input_, input_name)
input_path = save_bytes(intermediate_path, input_, input_name) # type: ignore[arg-type]
input_file_paths.append(input_path)

return input_file_paths
Expand All @@ -298,9 +300,9 @@ def get_output_from_file(
):
output_np = []
output_node = exported_program.graph_module.graph.output_node()
for i, node in enumerate(output_node.args[0]):
for i, node in enumerate(output_node.args[0]): # type: ignore[union-attr]
output_dtype = node.meta["val"].dtype
tosa_ref_output = np.fromfile(
tosa_ref_output = np.fromfile( # type: ignore[var-annotated]
os.path.join(intermediate_path, f"{output_base_name}-{i}.bin"),
_torch_to_numpy_dtype_dict[output_dtype],
)
Expand Down Expand Up @@ -362,7 +364,7 @@ def run_corstone(
executorch_program_manager: ExecutorchProgramManager,
inputs: Tuple[torch.Tensor],
intermediate_path: str | Path,
target_board: Literal["corestone-300", "corestone-320"],
target_board: str,
elf_path: str | Path,
timeout: int = 120, # s
) -> list[torch.Tensor]:
Expand Down Expand Up @@ -749,7 +751,7 @@ def run_tosa_graph(
inputs_np = [torch_tensor_to_numpy(input_tensor) for input_tensor in inputs]

if isinstance(tosa_version, Tosa_1_00):
import tosa_reference_model as reference_model
import tosa_reference_model as reference_model # type: ignore[import-untyped]

debug_mode = "ALL" if logger.level <= logging.DEBUG else None
outputs_np, status = reference_model.run(
Expand All @@ -771,7 +773,7 @@ def run_tosa_graph(
# Convert output numpy arrays to tensors with same dim_order as the output nodes
result = [
numpy_to_torch_tensor(output_array, node)
for output_array, node in zip(outputs_np, output_node.args[0])
for output_array, node in zip(outputs_np, output_node.args[0]) # type: ignore[arg-type]
]

return result
Expand Down
3 changes: 2 additions & 1 deletion backends/arm/test/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import subprocess
import sys
import time
from typing import Sequence


def get_args():
Expand Down Expand Up @@ -96,7 +97,7 @@ def get_args():
return args


def run_external_cmd(cmd: []):
def run_external_cmd(cmd: Sequence[str]) -> None:
print("CALL:", *cmd, sep=" ")
try:
subprocess.check_call(cmd)
Expand Down
Loading