Skip to content

Commit 9eded51

Browse files
ahalbhearsum
authored andcommitted
feat: generate kinds in parallel across multiple processes
This is a cleaned up and slightly improved version of @ahal's original patch. Most notably, it uses `wait` to resubmit new kinds as soon as they become available (instead of waiting for all kinds in each round to be completed). This means that if a slow kind gets submitted before all other (non-downstream) kinds have been submitted, that it won't block them. In the case of Gecko, the effect of this is that the `test` kind begins to process very quickly, and all other kinds are finished processing before that has completed. Locally, this took `./mach taskgraph tasks` from 1m26s to 1m9s (measured from command start to the final "Generated xxx tasks" message. On try the results were a bit more mixed. The minimum time I observed without this patch was 140s, while the maximum was 291s (which seems to have been caused by bugbug slowness...which I'm willing to throw out). Outside of that outlier, the maximum was 146s and the mean was 143s. The minimum time I observed with this patch was 130s, while the maximum was 144s and the mean was 138s. I presume the difference in results locally vs. Try is that locally I'm on a 64-core SSD machine, and the decision tasks run on lowered powered machines on Try, so there ends up being some resource contention (I/O, I suspect, because the ProcessPoolExecutor will only run one process per CPU core) when we process kinds in parallel there. Despite this disappointing result on Try, this may still be worth taking, as `./mach taskgraph` runs twice in the critical path of many try pushes (once on a developer's machine, and again in the decision task). raw data: Over 5 runs on try I got, without this patch: 291s, 146s, 146s, 140s, 140s In each of those, there were 241s, 92s, 94s, 90s, 90s between "Loading tasks for kind test" and "Generated xxxxxx tasks for kind test" Which means we spent the following amount of time doing non-test kind things in the critical path: 50s, 54s, 52s, 50s, 50s With this patch: 130s, 141s, and 144s, 140s, 135s In each of those, there were 105s, 114s, 115s, 114s, 109s between "Loading tasks for kind test" and "Generated xxxxxx tasks for kind test" Which means we spent the following amount of time doing non-test kind things, but it was almost entirely out of the critical path: 25s, 27s, 29s, 26s, 26s
1 parent 5ffdbdb commit 9eded51

File tree

2 files changed

+93
-35
lines changed

2 files changed

+93
-35
lines changed

src/taskgraph/generator.py

Lines changed: 80 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,16 @@
22
# License, v. 2.0. If a copy of the MPL was not distributed with this
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

5+
from collections import defaultdict
56
import copy
7+
from itertools import chain
68
import logging
79
import os
10+
from concurrent.futures import (
11+
FIRST_COMPLETED,
12+
ProcessPoolExecutor,
13+
wait,
14+
)
815
from dataclasses import dataclass
916
from typing import Callable, Dict, Optional, Union
1017

@@ -44,16 +51,20 @@ def _get_loader(self):
4451
loader = "taskgraph.loader.default:loader"
4552
return find_object(loader)
4653

47-
def load_tasks(self, parameters, loaded_tasks, write_artifacts):
54+
def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts):
55+
logger.debug(f"Loading tasks for kind {self.name}")
56+
57+
parameters = Parameters(**parameters)
4858
loader = self._get_loader()
4959
config = copy.deepcopy(self.config)
5060

51-
kind_dependencies = config.get("kind-dependencies", [])
52-
kind_dependencies_tasks = {
53-
task.label: task for task in loaded_tasks if task.kind in kind_dependencies
54-
}
55-
56-
inputs = loader(self.name, self.path, config, parameters, loaded_tasks)
61+
inputs = loader(
62+
self.name,
63+
self.path,
64+
config,
65+
parameters,
66+
list(kind_dependencies_tasks.values()),
67+
)
5768

5869
transforms = TransformSequence()
5970
for xform_path in config["transforms"]:
@@ -87,6 +98,7 @@ def load_tasks(self, parameters, loaded_tasks, write_artifacts):
8798
)
8899
for task_dict in transforms(trans_config, inputs)
89100
]
101+
logger.info(f"Generated {len(tasks)} tasks for kind {self.name}")
90102
return tasks
91103

92104
@classmethod
@@ -251,6 +263,66 @@ def _load_kinds(self, graph_config, target_kinds=None):
251263
except KindNotFound:
252264
continue
253265

266+
def _load_tasks(self, kinds, kind_graph, parameters):
267+
all_tasks = {}
268+
futures_to_kind = {}
269+
futures = set()
270+
edges = set(kind_graph.edges)
271+
272+
def add_new_tasks(future):
273+
for task in future.result():
274+
if task.label in all_tasks:
275+
raise Exception("duplicate tasks with label " + task.label)
276+
all_tasks[task.label] = task
277+
278+
with ProcessPoolExecutor() as executor:
279+
280+
def submit_ready_kinds():
281+
"""Create the next batch of tasks for kinds without dependencies."""
282+
nonlocal kinds, edges, futures
283+
loaded_tasks = all_tasks.copy()
284+
kinds_with_deps = {edge[0] for edge in edges}
285+
ready_kinds = (
286+
set(kinds) - kinds_with_deps - set(futures_to_kind.values())
287+
)
288+
for name in ready_kinds:
289+
kind = kinds.get(name)
290+
if not kind:
291+
message = f'Could not find the kind "{name}"\nAvailable kinds:\n'
292+
for k in sorted(kinds):
293+
message += f' - "{k}"\n'
294+
raise Exception(message)
295+
296+
future = executor.submit(
297+
kind.load_tasks,
298+
dict(parameters),
299+
{
300+
k: t
301+
for k, t in loaded_tasks.items()
302+
if t.kind in kind.config.get("kind-dependencies", [])
303+
},
304+
self._write_artifacts,
305+
)
306+
future.add_done_callback(add_new_tasks)
307+
futures.add(future)
308+
futures_to_kind[future] = name
309+
310+
submit_ready_kinds()
311+
while futures:
312+
done, _ = wait(futures, return_when=FIRST_COMPLETED)
313+
for future in done:
314+
kind = futures_to_kind.pop(future)
315+
futures.remove(future)
316+
317+
# Update state for next batch of futures.
318+
del kinds[kind]
319+
edges = {e for e in edges if e[1] != kind}
320+
321+
# Submit any newly unblocked kinds
322+
submit_ready_kinds()
323+
324+
return all_tasks
325+
254326
def _run(self):
255327
logger.info("Loading graph configuration.")
256328
graph_config = load_graph_config(self.root_dir)
@@ -305,31 +377,8 @@ def _run(self):
305377
)
306378

307379
logger.info("Generating full task set")
308-
all_tasks = {}
309-
for kind_name in kind_graph.visit_postorder():
310-
logger.debug(f"Loading tasks for kind {kind_name}")
311-
312-
kind = kinds.get(kind_name)
313-
if not kind:
314-
message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n'
315-
for k in sorted(kinds):
316-
message += f' - "{k}"\n'
317-
raise Exception(message)
380+
all_tasks = self._load_tasks(kinds, kind_graph, parameters)
318381

319-
try:
320-
new_tasks = kind.load_tasks(
321-
parameters,
322-
list(all_tasks.values()),
323-
self._write_artifacts,
324-
)
325-
except Exception:
326-
logger.exception(f"Error loading tasks for kind {kind_name}:")
327-
raise
328-
for task in new_tasks:
329-
if task.label in all_tasks:
330-
raise Exception("duplicate tasks with label " + task.label)
331-
all_tasks[task.label] = task
332-
logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}")
333382
full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset()))
334383
yield self.verify("full_task_set", full_task_set, graph_config, parameters)
335384

test/test_generator.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,25 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

55

6+
from concurrent.futures import ProcessPoolExecutor
67
import pytest
7-
from pytest_taskgraph import FakeKind, WithFakeKind, fake_load_graph_config
8+
from pytest_taskgraph import WithFakeKind, fake_load_graph_config
89

910
from taskgraph import generator, graph
1011
from taskgraph.generator import Kind, load_tasks_for_kind
1112
from taskgraph.loader.default import loader as default_loader
1213

1314

14-
def test_kind_ordering(maketgg):
15+
class FakePPE(ProcessPoolExecutor):
16+
loaded_kinds = []
17+
def submit(self, kind_load_tasks, *args):
18+
self.loaded_kinds.append(kind_load_tasks.__self__.name)
19+
return super().submit(kind_load_tasks, *args)
20+
21+
22+
def test_kind_ordering(mocker, maketgg):
1523
"When task kinds depend on each other, they are loaded in postorder"
24+
mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE)
1625
tgg = maketgg(
1726
kinds=[
1827
("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}),
@@ -21,7 +30,7 @@ def test_kind_ordering(maketgg):
2130
]
2231
)
2332
tgg._run_until("full_task_set")
24-
assert FakeKind.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
33+
assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
2534

2635

2736
def test_full_task_set(maketgg):
@@ -275,5 +284,5 @@ def test_kind_load_tasks(monkeypatch, graph_config, parameters, datadir, kind_co
275284
kind = Kind(
276285
name="fake", path="foo/bar", config=kind_config, graph_config=graph_config
277286
)
278-
tasks = kind.load_tasks(parameters, [], False)
287+
tasks = kind.load_tasks(parameters, {}, False)
279288
assert tasks

0 commit comments

Comments
 (0)