diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py
index cad82b3b1..948a1866f 100644
--- a/src/taskgraph/generator.py
+++ b/src/taskgraph/generator.py
@@ -7,10 +7,10 @@
 import logging
 import multiprocessing
 import os
-import platform
 from concurrent.futures import (
     FIRST_COMPLETED,
     ProcessPoolExecutor,
+    ThreadPoolExecutor,
     wait,
 )
 from dataclasses import dataclass
@@ -318,9 +318,26 @@ def _load_tasks_parallel(self, kinds, kind_graph, parameters):
         futures = set()
         edges = set(kind_graph.edges)
 
-        with ProcessPoolExecutor(
-            mp_context=multiprocessing.get_context("fork")
-        ) as executor:
+        # use processes if available; this allows us to use multiple CPU cores
+        # we should revisit this default when free-threaded python is more
+        # stable and performant. in the meantime, allowing the usage of threads
+        # can still be helpful when `fork` multiprocessing is not available
+        # (like windows and mac), and gives users the option to try using
+        # free threaded python to speed things up
+        if "fork" in multiprocessing.get_all_start_methods() and not os.environ.get(
+            "TASKGRAPH_USE_THREADS"
+        ):
+            executor = ProcessPoolExecutor(
+                mp_context=multiprocessing.get_context("fork")
+            )
+        else:
+            # os.process_cpu_count() only exists on Python 3.13+; fall back to
+            # os.cpu_count() on older interpreters (this branch is taken on
+            # windows/mac regardless of python version)
+            executor = ThreadPoolExecutor(
+                max_workers=getattr(os, "process_cpu_count", os.cpu_count)()
+            )
+
+        with executor:
 
             def submit_ready_kinds():
                 """Create the next batch of tasks for kinds without dependencies."""
@@ -434,14 +447,9 @@ def _run(self):
         yield "kind_graph", kind_graph
 
         logger.info("Generating full task set")
-        # Current parallel generation relies on multiprocessing, and forking.
-        # This causes problems on Windows and macOS due to how new processes
-        # are created there, and how doing so reinitializes global variables
-        # that are modified earlier in graph generation, that doesn't get
-        # redone in the new processes. Ideally this would be fixed, or we
-        # would take another approach to parallel kind generation. In the
-        # meantime, it's not supported outside of Linux.
-        if platform.system() != "Linux" or os.environ.get("TASKGRAPH_SERIAL"):
+        if "fork" not in multiprocessing.get_all_start_methods() or os.environ.get(
+            "TASKGRAPH_SERIAL"
+        ):
             all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters)
         else:
             all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters)
diff --git a/src/taskgraph/util/schema.py b/src/taskgraph/util/schema.py
index 3c5f4c955..b07536b82 100644
--- a/src/taskgraph/util/schema.py
+++ b/src/taskgraph/util/schema.py
@@ -5,6 +5,7 @@
 import pprint
 import re
+import threading
 from collections.abc import Mapping
 
 import voluptuous
 
@@ -12,6 +13,13 @@
 import taskgraph
 from taskgraph.util.keyed_by import evaluate_keyed_by, iter_dot_path
 
+# Global reentrant lock for thread-safe Schema creation, largely to work around
+# thread unsafe code in voluptuous.
+# RLock allows the same thread to acquire the lock multiple times (for recursive validation)
+# This is particularly important when running with a free-threaded Python, which
+# makes races more likely.
+_schema_creation_lock = threading.RLock()
+
 
 def validate_schema(schema, obj, msg_prefix):
     """
@@ -206,25 +214,32 @@ class Schema(voluptuous.Schema):
     """
 
     def __init__(self, *args, check=True, **kwargs):
-        super().__init__(*args, **kwargs)
+        with _schema_creation_lock:
+            super().__init__(*args, **kwargs)
 
-        self.check = check
-        if not taskgraph.fast and self.check:
-            check_schema(self)
+            self.check = check
+            if not taskgraph.fast and self.check:
+                check_schema(self)
 
     def extend(self, *args, **kwargs):
-        schema = super().extend(*args, **kwargs)
+        with _schema_creation_lock:
+            schema = super().extend(*args, **kwargs)
 
-        if self.check:
-            check_schema(schema)
-        # We want twice extend schema to be checked too.
-        schema.__class__ = Schema
-        return schema
+            if self.check:
+                check_schema(schema)
+            # We want twice extend schema to be checked too.
+            schema.__class__ = Schema
+            return schema
 
     def _compile(self, schema):
         if taskgraph.fast:
             return
-        return super()._compile(schema)
+        with _schema_creation_lock:
+            return super()._compile(schema)
+
+    def __call__(self, data):
+        with _schema_creation_lock:
+            return super().__call__(data)
 
     def __getitem__(self, item):
         return self.schema[item]  # type: ignore
diff --git a/test/test_generator.py b/test/test_generator.py
index fe1695574..7f23f9707 100644
--- a/test/test_generator.py
+++ b/test/test_generator.py
@@ -3,6 +3,7 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
+import multiprocessing
 from concurrent.futures import ProcessPoolExecutor
 
 import pytest
 
@@ -12,6 +13,11 @@
 from taskgraph.generator import Kind, load_tasks_for_kind, load_tasks_for_kinds
 from taskgraph.loader.default import loader as default_loader
 
+forkonly = pytest.mark.skipif(
+    "fork" not in multiprocessing.get_all_start_methods(),
+    reason="requires 'fork' multiprocessing support",
+)
+
 
 class FakePPE(ProcessPoolExecutor):
     loaded_kinds = []
@@ -21,6 +27,7 @@ def submit(self, kind_load_tasks, *args):
         return super().submit(kind_load_tasks, *args)
 
+@forkonly
 def test_kind_ordering(mocker, maketgg):
     "When task kinds depend on each other, they are loaded in postorder"
     mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE)