32 changes: 20 additions & 12 deletions src/taskgraph/generator.py
@@ -7,10 +7,10 @@
import logging
import multiprocessing
import os
import platform
from concurrent.futures import (
FIRST_COMPLETED,
ProcessPoolExecutor,
ThreadPoolExecutor,
wait,
)
from dataclasses import dataclass
@@ -318,9 +318,22 @@ def _load_tasks_parallel(self, kinds, kind_graph, parameters):
futures = set()
edges = set(kind_graph.edges)

with ProcessPoolExecutor(
mp_context=multiprocessing.get_context("fork")
) as executor:
# Use processes if available; this lets us use multiple CPU cores.
# We should revisit this default once free-threaded Python is more
# stable and performant. In the meantime, allowing the use of threads
# is still helpful when `fork` multiprocessing is not available (as on
# Windows and macOS), and it gives users the option to try free-threaded
# Python to speed things up.
if "fork" in multiprocessing.get_all_start_methods() and not os.environ.get(
"TASKGRAPH_USE_THREADS"
):
executor = ProcessPoolExecutor(
mp_context=multiprocessing.get_context("fork")
)
else:
executor = ThreadPoolExecutor(max_workers=os.process_cpu_count())

with executor:

def submit_ready_kinds():
"""Create the next batch of tasks for kinds without dependencies."""
@@ -434,14 +447,9 @@ def _run(self):
yield "kind_graph", kind_graph

logger.info("Generating full task set")
# Current parallel generation relies on multiprocessing, and forking.
# This causes problems on Windows and macOS due to how new processes
# are created there, and how doing so reinitializes global variables
# that are modified earlier in graph generation, that doesn't get
# redone in the new processes. Ideally this would be fixed, or we
# would take another approach to parallel kind generation. In the
# meantime, it's not supported outside of Linux.
if platform.system() != "Linux" or os.environ.get("TASKGRAPH_SERIAL"):
if "fork" not in multiprocessing.get_all_start_methods() or os.environ.get(
"TASKGRAPH_SERIAL"
):
all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters)
else:
all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters)
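The selection logic above boils down to: prefer fork-based worker processes for real CPU parallelism, and fall back to a thread pool when fork is unavailable or when the user opts in via TASKGRAPH_USE_THREADS. A minimal standalone sketch of that pattern (the pick_executor helper and the fallback to os.cpu_count() are illustrative, not part of this patch):

import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def pick_executor():
    # Prefer forked processes when the platform supports them and threads
    # were not explicitly requested.
    if "fork" in multiprocessing.get_all_start_methods() and not os.environ.get(
        "TASKGRAPH_USE_THREADS"
    ):
        return ProcessPoolExecutor(mp_context=multiprocessing.get_context("fork"))
    # Fall back to threads (e.g. on Windows and macOS). os.process_cpu_count()
    # only exists on Python 3.13+, so this sketch falls back to os.cpu_count().
    workers = getattr(os, "process_cpu_count", os.cpu_count)()
    return ThreadPoolExecutor(max_workers=workers)

with pick_executor() as executor:
    print(executor.submit(sum, range(10)).result())  # 45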
37 changes: 26 additions & 11 deletions src/taskgraph/util/schema.py
@@ -5,13 +5,21 @@

import pprint
import re
import threading
from collections.abc import Mapping

import voluptuous

import taskgraph
from taskgraph.util.keyed_by import evaluate_keyed_by, iter_dot_path

# Global reentrant lock for thread-safe Schema creation, largely to work around
# thread-unsafe code in voluptuous.
# An RLock allows the same thread to acquire the lock multiple times (needed
# for recursive validation). This is particularly important when running with
# free-threaded Python, which makes races more likely.
_schema_creation_lock = threading.RLock()


def validate_schema(schema, obj, msg_prefix):
"""
@@ -206,25 +214,32 @@ class Schema(voluptuous.Schema):
"""

def __init__(self, *args, check=True, **kwargs):
super().__init__(*args, **kwargs)
with _schema_creation_lock:
super().__init__(*args, **kwargs)

self.check = check
if not taskgraph.fast and self.check:
check_schema(self)
self.check = check
if not taskgraph.fast and self.check:
check_schema(self)

def extend(self, *args, **kwargs):
schema = super().extend(*args, **kwargs)
with _schema_creation_lock:
schema = super().extend(*args, **kwargs)

if self.check:
check_schema(schema)
# We want a twice-extended schema to be checked too.
schema.__class__ = Schema
return schema
if self.check:
check_schema(schema)
# We want twice extend schema to be checked too.
schema.__class__ = Schema
return schema

def _compile(self, schema):
if taskgraph.fast:
return
return super()._compile(schema)
with _schema_creation_lock:
return super()._compile(schema)

def __call__(self, data):
with _schema_creation_lock:
return super().__call__(data)

def __getitem__(self, item):
return self.schema[item] # type: ignore
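The schema.py changes serialize Schema construction, compilation, and validation behind one module-level RLock. A minimal sketch of that pattern, assuming only voluptuous (the MiniSchema name is illustrative; the real class lives in taskgraph.util.schema and also guards extend() and _compile()):

import threading

import voluptuous

_lock = threading.RLock()

class MiniSchema(voluptuous.Schema):
    def __init__(self, *args, **kwargs):
        # Serialize construction to avoid races in shared voluptuous state.
        with _lock:
            super().__init__(*args, **kwargs)

    def __call__(self, data):
        # An RLock rather than a plain Lock matters here: validating a nested
        # schema can re-enter __call__ on the same thread without deadlocking.
        with _lock:
            return super().__call__(data)

schema = MiniSchema({"name": str})
print(schema({"name": "build"}))  # {'name': 'build'}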
7 changes: 7 additions & 0 deletions test/test_generator.py
@@ -3,6 +3,7 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


import multiprocessing
from concurrent.futures import ProcessPoolExecutor

import pytest
@@ -12,6 +13,11 @@
from taskgraph.generator import Kind, load_tasks_for_kind, load_tasks_for_kinds
from taskgraph.loader.default import loader as default_loader

forkonly = pytest.mark.skipif(
"fork" not in multiprocessing.get_all_start_methods(),
reason="requires 'fork' multiprocessing support",
)


class FakePPE(ProcessPoolExecutor):
loaded_kinds = []
@@ -21,6 +27,7 @@ def submit(self, kind_load_tasks, *args):
return super().submit(kind_load_tasks, *args)


@forkonly
def test_kind_ordering(mocker, maketgg):
"When task kinds depend on each other, they are loaded in postorder"
mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE)
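A usage sketch of the forkonly marker (the test below is illustrative, not part of this patch): any test decorated with it is skipped on platforms whose interpreter does not offer the fork start method.

import multiprocessing

import pytest

forkonly = pytest.mark.skipif(
    "fork" not in multiprocessing.get_all_start_methods(),
    reason="requires 'fork' multiprocessing support",
)

@forkonly
def test_fork_context_available():
    # Only runs where fork is supported (e.g. Linux); skipped elsewhere.
    ctx = multiprocessing.get_context("fork")
    assert ctx.get_start_method() == "fork"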