diff --git a/pyproject.toml b/pyproject.toml index 10fbecba..f74b78d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ Downloads = "https://github.com/rocky/python-xdis/releases" dev = [ "pre-commit", "pytest", + "tox", ] [project.scripts] diff --git a/test_crossversion/.gitignore b/test_crossversion/.gitignore new file mode 100644 index 00000000..ca5e974f --- /dev/null +++ b/test_crossversion/.gitignore @@ -0,0 +1,4 @@ +templates/ +**/__pycache__/ +*.pyc +.python-version diff --git a/test_crossversion/Makefile b/test_crossversion/Makefile new file mode 100644 index 00000000..3b8cda79 --- /dev/null +++ b/test_crossversion/Makefile @@ -0,0 +1,49 @@ +.PHONY: help clean get_sources setup_pyenv compile prepare test + +SOURCE=./templates/source/ +COMPILED=./templates/compiled/ +SERIALIZED=./templates/serialized/ + +# usage +define helptext +Crossversion xdis test usage: + help : show this menu + clean : remove compiled and serialized files + get_sources : symlink all .py files in ./ -> $(SOURCE) + setup_pyenv : setup local pyenv versions to be used by tox + compile : with each tox env, compile all sources in $(SOURCE) to $(COMPILED), then serialize with dis to $(SERIALIZED) + prepare : fully prepare test environment and compile test files + test : prepare and run tests. with each tox env, serialize pyc's in $(COMPILED) with xdis, then check against corresponding serialized pyc in $(SERIALIZED) +endef +export helptext + +#: show help menu +help: + @echo "$$helptext" + +#: remove compiled and serialized files +clean: + find . -name "*.pyc" -delete + find . 
-name "__pycache__" -type d -delete + rm -rf $(COMPILED)/* + rm -rf $(SERIALIZED)/* + +#: copy all .py files in ./ -> ./templates/source/ +get_sources: + cp -f *.py $(SOURCE) + +.python-version: + tox --listenvs | xargs pyenv local +#: setup local pyenv versions to be used by tox +setup_pyenv: .python-version + +#: with each tox env, compile all sources in ./templates/source/ to ./templates/compiled/, then serialize with dis to ./templates/serialized/ +compile: + tox p -c ./tox_prepare.ini + +#: fully prepare tests +prepare: clean get_sources setup_pyenv compile + +#: prepare and run tests. with each tox env, serialize pyc's in ./templates/compiled/ with xdis, then check against corresponding dis serialized pyc in ./templates/serialized/ +test: prepare + tox r -c ./tox.ini diff --git a/test_crossversion/USAGE.md b/test_crossversion/USAGE.md new file mode 100644 index 00000000..006808a5 --- /dev/null +++ b/test_crossversion/USAGE.md @@ -0,0 +1,16 @@ +# Automated crossversion testing +This testing suite is used for automatic testing of differences found between xdis and dis. +This is done by having a way to identically "serialize" important attributes in xdis and dis bytecodes. +We then can check a diff between a serialized xdis and dis bytecode to find if xdis is parsing something incorrectly. +Most tests should be run using the Makefile. + +# System Requirements +- `pyenv` and `pyenv-virtualenv` + - Each version needing to be tested should be installed with pyenv. +- `tox` + +# Usage +## Makefile +Run `make` or `make help` to show the help menu for running and preparing tests, or with `remake`, `remake --tasks`. + +To simply run tests, `make test` will copy some sources, prepare template files, and run tests. 
# Shared paths and interpreter-version constants for the crossversion tests.
# Importing this module also makes sure the template directories exist.
from configparser import ConfigParser
from pathlib import Path
from sys import version_info

# main test root dir
_test_path = Path(__file__).parent.parent

# system version of python
SYS_VERSION = f"{version_info.major}.{version_info.minor}"
SYS_VERSION_TUPLE = (version_info.major, version_info.minor, version_info.micro)

# template dirs
TEMPLATE_DIR = _test_path / "templates"
TEMPLATE_SOURCE_DIR = TEMPLATE_DIR / "source"
TEMPLATE_COMPILED_DIR = TEMPLATE_DIR / "compiled"
TEMPLATE_SERIALIZED_DIR = TEMPLATE_DIR / "serialized"


def _check_dir(path: Path) -> bool:
    """Make sure *path* exists as a directory, creating it when needed.

    ``exist_ok=True`` makes the call safe when several interpreters import
    this module at the same time (an exists()-then-mkdir() sequence can lose
    that race and raise FileExistsError). ``parents=True`` lets a nested
    directory be created even when its parent is missing.

    :param path: directory to create if it is not there yet
    :return: always True
    :raises FileExistsError: if *path* exists but is not a directory
    """
    path.mkdir(parents=True, exist_ok=True)
    return True


# check dirs and make them if needed
for _template_dir in (
    TEMPLATE_DIR,
    TEMPLATE_SOURCE_DIR,
    TEMPLATE_COMPILED_DIR,
    TEMPLATE_SERIALIZED_DIR,
):
    _check_dir(_template_dir)
def _get_headers_to_serialize(bytecode_version: tuple):
    """
    Build the list of code object attribute names to serialize.

    :param bytecode_version: bytecode version tuple, compared against the
        minimum version each version specific attribute is serialized from
    :return: a new list of ``co_*`` attribute names, common ones first
    """
    # attrs serialized for every bytecode version
    common_attrs = (
        "co_argcount",
        "co_cellvars",
        "co_code",
        "co_consts",
        "co_firstlineno",
        "co_flags",
        "co_freevars",
        "co_kwonlyargcount",
        "co_name",
        "co_names",
        "co_nlocals",
        "co_posonlyargcount",
        "co_stacksize",
        "co_varnames",
    )

    # (minimum bytecode version, attr) pairs, in output order
    # co_positions is left out on purpose, not fully supported in xdis
    gated_attrs = (
        ((3, 10), "co_lines"),
        ((3, 11), "co_qualname"),
    )

    selected = list(common_attrs)
    selected.extend(attr for minimum, attr in gated_attrs if bytecode_version >= minimum)
    return selected
def format_bytecode(bytecode, bytecode_version: tuple, headers_to_serialize: list[str] | None = None, serialize_insts: bool = True) -> str:
    """
    Build the full text form of one bytecode object.

    The result is a "BYTECODE <co_name>" line, an "ATTRS:" section and,
    when requested, an "INSTS:" section. Every section ends with a newline.

    :param bytecode: bytecode object; its ``codeobj.co_name`` names the section
    :param bytecode_version: bytecode version tuple, passed on to the header and instruction formatters
    :param headers_to_serialize: headers to format, passed on to the header formatter. None (the default) is passed on unchanged.
    :param serialize_insts: when False the "INSTS:" section is left out entirely
    :return: the formatted text
    """
    pieces = [
        f"BYTECODE {bytecode.codeobj.co_name}\n",
        "ATTRS:\n",
        _format_headers(bytecode, bytecode_version, headers_to_serialize) + "\n",
    ]

    # instructions are optional so callers can compare headers only
    if serialize_insts:
        pieces.append("INSTS:\n")
        pieces.append(_format_insts(bytecode, bytecode_version) + "\n")

    return "".join(pieces)
+ """ + + # create a code object in xdis or dis, and a constructor to make bytecodes with + if use_xdis: + # using xdis + from os import devnull + + # write to null so no disassembly output + with open(devnull, "w") as fnull: + # create xdis code obj + (_, code_object, version_tuple, _, _, is_pypy, _, _) = disassemble_file(str(pyc), fnull, asm_format="classic") + # get corresponding opcode class + opc = xdis.get_opcode(version_tuple, is_pypy, None) + # create xdis bytecode constructor + bytecode_constructor = lambda codeobj: xdis.Bytecode(codeobj, opc) + bytecode_version = version_tuple + else: + # using dis + import dis + import marshal + + # load code obj + code_object = marshal.loads(pyc.read_bytes()[16:]) + # create dis bytecode constructor + bytecode_constructor = lambda codeobj: dis.Bytecode(codeobj) + bytecode_version = SYS_VERSION_TUPLE + + # iter bytecodes and create list of formatted bytecodes strings + formatted_bytecodes = [] + init_bytecode = bytecode_constructor(code_object) + for bc in _iter_nested_bytecodes(init_bytecode, bytecode_constructor): + formatted_bytecodes.append(format_bytecode(bc, bytecode_version, headers, serialize_insts)) + + # write formatted bytecodes + full_formatted_bytecode = "\n".join(formatted_bytecodes) + if output_file: + output_file.write(full_formatted_bytecode) + + return full_formatted_bytecode + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(prog="serialize_bytecode") + parser.add_argument( + "-x", + "--use_xdis", + help="Use xdis to serialize bytecode", + action="store_true", + ) + parser.add_argument( + "--headers", + help="List of specific code object params to test, defaults to all parameters. 
class SerializedTestCase:
    """One pyc / serialized-text pair to compare.

    The pyc is disassembled and serialized with xdis when the case is built;
    the text file holds the serialization that was saved for the same pyc.
    """

    pyc_path: Path
    serialized_txt_path: Path
    serialized_dis: str
    serialized_xdis: str
    message: str

    def __init__(self, pyc: Path, serialized_txt: Path):
        # both halves of the pair have to be on disk
        assert pyc.exists() and serialized_txt.exists()
        self.pyc_path, self.serialized_txt_path = pyc, serialized_txt

        # saved serialization first, then serialize the pyc with xdis
        self.serialized_dis = serialized_txt.read_text()
        self.serialized_xdis = serialize_pyc(pyc, use_xdis=True, output_file=None)

        # debug message
        self.message = f"{SYS_VERSION}: Checking equivalence: {self.pyc_path} <---> {self.serialized_txt_path}"

    def __str__(self) -> str:
        return self.message

    def __repr__(self) -> str:
        # same text as str() so parametrized test output stays readable
        return str(self)
def get_tests_by_version(v: str) -> Iterable[SerializedTestCase]:
    """Iterate test cases from the template folders for version v.

    Each ``*.pyc`` in the compiled folder for *v* is paired with the ``.txt``
    file of the same stem in the serialized folder for *v*.

    Cases are yielded in sorted path order. Glob order depends on the
    filesystem, so without sorting the parametrized test order (and the
    generated test ids) would differ between machines and runs. Only
    ``*.pyc`` files are considered so a stray file in the compiled folder
    does not abort collection.

    :param v: version directory name, e.g. "3.12"
    :raises AssertionError: if either version folder is missing
    """
    compiled_tests_dir = TEMPLATE_COMPILED_DIR / v
    serialized_tests_dir = TEMPLATE_SERIALIZED_DIR / v
    assert compiled_tests_dir.exists(), f"Missing compiled template dir: {compiled_tests_dir}"
    assert serialized_tests_dir.exists(), f"Missing serialized template dir: {serialized_tests_dir}"

    for compiled_test in sorted(compiled_tests_dir.glob("*.pyc")):
        serialized_test = serialized_tests_dir / f"{compiled_test.stem}.txt"
        yield SerializedTestCase(compiled_test, serialized_test)
def co_lines(self):
    """
    Yield (start_offset, end_offset, line) tuples decoded from co_linetable.

    Offsets accumulate from 0 and line numbers accumulate from
    co_firstlineno. The line will either be a positive integer, or None
    for a range that is not associated with any line.

    Parsing implementation adapted from:
    https://github.com/python/cpython/blob/3.10/Objects/lnotab_notes.txt

    The algorithm presented in the lnotab_notes.txt file is slightly
    inaccurate: the first linetable entry will have a line delta of 0, and
    should be yielded instead of skipped. This implementation follows the
    `lineiter_next` definition and the `advance` function in CPython's
    Objects/codeobject.c.
    """
    current_line = self.co_firstlineno
    range_end = 0

    # co_linetable is pairs of (offset_delta: unsigned byte, line_delta: signed byte)
    for offset_step, line_step in struct.iter_unpack("=Bb", self.co_linetable):
        assert isinstance(line_step, int)
        assert isinstance(offset_step, int)

        range_start, range_end = range_end, range_end + offset_step

        if line_step == -128:
            # -128 marks a range with no line; the running line is left as is
            reported_line = None
        else:
            current_line += line_step
            reported_line = current_line

        # empty ranges are omitted, but their line delta was still applied
        if range_start != range_end:
            yield range_start, range_end, reported_line