Skip to content

Commit a5fc968

Browse files
bhearsum and ahal
authored
feat: generate kinds in parallel across multiple processes (#738)
* feat: generate kinds in parallel across multiple processes. This is a cleaned-up and slightly improved version of @ahal's original patch. Most notably, it uses `wait` to submit new kinds as soon as they become available (instead of waiting for all kinds in each round to be completed). This means that if a slow kind gets submitted before all other (non-downstream) kinds have been submitted, it won't block them. In the case of Gecko, the effect of this is that the `test` kind begins to process very quickly, and all other kinds are finished processing before that has completed. Locally, this took `./mach taskgraph tasks` from 1m26s to 1m9s (measured from command start to the final "Generated xxx tasks" message). On Try the results were a bit more mixed. The minimum time I observed without this patch was 140s, while the maximum was 291s (which seems to have been caused by bugbug slowness... which I'm willing to throw out). Outside of that outlier, the maximum was 146s and the mean was 143s. The minimum time I observed with this patch was 130s, while the maximum was 144s and the mean was 138s. I presume the difference in results locally vs. Try is that locally I'm on a 64-core SSD machine, and the decision tasks run on lower-powered machines on Try, so there ends up being some resource contention (I/O, I suspect, because the ProcessPoolExecutor will only run one process per CPU core) when we process kinds in parallel there. Despite this disappointing result on Try, this may still be worth taking, as `./mach taskgraph` runs twice in the critical path of many Try pushes (once on a developer's machine, and again in the decision task).
Raw data: Over 5 runs on Try I got, without this patch: 291s, 146s, 146s, 140s, 140s. In each of those, there were 241s, 92s, 94s, 90s, 90s between "Loading tasks for kind test" and "Generated xxxxxx tasks for kind test", which means we spent the following amount of time doing non-test kind things in the critical path: 50s, 54s, 52s, 50s, 50s. With this patch: 130s, 141s, 144s, 140s, 135s. In each of those, there were 105s, 114s, 115s, 114s, 109s between "Loading tasks for kind test" and "Generated xxxxxx tasks for kind test", which means we spent the following amount of time doing non-test kind things, but it was almost entirely out of the critical path: 25s, 27s, 29s, 26s, 26s. * refactor: make fake graph configs picklable --------- Co-authored-by: Andrew Halberstadt <[email protected]>
1 parent 260446b commit a5fc968

File tree

3 files changed

+104
-37
lines changed

3 files changed

+104
-37
lines changed

packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,13 @@ def _load_kinds(self, graph_config, target_kind=None):
6262
yield FakeKind(kind_name, "/fake", config, graph_config)
6363

6464

65+
class FakeGraphConfig(GraphConfig):
    """``GraphConfig`` variant for tests whose ``register`` does nothing.

    The no-op is defined as a regular method on a subclass; per the
    accompanying commit message this is what keeps fake graph configs
    picklable.
    """

    def register(self):
        """Skip registration entirely; always returns ``None``."""
        return None
68+
69+
6570
def fake_load_graph_config(root_dir):
66-
graph_config = GraphConfig(
71+
graph_config = FakeGraphConfig(
6772
{
6873
"trust-domain": "test-domain",
6974
"taskgraph": {
@@ -103,7 +108,6 @@ def fake_load_graph_config(root_dir):
103108
},
104109
root_dir,
105110
)
106-
graph_config.__dict__["register"] = lambda: None
107111
return graph_config
108112

109113

src/taskgraph/generator.py

Lines changed: 83 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
import copy
66
import logging
77
import os
8+
from concurrent.futures import (
9+
FIRST_COMPLETED,
10+
ProcessPoolExecutor,
11+
wait,
12+
)
813
from dataclasses import dataclass
914
from typing import Callable, Dict, Optional, Union
1015

@@ -44,16 +49,20 @@ def _get_loader(self):
4449
loader = "taskgraph.loader.default:loader"
4550
return find_object(loader)
4651

47-
def load_tasks(self, parameters, loaded_tasks, write_artifacts):
52+
def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts):
53+
logger.debug(f"Loading tasks for kind {self.name}")
54+
55+
parameters = Parameters(**parameters)
4856
loader = self._get_loader()
4957
config = copy.deepcopy(self.config)
5058

51-
kind_dependencies = config.get("kind-dependencies", [])
52-
kind_dependencies_tasks = {
53-
task.label: task for task in loaded_tasks if task.kind in kind_dependencies
54-
}
55-
56-
inputs = loader(self.name, self.path, config, parameters, loaded_tasks)
59+
inputs = loader(
60+
self.name,
61+
self.path,
62+
config,
63+
parameters,
64+
list(kind_dependencies_tasks.values()),
65+
)
5766

5867
transforms = TransformSequence()
5968
for xform_path in config["transforms"]:
@@ -87,6 +96,7 @@ def load_tasks(self, parameters, loaded_tasks, write_artifacts):
8796
)
8897
for task_dict in transforms(trans_config, inputs)
8998
]
99+
logger.info(f"Generated {len(tasks)} tasks for kind {self.name}")
90100
return tasks
91101

92102
@classmethod
@@ -251,6 +261,71 @@ def _load_kinds(self, graph_config, target_kinds=None):
251261
except KindNotFound:
252262
continue
253263

264+
def _load_tasks(self, kinds, kind_graph, parameters):
    """Load the tasks of every kind, running independent kinds in parallel.

    A kind is submitted to a process pool as soon as none of its
    dependencies are outstanding, i.e. once no edge in ``kind_graph.edges``
    still has it as its first element.

    Args:
        kinds: Mapping of kind name to kind object. It is not modified;
            scheduling works on a private copy.
        kind_graph: Object whose ``edges`` are tuples whose first element is
            a kind name and whose second element is the name of a kind it
            depends on.
        parameters: Mapping of parameters; a plain ``dict`` copy is sent to
            each worker.

    Returns:
        A dict mapping task label to task, covering every kind that ran.

    Raises:
        Exception: If two tasks share a label, or a kind name cannot be
            resolved. Any exception raised by a worker is re-raised as is.
    """
    all_tasks = {}
    futures_to_kind = {}
    edges = set(kind_graph.edges)
    # Work on a copy so the caller's mapping is left untouched.
    remaining = dict(kinds)

    with ProcessPoolExecutor() as executor:

        def submit_ready_kinds():
            """Submit every kind that has no outstanding dependencies."""
            kinds_with_deps = {edge[0] for edge in edges}
            ready_kinds = (
                set(remaining) - kinds_with_deps - set(futures_to_kind.values())
            )
            for name in ready_kinds:
                kind = remaining.get(name)
                if not kind:
                    # Defensive guard: names are drawn from ``remaining``.
                    message = (
                        f'Could not find the kind "{name}"\nAvailable kinds:\n'
                    )
                    for k in sorted(remaining):
                        message += f' - "{k}"\n'
                    raise Exception(message)

                dependencies = kind.config.get("kind-dependencies", [])
                future = executor.submit(
                    kind.load_tasks,
                    dict(parameters),
                    {
                        label: task
                        for label, task in all_tasks.items()
                        if task.kind in dependencies
                    },
                    self._write_artifacts,
                )
                futures_to_kind[future] = name

        submit_ready_kinds()
        while futures_to_kind:
            done, _ = wait(set(futures_to_kind), return_when=FIRST_COMPLETED)
            for future in done:
                if exc := future.exception():
                    executor.shutdown(wait=False, cancel_futures=True)
                    raise exc
                finished = futures_to_kind.pop(future)

                # Merge results here rather than in a done-callback:
                # exceptions raised by callbacks are logged and ignored by
                # concurrent.futures, and callbacks may not have run yet
                # when ``wait`` returns, so dependants could be submitted
                # without these tasks.
                for task in future.result():
                    if task.label in all_tasks:
                        executor.shutdown(wait=False, cancel_futures=True)
                        raise Exception("duplicate tasks with label " + task.label)
                    all_tasks[task.label] = task

                # Update state for next batch of futures.
                del remaining[finished]
                edges = {e for e in edges if e[1] != finished}

            # Submit any newly unblocked kinds
            submit_ready_kinds()

    return all_tasks
328+
254329
def _run(self):
255330
logger.info("Loading graph configuration.")
256331
graph_config = load_graph_config(self.root_dir)
@@ -305,31 +380,8 @@ def _run(self):
305380
)
306381

307382
logger.info("Generating full task set")
308-
all_tasks = {}
309-
for kind_name in kind_graph.visit_postorder():
310-
logger.debug(f"Loading tasks for kind {kind_name}")
311-
312-
kind = kinds.get(kind_name)
313-
if not kind:
314-
message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n'
315-
for k in sorted(kinds):
316-
message += f' - "{k}"\n'
317-
raise Exception(message)
383+
all_tasks = self._load_tasks(kinds, kind_graph, parameters)
318384

319-
try:
320-
new_tasks = kind.load_tasks(
321-
parameters,
322-
list(all_tasks.values()),
323-
self._write_artifacts,
324-
)
325-
except Exception:
326-
logger.exception(f"Error loading tasks for kind {kind_name}:")
327-
raise
328-
for task in new_tasks:
329-
if task.label in all_tasks:
330-
raise Exception("duplicate tasks with label " + task.label)
331-
all_tasks[task.label] = task
332-
logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}")
333385
full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset()))
334386
yield self.verify("full_task_set", full_task_set, graph_config, parameters)
335387

test/test_generator.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,27 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55

6+
from concurrent.futures import ProcessPoolExecutor
7+
68
import pytest
7-
from pytest_taskgraph import FakeKind, WithFakeKind, fake_load_graph_config
9+
from pytest_taskgraph import WithFakeKind, fake_load_graph_config
810

911
from taskgraph import generator, graph
1012
from taskgraph.generator import Kind, load_tasks_for_kind
1113
from taskgraph.loader.default import loader as default_loader
1214

1315

14-
def test_kind_ordering(maketgg):
16+
class FakePPE(ProcessPoolExecutor):
    """Process pool that remembers which kinds were handed to ``submit``.

    ``loaded_kinds`` lives on the class, so the recorded names are shared by
    all instances and can be read from the class itself.
    """

    # Kind names in the order their ``load_tasks`` was submitted.
    loaded_kinds = []

    def submit(self, kind_load_tasks, *args):
        """Record the owning kind's name, then submit the call as usual.

        ``kind_load_tasks`` must be a bound method of an object that has a
        ``name`` attribute.
        """
        kind_name = kind_load_tasks.__self__.name
        self.loaded_kinds.append(kind_name)
        return ProcessPoolExecutor.submit(self, kind_load_tasks, *args)
22+
23+
24+
def test_kind_ordering(mocker, maketgg):
1525
"When task kinds depend on each other, they are loaded in postorder"
26+
mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE)
1627
tgg = maketgg(
1728
kinds=[
1829
("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}),
@@ -21,7 +32,7 @@ def test_kind_ordering(maketgg):
2132
]
2233
)
2334
tgg._run_until("full_task_set")
24-
assert FakeKind.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
35+
assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
2536

2637

2738
def test_full_task_set(maketgg):
@@ -275,5 +286,5 @@ def test_kind_load_tasks(monkeypatch, graph_config, parameters, datadir, kind_co
275286
kind = Kind(
276287
name="fake", path="foo/bar", config=kind_config, graph_config=graph_config
277288
)
278-
tasks = kind.load_tasks(parameters, [], False)
289+
tasks = kind.load_tasks(parameters, {}, False)
279290
assert tasks

0 commit comments

Comments
 (0)