|
| 1 | +from io import DEFAULT_BUFFER_SIZE |
| 2 | +from lzma import compress |
| 3 | +from pathlib import Path |
| 4 | +from random import seed |
| 5 | +import sys |
| 6 | +from typing import BinaryIO, Callable, Iterator, Optional, cast |
| 7 | + |
| 8 | +import pytest |
| 9 | + |
| 10 | +from xz import XZFile |
| 11 | +from xz.common import create_xz_index_footer, parse_xz_footer, parse_xz_index |
| 12 | +from xz.io import IOCombiner, IOStatic |
| 13 | + |
| 14 | +if sys.version_info >= (3, 9): |
| 15 | + from random import randbytes |
| 16 | +else: |
| 17 | + from random import getrandbits |
| 18 | + |
| 19 | + def randbytes(length: int) -> bytes: |
| 20 | + return getrandbits(length * 8).to_bytes(length, "little") |
| 21 | + |
| 22 | + |
| 23 | +@pytest.fixture |
| 24 | +def ram_usage() -> Iterator[Callable[[], int]]: |
| 25 | + try: |
| 26 | + import tracemalloc # pylint: disable=import-outside-toplevel |
| 27 | + except ImportError: # e.g. PyPy |
| 28 | + pytest.skip("tracemalloc module not available") |
| 29 | + |
| 30 | + try: |
| 31 | + tracemalloc.start() |
| 32 | + yield lambda: tracemalloc.get_traced_memory()[1] |
| 33 | + finally: |
| 34 | + tracemalloc.stop() |
| 35 | + |
| 36 | + |
| 37 | +BLOCK_SIZE = 1_000_000 |
| 38 | + |
| 39 | + |
| 40 | +@pytest.fixture |
| 41 | +def fileobj() -> BinaryIO: |
| 42 | + # create xz raw data composed of many identical blocks |
| 43 | + nb_blocks = 50 |
| 44 | + |
| 45 | + seed(0) |
| 46 | + data = compress(randbytes(BLOCK_SIZE)) |
| 47 | + header = data[:12] |
| 48 | + footer = data[-12:] |
| 49 | + check, backward_size = parse_xz_footer(footer) |
| 50 | + block = data[12 : -12 - backward_size] |
| 51 | + records = parse_xz_index(data[-12 - backward_size : -12]) |
| 52 | + index_footer = create_xz_index_footer(check, records * nb_blocks) |
| 53 | + |
| 54 | + return cast( |
| 55 | + BinaryIO, |
| 56 | + IOCombiner( |
| 57 | + IOStatic(header), |
| 58 | + *[IOStatic(block)] * nb_blocks, |
| 59 | + IOStatic(index_footer), |
| 60 | + ), |
| 61 | + ) |
| 62 | + |
| 63 | + |
| 64 | +def test_read_linear( |
| 65 | + # pylint: disable=redefined-outer-name |
| 66 | + fileobj: BinaryIO, |
| 67 | + ram_usage: Callable[[], int], |
| 68 | +) -> None: |
| 69 | + with XZFile(fileobj) as xz_file: |
| 70 | + # read almost one block |
| 71 | + xz_file.read(BLOCK_SIZE - 1) |
| 72 | + one_block_memory = ram_usage() |
| 73 | + |
| 74 | + # read all the file |
| 75 | + while xz_file.read(DEFAULT_BUFFER_SIZE): |
| 76 | + assert ( |
| 77 | + # should not use much more memory, take 2 as error margin |
| 78 | + ram_usage() |
| 79 | + < one_block_memory * 2 |
| 80 | + ), f"Consumes too much RAM (at {100 * xz_file.tell() / len(xz_file):.0f}%)" |
| 81 | + |
| 82 | + |
| 83 | +def test_write( |
| 84 | + tmp_path: Path, |
| 85 | + # pylint: disable=redefined-outer-name |
| 86 | + ram_usage: Callable[[], int], |
| 87 | +) -> None: |
| 88 | + nb_blocks = 10 |
| 89 | + |
| 90 | + seed(0) |
| 91 | + |
| 92 | + one_block_memory: Optional[int] = None |
| 93 | + |
| 94 | + with XZFile(tmp_path / "archive.xz", "w") as xz_file: |
| 95 | + for i in range(nb_blocks): |
| 96 | + xz_file.change_block() |
| 97 | + xz_file.write(randbytes(BLOCK_SIZE)) |
| 98 | + |
| 99 | + if one_block_memory is None: |
| 100 | + one_block_memory = ram_usage() |
| 101 | + else: |
| 102 | + assert ( |
| 103 | + # should not use much more memory, take 2 as error margin |
| 104 | + ram_usage() |
| 105 | + < one_block_memory * 2 |
| 106 | + ), f"Consumes too much RAM (at {i / nb_blocks:.0f}%)" |
0 commit comments