diff --git a/news/13247.feature.rst b/news/13247.feature.rst
new file mode 100644
index 00000000000..770f84c240d
--- /dev/null
+++ b/news/13247.feature.rst
@@ -0,0 +1,4 @@
+Bytecode compilation is parallelized to significantly speed up installation of
+large/many packages. By default, the number of workers matches the available CPUs
+(up to a hard-coded limit), but can be adjusted using the ``--install-jobs``
+option. To disable parallelization, pass ``--install-jobs 1``.
diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py
index 81fed6e940d..1ed4a5359fb 100644
--- a/src/pip/_internal/cli/cmdoptions.py
+++ b/src/pip/_internal/cli/cmdoptions.py
@@ -1070,6 +1070,39 @@ def check_list_path_option(options: Values) -> None:
 )
 
 
+def _handle_jobs(
+    option: Option, opt_str: str, value: str, parser: OptionParser
+) -> None:
+    if value == "auto":
+        setattr(parser.values, option.dest, "auto")
+        return
+
+    try:
+        if (count := int(value)) > 0:
+            setattr(parser.values, option.dest, count)
+            return
+    except ValueError:
+        pass
+
+    msg = "should be a positive integer or 'auto'"
+    raise_option_error(parser, option=option, msg=msg)
+
+
+install_jobs: Callable[..., Option] = partial(
+    Option,
+    "--install-jobs",
+    dest="install_jobs",
+    default="auto",
+    type=str,
+    action="callback",
+    callback=_handle_jobs,
+    help=(
+        "Maximum number of workers to use while installing packages. "
+        "To disable parallelization, pass 1. (default: %default)"
+    ),
+)
+
+
 ##########
 # groups #
 ##########
diff --git a/src/pip/_internal/commands/install.py b/src/pip/_internal/commands/install.py
index 5239d010421..9f5f4812345 100644
--- a/src/pip/_internal/commands/install.py
+++ b/src/pip/_internal/commands/install.py
@@ -270,6 +270,8 @@ def add_options(self) -> None:
             ),
         )
 
+        self.cmd_opts.add_option(cmdoptions.install_jobs())
+
     @with_cleanup
     def run(self, options: Values, args: List[str]) -> int:
         if options.use_user_site and options.target_dir is not None:
@@ -416,6 +418,10 @@ def run(self, options: Values, args: List[str]) -> int:
             # we're not modifying it.
             modifying_pip = pip_req.satisfied_by is None
             protect_pip_from_modification_on_windows(modifying_pip=modifying_pip)
+            if modifying_pip:
+                # Parallelization will re-import pip when starting new workers
+                # during installation, which is unsafe if pip is being modified.
+                options.install_jobs = 1
 
         reqs_to_build = [
             r
@@ -465,6 +471,7 @@ def run(self, options: Values, args: List[str]) -> int:
                 use_user_site=options.use_user_site,
                 pycompile=options.compile,
                 progress_bar=options.progress_bar,
+                workers=options.install_jobs,
             )
 
             lib_locations = get_lib_location_guesses(
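The option callback above accepts either the literal string "auto" or any positive integer and rejects everything else. A minimal standalone sketch of that validation logic, using a hypothetical helper name (parse_install_jobs) purely for illustration and not part of the diff:

from typing import Union


def parse_install_jobs(value: str) -> Union[int, str]:
    # Mirrors _handle_jobs: "auto" passes through, positive integers are
    # converted, anything else is rejected.
    if value == "auto":
        return "auto"
    try:
        if (count := int(value)) > 0:
            return count
    except ValueError:
        pass
    raise ValueError("--install-jobs should be a positive integer or 'auto'")


assert parse_install_jobs("auto") == "auto"
assert parse_install_jobs("4") == 4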
diff --git a/src/pip/_internal/operations/install/wheel.py b/src/pip/_internal/operations/install/wheel.py
index 73e4bfc7c00..416e8da358c 100644
--- a/src/pip/_internal/operations/install/wheel.py
+++ b/src/pip/_internal/operations/install/wheel.py
@@ -1,19 +1,15 @@
 """Support for installing and building the "wheel" binary package format."""
 
 import collections
-import compileall
 import contextlib
 import csv
-import importlib
 import logging
 import os.path
 import re
 import shutil
 import sys
-import warnings
 from base64 import urlsafe_b64encode
 from email.message import Message
-from io import StringIO
 from itertools import chain, filterfalse, starmap
 from typing import (
     IO,
@@ -51,6 +47,7 @@
 from pip._internal.models.scheme import SCHEME_KEYS, Scheme
 from pip._internal.utils.filesystem import adjacent_tmp_file, replace
 from pip._internal.utils.misc import ensure_dir, hash_file, partition
+from pip._internal.utils.pyc_compile import BytecodeCompiler
 from pip._internal.utils.unpacking import (
     current_umask,
     is_within_directory,
@@ -417,12 +414,12 @@ def make(
         return super().make(specification, options)
 
 
-def _install_wheel(  # noqa: C901, PLR0915 function is too long
+def _install_wheel(  # noqa: C901 function is too long
     name: str,
     wheel_zip: ZipFile,
     wheel_path: str,
     scheme: Scheme,
-    pycompile: bool = True,
+    pycompiler: Optional[BytecodeCompiler],
     warn_script_location: bool = True,
     direct_url: Optional[DirectUrl] = None,
     requested: bool = False,
@@ -601,25 +598,14 @@ def pyc_source_file_paths() -> Generator[str, None, None]:
             continue
         yield full_installed_path
 
-    def pyc_output_path(path: str) -> str:
-        """Return the path the pyc file would have been written to."""
-        return importlib.util.cache_from_source(path)
-
     # Compile all of the pyc files for the installed files
-    if pycompile:
-        with contextlib.redirect_stdout(StringIO()) as stdout:
-            with warnings.catch_warnings():
-                warnings.filterwarnings("ignore")
-                for path in pyc_source_file_paths():
-                    success = compileall.compile_file(path, force=True, quiet=True)
-                    if success:
-                        pyc_path = pyc_output_path(path)
-                        assert os.path.exists(pyc_path)
-                        pyc_record_path = cast(
-                            "RecordPath", pyc_path.replace(os.path.sep, "/")
-                        )
-                        record_installed(pyc_record_path, pyc_path)
-        logger.debug(stdout.getvalue())
+    if pycompiler is not None:
+        for module in pycompiler(pyc_source_file_paths()):
+            if module.is_success:
+                pyc_record_path = module.pyc_path.replace(os.path.sep, "/")
+                record_installed(RecordPath(pyc_record_path), module.pyc_path)
+            if output := module.compile_output:
+                logger.debug(output)
 
     maker = PipScriptMaker(None, scheme.scripts)
 
@@ -718,7 +704,7 @@ def install_wheel(
     wheel_path: str,
     scheme: Scheme,
     req_description: str,
-    pycompile: bool = True,
+    pycompiler: Optional[BytecodeCompiler] = None,
     warn_script_location: bool = True,
     direct_url: Optional[DirectUrl] = None,
     requested: bool = False,
@@ -730,7 +716,7 @@ def install_wheel(
             wheel_zip=z,
             wheel_path=wheel_path,
             scheme=scheme,
-            pycompile=pycompile,
+            pycompiler=pycompiler,
             warn_script_location=warn_script_location,
             direct_url=direct_url,
             requested=requested,
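For context, this is roughly how a caller drives the new compiler abstraction, mirroring the loop added to _install_wheel above. A sketch that assumes pip is importable; the temporary file only exists to keep the example self-contained:

import tempfile
from pathlib import Path

from pip._internal.utils.pyc_compile import create_bytecode_compiler

with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "module.py"
    source.write_text("print('hello')")

    # The compiler doubles as a context manager so any worker pool is shut down.
    with create_bytecode_compiler(max_workers="auto") as compiler:
        for result in compiler([str(source)]):
            if result.is_success:
                print("compiled to", result.pyc_path)
            if result.compile_output:
                print(result.compile_output)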
diff --git a/src/pip/_internal/req/__init__.py b/src/pip/_internal/req/__init__.py
index bf282dab8bc..ea1a79a0ef2 100644
--- a/src/pip/_internal/req/__init__.py
+++ b/src/pip/_internal/req/__init__.py
@@ -1,10 +1,14 @@
 import collections
 import logging
+from contextlib import nullcontext
 from dataclasses import dataclass
-from typing import Generator, List, Optional, Sequence, Tuple
+from functools import partial
+from typing import Generator, Iterable, List, Optional, Sequence, Tuple
+from zipfile import ZipFile
 
 from pip._internal.cli.progress_bars import get_install_progress_renderer
 from pip._internal.utils.logging import indent_log
+from pip._internal.utils.pyc_compile import WorkerSetting, create_bytecode_compiler
 
 from .req_file import parse_requirements
 from .req_install import InstallRequirement
@@ -33,6 +37,28 @@ def _validate_requirements(
         yield req.name, req
 
 
+def _does_python_size_surpass_threshold(
+    requirements: Iterable[InstallRequirement], threshold: int
+) -> bool:
+    """Inspect wheels to check whether there is enough .py code to
+    enable bytecode parallelization.
+    """
+    py_size = 0
+    for req in requirements:
+        if not req.local_file_path or not req.is_wheel:
+            # No wheel to inspect as this is a legacy editable.
+            continue
+
+        with ZipFile(req.local_file_path, allowZip64=True) as wheel_file:
+            for entry in wheel_file.infolist():
+                if entry.filename.endswith(".py"):
+                    py_size += entry.file_size
+                    if py_size > threshold:
+                        return True
+
+    return False
+
+
 def install_given_reqs(
     requirements: List[InstallRequirement],
     global_options: Sequence[str],
@@ -43,6 +69,7 @@
     use_user_site: bool,
     pycompile: bool,
     progress_bar: str,
+    workers: WorkerSetting,
 ) -> List[InstallationResult]:
     """
     Install everything in the given list.
@@ -68,7 +95,15 @@
         )
         items = renderer(items)
 
-    with indent_log():
+    if pycompile:
+        code_size_check = partial(
+            _does_python_size_surpass_threshold, to_install.values()
+        )
+        pycompiler = create_bytecode_compiler(workers, code_size_check)
+    else:
+        pycompiler = None
+
+    with indent_log(), pycompiler or nullcontext():
         for requirement in items:
             req_name = requirement.name
             assert req_name is not None
@@ -87,7 +122,7 @@
                     prefix=prefix,
                     warn_script_location=warn_script_location,
                     use_user_site=use_user_site,
-                    pycompile=pycompile,
+                    pycompiler=pycompiler,
                 )
             except Exception:
                 # if install did not succeed, rollback previous uninstall
diff --git a/src/pip/_internal/req/req_install.py b/src/pip/_internal/req/req_install.py
index 3262d82658e..3dd1798c39a 100644
--- a/src/pip/_internal/req/req_install.py
+++ b/src/pip/_internal/req/req_install.py
@@ -53,6 +53,7 @@
     redact_auth_from_url,
 )
 from pip._internal.utils.packaging import get_requirement
+from pip._internal.utils.pyc_compile import BytecodeCompiler
 from pip._internal.utils.subprocess import runner_with_spinner_message
 from pip._internal.utils.temp_dir import TempDirectory, tempdir_kinds
 from pip._internal.utils.unpacking import unpack_file
@@ -812,7 +813,7 @@ def install(
         prefix: Optional[str] = None,
         warn_script_location: bool = True,
         use_user_site: bool = False,
-        pycompile: bool = True,
+        pycompiler: Optional[BytecodeCompiler] = None,
     ) -> None:
         assert self.req is not None
         scheme = get_scheme(
@@ -869,7 +870,7 @@ def install(
             self.local_file_path,
             scheme=scheme,
             req_description=str(self.req),
-            pycompile=pycompile,
+            pycompiler=pycompiler,
             warn_script_location=warn_script_location,
             direct_url=self.download_info if self.is_direct else None,
             requested=self.user_supplied,
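The wiring in install_given_reqs boils down to: only create a compiler when bytecode compilation is requested, and let the wheel-size probe decide whether parallel workers are worth starting. A condensed sketch of that decision, with a stub standing in for _does_python_size_surpass_threshold:

from contextlib import nullcontext

from pip._internal.utils.pyc_compile import create_bytecode_compiler

pycompile = True  # i.e. --compile, the default


def code_size_check(threshold: int) -> bool:
    # Stand-in for the partial() built from _does_python_size_surpass_threshold;
    # pretend the wheels contain less .py code than the threshold.
    return False


pycompiler = create_bytecode_compiler("auto", code_size_check) if pycompile else None

with pycompiler or nullcontext():
    ...  # per-requirement install() calls would run here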
diff --git a/src/pip/_internal/utils/pyc_compile.py b/src/pip/_internal/utils/pyc_compile.py
new file mode 100644
index 00000000000..472ab19e0d4
--- /dev/null
+++ b/src/pip/_internal/utils/pyc_compile.py
@@ -0,0 +1,170 @@
+# -------------------------------------------------------------------------- #
+# NOTE: Importing from pip's internals or vendored modules should be AVOIDED
+# so this module remains fast to import, minimizing the overhead of
+# spawning a new bytecode compiler worker.
+# -------------------------------------------------------------------------- #
+
+import compileall
+import importlib
+import os
+import sys
+import warnings
+from collections.abc import Callable, Iterable, Iterator
+from contextlib import contextmanager, redirect_stdout
+from io import StringIO
+from pathlib import Path
+from typing import TYPE_CHECKING, Literal, NamedTuple, Optional, Protocol, Union
+
+if TYPE_CHECKING:
+    from pip._vendor.typing_extensions import Self
+
+WorkerSetting = Union[int, Literal["auto"]]
+
+CODE_SIZE_THRESHOLD = 1000 * 1000  # 1 MB of .py code
+WORKER_LIMIT = 8
+
+
+@contextmanager
+def _patch_main_module_hack() -> Iterator[None]:
+    """Temporarily replace __main__ to reduce the worker startup overhead.
+
+    concurrent.futures imports the main module while initializing new workers
+    so that any global state is retained in the workers. Unfortunately, when pip
+    is run from a console script wrapper, the wrapper unconditionally imports
+    pip._internal.cli.main and everything else it requires. This is *slow*.
+
+    The compilation code does not depend on any global state, thus the costly
+    re-import of pip can be avoided by replacing __main__ with any random
+    module that does nothing.
+    """
+    original_main = sys.modules["__main__"]
+    sys.modules["__main__"] = sys.modules["pip"]
+    try:
+        yield
+    finally:
+        sys.modules["__main__"] = original_main
+
+
+class CompileResult(NamedTuple):
+    py_path: str
+    pyc_path: str
+    is_success: bool
+    compile_output: str
+
+
+def _compile_single(py_path: Union[str, Path]) -> CompileResult:
+    # compile_file() returns True silently even if the source file is nonexistent.
+    if not os.path.exists(py_path):
+        raise FileNotFoundError(f"Python file '{py_path!s}' does not exist")
+
+    with warnings.catch_warnings(), redirect_stdout(StringIO()) as stdout:
+        warnings.filterwarnings("ignore")
+        success = compileall.compile_file(py_path, force=True, quiet=True)
+    pyc_path = importlib.util.cache_from_source(py_path)  # type: ignore[arg-type]
+    return CompileResult(
+        str(py_path), pyc_path, success, stdout.getvalue()  # type: ignore[arg-type]
+    )
+
+
+class BytecodeCompiler(Protocol):
+    """Abstraction for compiling Python modules into bytecode in bulk."""
+
+    def __call__(self, paths: Iterable[str]) -> Iterable[CompileResult]: ...
+
+    def __enter__(self) -> "Self":
+        return self
+
+    def __exit__(self, *args: object) -> None:
+        return
+
+
+class SerialCompiler(BytecodeCompiler):
+    """Compile a set of Python modules one by one in-process."""
+
+    def __call__(self, paths: Iterable[Union[str, Path]]) -> Iterable[CompileResult]:
+        for p in paths:
+            yield _compile_single(p)
+
+
+class ParallelCompiler(BytecodeCompiler):
+    """Compile a set of Python modules using a pool of workers."""
+
+    def __init__(self, workers: int) -> None:
+        from concurrent import futures
+
+        if sys.version_info >= (3, 14):
+            # Sub-interpreters have less overhead than OS processes.
+            self.pool = futures.InterpreterPoolExecutor(workers)
+        else:
+            self.pool = futures.ProcessPoolExecutor(workers)
+        self.workers = workers
+
+    def __call__(self, paths: Iterable[Union[str, Path]]) -> Iterable[CompileResult]:
+        # New workers can be started at any time, so patch until fully done.
+        with _patch_main_module_hack():
+            yield from self.pool.map(_compile_single, paths)
+
+    def __exit__(self, *args: object) -> None:
+        # No need to block on pool finalization; let it occur in the background.
+        self.pool.shutdown(wait=False)
+
+
+def create_bytecode_compiler(
+    max_workers: WorkerSetting = "auto",
+    code_size_check: Optional[Callable[[int], bool]] = None,
+) -> BytecodeCompiler:
+    """Return a bytecode compiler appropriate for the workload and platform.
+
+    Parallelization will only be used if:
+    - There are 2 or more CPUs available
+    - The maximum # of workers permitted is at least 2
+    - There is "enough" code to be compiled to offset the worker startup overhead
+      (if it can be determined in advance via code_size_check)
+
+    A maximum worker count of "auto" will use the number of CPUs available to the
+    process or system, up to a hard-coded limit (to avoid resource exhaustion).
+
+    code_size_check is a callable that receives the code size threshold (in # of
+    bytes) for parallelization and returns whether it will be surpassed or not.
+    """
+    import logging
+
+    try:
+        # New in Python 3.13.
+        cpus: Optional[int] = os.process_cpu_count()  # type: ignore
+    except AttributeError:
+        # Poor man's fallback. We won't respect PYTHON_CPU_COUNT, but the envvar
+        # was only added in Python 3.13 anyway.
+        try:
+            cpus = len(os.sched_getaffinity(0))  # exists on unix (usually)
+        except AttributeError:
+            cpus = os.cpu_count()
+
+    logger = logging.getLogger(__name__)
+    logger.debug("Detected CPU count: %s", cpus)
+    logger.debug("Configured worker count: %s", max_workers)
+
+    # Case 1: Parallelization is disabled or pointless (there's only one CPU).
+    if max_workers == 1 or cpus == 1 or cpus is None:
+        logger.debug("Bytecode will be compiled serially")
+        return SerialCompiler()
+
+    # Case 2: There isn't enough code for parallelization to be worth it.
+    if code_size_check is not None and not code_size_check(CODE_SIZE_THRESHOLD):
+        logger.debug("Bytecode will be compiled serially (not enough .py code)")
+        return SerialCompiler()
+
+    # Case 3: Attempt to initialize a parallelized compiler. The concurrent
+    # executors spin up new workers on an on-demand basis, which helps to avoid
+    # wasting time on starting new workers that won't be used. (This isn't true
+    # for the fork start method, but forking is fast enough that it doesn't
+    # really matter.)
+    workers = min(cpus, WORKER_LIMIT) if max_workers == "auto" else max_workers
+    try:
+        compiler = ParallelCompiler(workers)
+        logger.debug("Bytecode will be compiled using at most %s workers", workers)
+        return compiler
+    except (ImportError, NotImplementedError, OSError) as e:
+        # Case 4: multiprocessing is broken, fall back to serial compilation.
+        logger.debug("Falling back to serial bytecode compilation", exc_info=e)
+        return SerialCompiler()
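To make the selection rules above concrete, this is how create_bytecode_compiler is expected to behave for a couple of inputs (a sketch; the same behaviour is exercised by the new unit tests below, and both assertions hold regardless of CPU count):

from pip._internal.utils.pyc_compile import SerialCompiler, create_bytecode_compiler

# Explicitly requesting a single worker always selects the in-process compiler.
assert isinstance(create_bytecode_compiler(max_workers=1), SerialCompiler)

# A size check reporting "not enough .py code" also forces serial compilation.
serial = create_bytecode_compiler("auto", code_size_check=lambda threshold: False)
assert isinstance(serial, SerialCompiler)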
diff --git a/tests/unit/test_utils_pyc_compile.py b/tests/unit/test_utils_pyc_compile.py
new file mode 100644
index 00000000000..d7d8a1e41ef
--- /dev/null
+++ b/tests/unit/test_utils_pyc_compile.py
@@ -0,0 +1,172 @@
+from contextlib import contextmanager
+from functools import partial
+from pathlib import Path
+from typing import Iterator, Optional, Type
+from unittest.mock import Mock, patch
+
+import pytest
+from pytest import param  # noqa: PT013
+
+from pip._internal.utils import pyc_compile
+from pip._internal.utils.pyc_compile import (
+    BytecodeCompiler,
+    ParallelCompiler,
+    SerialCompiler,
+    _compile_single,
+    create_bytecode_compiler,
+)
+
+try:
+    import concurrent.futures
+    import multiprocessing
+except (OSError, NotImplementedError, ImportError):
+    parallel_supported = False
+else:
+    parallel_supported = True
+
+needs_parallel_compiler = pytest.mark.skipif(
+    not parallel_supported, reason="ParallelCompiler is unavailable"
+)
+
+
+@contextmanager
+def patch_cpu_count(n: Optional[int]) -> Iterator[None]:
+    with patch("os.process_cpu_count", new=lambda: n, create=True):
+        yield
+
+
+@pytest.fixture(autouse=True)
+def force_spawn_method() -> Iterator[None]:
+    """Force the use of the spawn method to suppress thread-safety warnings."""
+    if parallel_supported:
+        ctx = multiprocessing.get_context("spawn")
+        wrapped = partial(concurrent.futures.ProcessPoolExecutor, mp_context=ctx)
+        with patch.object(concurrent.futures, "ProcessPoolExecutor", wrapped):
+            yield
+    else:
+        # The generator fixture must still yield when parallelization is unsupported.
+        yield
+
+
+class TestCompileSingle:
+    def test_basic(self, tmp_path: Path) -> None:
+        source_file = tmp_path / "code.py"
+        source_file.write_text("print('hello, world')")
+
+        result = _compile_single(source_file)
+        assert result.is_success
+        assert not result.compile_output.strip(), "nothing should be logged!"
+ assert "__pycache__" in result.pyc_path + assert Path(result.pyc_path).exists() + + def test_syntax_error( + self, tmp_path: Path, capsys: pytest.CaptureFixture[str] + ) -> None: + source_file = tmp_path / "code.py" + source_file.write_text("import ") + + result = _compile_single(source_file) + assert not result.is_success + assert result.compile_output.strip() + assert "SyntaxError" in result.compile_output + + stdout, stderr = capsys.readouterr() + assert not stdout, "output should be captured" + assert not stderr, "compileall does not use sys.stderr" + + def test_nonexistent_file(self, tmp_path: Path) -> None: + with pytest.raises(FileNotFoundError): + _compile_single(tmp_path / "aaa.py") + + +@pytest.mark.parametrize( + "compiler_kind", ["serial", param("parallel", marks=needs_parallel_compiler)] +) +def test_bulk_compilation(tmp_path: Path, compiler_kind: str) -> None: + files = [tmp_path / f"source{n}.py" for n in range(1, 11)] + for f in files: + f.write_text("pass") + + remaining = list(files) + compiler = SerialCompiler() if compiler_kind == "serial" else ParallelCompiler(2) + for result in compiler(files): + assert result.is_success + assert Path(result.pyc_path).exists() + remaining.remove(Path(result.py_path)) + + assert not remaining, "every file should've been compiled" + + +@pytest.mark.parametrize( + "compiler_kind", ["serial", param("parallel", marks=needs_parallel_compiler)] +) +def test_bulk_compilation_with_error(tmp_path: Path, compiler_kind: str) -> None: + good_files, bad_files = set(), set() + for n in range(1, 11): + source_file = tmp_path / f"source{n}.py" + if not n % 2: + source_file.write_text("pass") + good_files.add(source_file) + else: + source_file.write_text("import ") + bad_files.add(source_file) + + compiler = SerialCompiler() if compiler_kind == "serial" else ParallelCompiler(2) + + files = good_files | bad_files + remaining = files.copy() + reported_as_success, reported_as_fail = set(), set() + for result in compiler(files): + py_path = Path(result.py_path) + remaining.remove(py_path) + if result.is_success: + reported_as_success.add(py_path) + else: + reported_as_fail.add(py_path) + + assert not remaining, "every file should've been processed" + assert files - reported_as_success == bad_files + assert files - reported_as_fail == good_files + + +@needs_parallel_compiler +class TestCompilerSelection: + @pytest.mark.parametrize( + "cpus, expected_type", + [(None, SerialCompiler), (1, SerialCompiler), (2, ParallelCompiler)], + ) + def test_cpu_count( + self, cpus: Optional[int], expected_type: Type[BytecodeCompiler] + ) -> None: + with patch_cpu_count(cpus): + compiler = create_bytecode_compiler() + assert isinstance(compiler, expected_type) + if isinstance(compiler, ParallelCompiler): + assert compiler.workers == cpus + + def test_cpu_count_exceeds_limit(self) -> None: + with patch_cpu_count(10): + compiler = create_bytecode_compiler() + assert isinstance(compiler, ParallelCompiler) + assert compiler.workers == 8 + + def test_broken_multiprocessing(self) -> None: + fake_module = Mock() + fake_module.ProcessPoolExecutor = Mock(side_effect=NotImplementedError) + fake_module.InterpreterPoolExecutor = Mock(side_effect=NotImplementedError) + with ( + patch("concurrent.futures", fake_module), + patch.object( + pyc_compile, "ParallelCompiler", wraps=ParallelCompiler + ) as parallel_mock, + ): + compiler = create_bytecode_compiler(max_workers=2) + assert isinstance(compiler, SerialCompiler) + parallel_mock.assert_called_once() + + def 
+    def test_only_one_worker(self) -> None:
+        with patch_cpu_count(2):
+            compiler = create_bytecode_compiler(max_workers=1)
+        assert isinstance(compiler, SerialCompiler)
+
+    def test_not_enough_code(self) -> None:
+        with patch_cpu_count(2):
+            compiler = create_bytecode_compiler(
+                code_size_check=lambda threshold: False
+            )
+        assert isinstance(compiler, SerialCompiler)