diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py
index a944a8b02..79031d808 100644
--- a/src/taskgraph/generator.py
+++ b/src/taskgraph/generator.py
@@ -154,14 +154,40 @@ def __init__(
         if root_dir is None:
             root_dir = "taskcluster"
         self.root_dir = root_dir
-        self._parameters = parameters
+        self._parameters_input = parameters
         self._decision_task_id = decision_task_id
         self._write_artifacts = write_artifacts
         self._enable_verifications = enable_verifications
+        self._graph_config = None
+        self._parameters = None
+        self._kinds = None
+        self._kind_graph = None
+        self._full_task_set = None
+        self._full_task_graph = None
+        self._target_task_set = None
+        self._target_task_graph = None
+        self._optimized_task_graph = None
+        self._morphed_task_graph = None
+        self._label_to_taskid = None
+
+    @property
+    def graph_config(self):
+        """
+        The configuration for this graph.
+
+        @type: TaskGraph
+        """
+        if self._graph_config:
+            return self._graph_config
+
+        logger.info("Loading graph configuration.")
+        self._graph_config = load_graph_config(self.root_dir)
+        self._graph_config.register()
+        # Initial verifications that don't depend on any generation state.
+        self.verify("initial")
+        self.verify("graph_config", self._graph_config)
 
-        # start the generator
-        self._run = self._run()  # type: ignore
-        self._run_results = {}
+        return self._graph_config
 
     @property
     def parameters(self):
@@ -170,7 +196,64 @@
         """
         The properties used for this graph.
 
         @type: Properties
         """
-        return self._run_until("parameters")
+        if self._parameters:
+            return self._parameters
+
+        if callable(self._parameters_input):
+            parameters = self._parameters_input(self.graph_config)
+        else:
+            parameters = self._parameters_input
+
+        logger.info(f"Using {parameters}")
+        logger.debug(f"Dumping parameters:\n{repr(parameters)}")
+
+        self.verify("parameters", parameters)
+        self._parameters = parameters
+        return parameters
+
+    @property
+    def kinds(self):
+        if self._kinds:
+            return self._kinds
+        logger.info("Loading kinds")
+        # put the kinds into a graph and sort topologically so that kinds are loaded
+        # in post-order
+        target_kinds = sorted(self.parameters.get("target-kinds", []))
+        if target_kinds:
+            logger.info(
+                "Limiting kinds to following kinds and dependencies: {}".format(
+                    ", ".join(target_kinds)
+                )
+            )
+        self._kinds = {
+            kind.name: kind
+            for kind in self._load_kinds(self.graph_config, target_kinds)
+        }
+        self.verify("kinds", self._kinds)
+        return self._kinds
+
+    @property
+    def kind_graph(self):
+        """
+        The dependency graph of kinds.
+
+        @type: Graph
+        """
+        if self._kind_graph:
+            return self._kind_graph
+        edges = set()
+        for kind in self.kinds.values():
+            for dep in kind.config.get("kind-dependencies", []):
+                edges.add((kind.name, dep, "kind-dependency"))
+        self._kind_graph = Graph(frozenset(self.kinds), frozenset(edges))
+
+        target_kinds = sorted(self.parameters.get("target-kinds", []))
+        if target_kinds:
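+            # Always keep the "docker-image" kind when limiting to target
+            # kinds, since tasks from other kinds may depend on docker images.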
+            self._kind_graph = self._kind_graph.transitive_closure(
+                set(target_kinds) | {"docker-image"}
+            )
+
+        return self._kind_graph
 
     @property
     def full_task_set(self):
@@ -179,7 +262,42 @@
         """
 
         @type: TaskGraph
         """
-        return self._run_until("full_task_set")
+        if self._full_task_set:
+            return self._full_task_set
+
+        logger.info("Generating full task set")
+        # The short version of the below is: we only support parallel kind
+        # processing on Linux.
+        #
+        # Current parallel generation relies on multiprocessing, and more
+        # specifically: the "fork" multiprocessing method. This is not supported
+        # at all on Windows (it uses "spawn"). Forking is supported on macOS,
+        # but no longer works reliably in all cases, and our usage of it here
+        # causes crashes. See https://github.com/python/cpython/issues/77906
+        # and http://sealiesoftware.com/blog/archive/2017/6/5/Objective-C_and_fork_in_macOS_1013.html
+        # for more details on that.
+        # Other methods of multiprocessing (both "spawn" and "forkserver")
+        # do not work for our use case, because they cause global variables
+        # to be reinitialized, which are sometimes modified earlier in graph
+        # generation. These issues can theoretically be worked around by
+        # eliminating all reliance on globals as part of task generation, but
+        # is far from a small amount of work in users like Gecko/Firefox.
+        # In the long term, the better path forward is likely to be switching
+        # to threading with a free-threaded python to achieve similar parallel
+        # processing.
+        if platform.system() != "Linux" or os.environ.get("TASKGRAPH_SERIAL"):
+            all_tasks = self._load_tasks_serial(
+                self.kinds, self.kind_graph, self.parameters
+            )
+        else:
+            all_tasks = self._load_tasks_parallel(
+                self.kinds, self.kind_graph, self.parameters
+            )
+
+        full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset()))
+        self.verify("full_task_set", full_task_set, self.graph_config, self.parameters)
+        self._full_task_set = full_task_set
+        return self._full_task_set
 
     @property
     def full_task_graph(self):
@@ -189,7 +307,32 @@
         """
 
         @type: TaskGraph
         """
-        return self._run_until("full_task_graph")
+        if self._full_task_graph:
+            return self._full_task_graph
+
+        all_tasks = self.full_task_set.tasks
+        logger.info("Generating full task graph")
+        edges = set()
+        for t in self.full_task_set:
+            for depname, dep in t.dependencies.items():
+                if dep not in all_tasks.keys():
+                    raise Exception(
+                        f"Task '{t.label}' lists a dependency that does not exist: '{dep}'"
+                    )
+                edges.add((t.label, dep, depname))
+
+        full_task_graph = TaskGraph(
+            all_tasks,
+            Graph(frozenset(self.full_task_set.graph.nodes), frozenset(edges)),
+        )
+        logger.info(
+            f"Full task graph contains {len(self.full_task_set.graph.nodes)} tasks and {len(edges)} dependencies"
+        )
+        self.verify(
+            "full_task_graph", full_task_graph, self.graph_config, self.parameters
+        )
+        self._full_task_graph = full_task_graph
+        return full_task_graph
 
     @property
     def target_task_set(self):
@@ -198,7 +341,39 @@
         """
 
         @type: TaskGraph
         """
-        return self._run_until("target_task_set")
+        if self._target_task_set:
+            return self._target_task_set
+
+        all_tasks = self.full_task_set.tasks
+        logger.info("Generating target task set")
+        target_task_set = TaskGraph(
+            dict(all_tasks),
+            Graph(frozenset(all_tasks.keys()), frozenset()),
+        )
+        filters = self.parameters.get("filters", [])
+        if not filters:
+            # Default to target_tasks_method if none specified.
+            filters.append("target_tasks_method")
+        filters = [filter_tasks.filter_task_functions[f] for f in filters]
+
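+        # Apply each filter in turn; every filter narrows the target task set.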
+        for fltr in filters:
+            old_len = len(target_task_set.graph.nodes)
+            target_tasks = set(
+                fltr(target_task_set, self.parameters, self.graph_config)
+            )
+            target_task_set = TaskGraph(
+                {l: all_tasks[l] for l in target_tasks},
+                Graph(frozenset(target_tasks), frozenset()),
+            )
+            logger.info(
+                f"Filter {fltr.__name__} pruned {old_len - len(target_tasks)} tasks ({len(target_tasks)} remain)"
+            )
+
+        self.verify(
+            "target_task_set", target_task_set, self.graph_config, self.parameters
+        )
+        self._target_task_set = target_task_set
+        return target_task_set
 
     @property
     def target_task_graph(self):
@@ -207,7 +382,38 @@
         """
 
         @type: TaskGraph
         """
-        return self._run_until("target_task_graph")
+        if self._target_task_graph:
+            return self._target_task_graph
+
+        logger.info("Generating target task graph")
+        # include all tasks with `always_target` set
+        if self.parameters["enable_always_target"]:
+            always_target_tasks = {
+                t.label
+                for t in self.full_task_graph.tasks.values()
+                if t.attributes.get("always_target")
+                if self.parameters["enable_always_target"] is True
+                or t.kind in self.parameters["enable_always_target"]
+            }
+        else:
+            always_target_tasks = set()
+
+        target_tasks = set(self.target_task_set.tasks)
+        logger.info(
+            f"Adding {len(always_target_tasks) - len(always_target_tasks & target_tasks)} tasks with `always_target` attribute"  # type: ignore
+        )
+        requested_tasks = target_tasks | always_target_tasks  # type: ignore
+        target_graph = self.full_task_graph.graph.transitive_closure(requested_tasks)
+        all_tasks = self.full_task_set.tasks
+        target_task_graph = TaskGraph(
+            {l: all_tasks[l] for l in target_graph.nodes},
+            target_graph,
+        )
+        self.verify(
+            "target_task_graph", target_task_graph, self.graph_config, self.parameters
+        )
+        self._target_task_graph = target_task_graph
+        return target_task_graph
 
     @property
     def optimized_task_graph(self):
@@ -218,7 +424,56 @@
         """
 
         @type: TaskGraph
         """
-        return self._run_until("optimized_task_graph")
+        if self._optimized_task_graph:
+            return self._optimized_task_graph
+
+        logger.info("Generating optimized task graph")
+        existing_tasks = self.parameters.get("existing_tasks")
+        do_not_optimize = set(self.parameters.get("do_not_optimize", []))
+        if not self.parameters.get("optimize_target_tasks", True):
+            do_not_optimize = set(self.target_task_set.graph.nodes).union(
+                do_not_optimize
+            )
+
+        # this is used for testing experimental optimization strategies
+        strategies = os.environ.get(
+            "TASKGRAPH_OPTIMIZE_STRATEGIES", self.parameters.get("optimize_strategies")
+        )
+        if strategies:
+            strategies = find_object(strategies)
+
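+        # Recompute the `always_target` tasks (mirroring target_task_graph) so
+        # that they are included in the requested tasks during optimization.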
+        if self.parameters["enable_always_target"]:
+            always_target_tasks = {
+                t.label
+                for t in self.full_task_graph.tasks.values()
+                if t.attributes.get("always_target")
+                if self.parameters["enable_always_target"] is True
+                or t.kind in self.parameters["enable_always_target"]
+            }
+        else:
+            always_target_tasks = set()
+        target_tasks = set(self.target_task_set.tasks)
+        requested_tasks = target_tasks | always_target_tasks  # type: ignore
+
+        optimized_task_graph, label_to_taskid = optimize_task_graph(
+            self.target_task_graph,
+            requested_tasks,
+            self.parameters,
+            do_not_optimize,
+            self._decision_task_id,
+            existing_tasks=existing_tasks,
+            strategy_override=strategies,
+        )
+
+        self.verify(
+            "optimized_task_graph",
+            optimized_task_graph,
+            self.graph_config,
+            self.parameters,
+        )
+        self._optimized_task_graph = optimized_task_graph
+        self._label_to_taskid = label_to_taskid
+        return optimized_task_graph
 
     @property
     def label_to_taskid(self):
@@ -228,7 +483,13 @@
         """
 
         @type: dictionary
         """
-        return self._run_until("label_to_taskid")
+        if self._label_to_taskid:
+            return self._label_to_taskid
+
+        # ensure _label_to_taskid is populated before returning it
+        # don't run any further than necessary though
+        self.optimized_task_graph
+        return self._label_to_taskid
 
     @property
     def morphed_task_graph(self):
@@ -239,25 +500,19 @@
         """
 
         @type: TaskGraph
         """
-        return self._run_until("morphed_task_graph")
-
-    @property
-    def graph_config(self):
-        """
-        The configuration for this graph.
-
-        @type: TaskGraph
-        """
-        return self._run_until("graph_config")
-
-    @property
-    def kind_graph(self):
-        """
-        The dependency graph of kinds.
+        if self._morphed_task_graph:
+            return self._morphed_task_graph
+
+        morphed_task_graph, label_to_taskid = morph(
+            self.optimized_task_graph,
+            self.label_to_taskid,
+            self.parameters,
+            self.graph_config,
+        )
-
-        @type: Graph
-        """
-        return self._run_until("kind_graph")
+        self.verify(
+            "morphed_task_graph", morphed_task_graph, self.graph_config, self.parameters
+        )
+        self._morphed_task_graph = morphed_task_graph
+        self._label_to_taskid = label_to_taskid
+        return morphed_task_graph
 
     def _load_kinds(self, graph_config, target_kinds=None):
         if target_kinds:
@@ -377,196 +632,6 @@ def submit_ready_kinds():
         return all_tasks
 
-    def _run(self):
-        logger.info("Loading graph configuration.")
-        graph_config = load_graph_config(self.root_dir)
-
-        yield ("graph_config", graph_config)
-
-        graph_config.register()
-
-        # Initial verifications that don't depend on any generation state.
-        self.verify("initial")
-        self.verify("graph_config", graph_config)
-
-        if callable(self._parameters):
-            parameters = self._parameters(graph_config)
-        else:
-            parameters = self._parameters
-
-        logger.info(f"Using {parameters}")
-        logger.debug(f"Dumping parameters:\n{repr(parameters)}")
-
-        filters = parameters.get("filters", [])
-        if not filters:
-            # Default to target_tasks_method if none specified.
-            filters.append("target_tasks_method")
-        filters = [filter_tasks.filter_task_functions[f] for f in filters]
-
-        yield self.verify("parameters", parameters)
-
-        logger.info("Loading kinds")
-        # put the kinds into a graph and sort topologically so that kinds are loaded
-        # in post-order
-        target_kinds = sorted(parameters.get("target-kinds", []))
-        if target_kinds:
-            logger.info(
-                "Limiting kinds to following kinds and dependencies: {}".format(
-                    ", ".join(target_kinds)
-                )
-            )
-        kinds = {
-            kind.name: kind for kind in self._load_kinds(graph_config, target_kinds)
-        }
-        self.verify("kinds", kinds)
-
-        edges = set()
-        for kind in kinds.values():
-            for dep in kind.config.get("kind-dependencies", []):
-                edges.add((kind.name, dep, "kind-dependency"))
-        kind_graph = Graph(frozenset(kinds), frozenset(edges))
-
-        if target_kinds:
-            kind_graph = kind_graph.transitive_closure(
-                set(target_kinds) | {"docker-image"}
-            )
-
-        yield "kind_graph", kind_graph
-
-        logger.info("Generating full task set")
-        # The short version of the below is: we only support parallel kind
-        # processing on Linux.
-        #
-        # Current parallel generation relies on multiprocessing, and more
-        # specifically: the "fork" multiprocessing method. This is not supported
-        # at all on Windows (it uses "spawn"). Forking is supported on macOS,
-        # but no longer works reliably in all cases, and our usage of it here
-        # causes crashes. See https://github.com/python/cpython/issues/77906
-        # and http://sealiesoftware.com/blog/archive/2017/6/5/Objective-C_and_fork_in_macOS_1013.html
-        # for more details on that.
-        # Other methods of multiprocessing (both "spawn" and "forkserver")
-        # do not work for our use case, because they cause global variables
-        # to be reinitialized, which are sometimes modified earlier in graph
-        # generation. These issues can theoretically be worked around by
-        # eliminating all reliance on globals as part of task generation, but
-        # is far from a small amount of work in users like Gecko/Firefox.
-        # In the long term, the better path forward is likely to be switching
-        # to threading with a free-threaded python to achieve similar parallel
-        # processing.
-        if platform.system() != "Linux" or os.environ.get("TASKGRAPH_SERIAL"):
-            all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters)
-        else:
-            all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters)
-
-        full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset()))
-        yield self.verify("full_task_set", full_task_set, graph_config, parameters)
-
-        logger.info("Generating full task graph")
-        edges = set()
-        for t in full_task_set:
-            for depname, dep in t.dependencies.items():
-                if dep not in all_tasks.keys():
-                    raise Exception(
-                        f"Task '{t.label}' lists a dependency that does not exist: '{dep}'"
-                    )
-                edges.add((t.label, dep, depname))
-
-        full_task_graph = TaskGraph(
-            all_tasks, Graph(frozenset(full_task_set.graph.nodes), frozenset(edges))
-        )
-        logger.info(
-            f"Full task graph contains {len(full_task_set.graph.nodes)} tasks and {len(edges)} dependencies"
-        )
-        yield self.verify("full_task_graph", full_task_graph, graph_config, parameters)
-
-        logger.info("Generating target task set")
-        target_task_set = TaskGraph(
-            dict(all_tasks),
-            Graph(frozenset(all_tasks.keys()), frozenset()),
-        )
-        for fltr in filters:
-            old_len = len(target_task_set.graph.nodes)
-            target_tasks = set(fltr(target_task_set, parameters, graph_config))
-            target_task_set = TaskGraph(
-                {l: all_tasks[l] for l in target_tasks},
-                Graph(frozenset(target_tasks), frozenset()),
-            )
-            logger.info(
-                f"Filter {fltr.__name__} pruned {old_len - len(target_tasks)} tasks ({len(target_tasks)} remain)"
-            )
-
-        yield self.verify("target_task_set", target_task_set, graph_config, parameters)
-
-        logger.info("Generating target task graph")
-        # include all tasks with `always_target` set
-        if parameters["enable_always_target"]:
-            always_target_tasks = {
-                t.label
-                for t in full_task_graph.tasks.values()
-                if t.attributes.get("always_target")
-                if parameters["enable_always_target"] is True
-                or t.kind in parameters["enable_always_target"]
-            }
-        else:
-            always_target_tasks = set()
-        logger.info(
-            f"Adding {len(always_target_tasks) - len(always_target_tasks & target_tasks)} tasks with `always_target` attribute"  # type: ignore
-        )
-        requested_tasks = target_tasks | always_target_tasks  # type: ignore
-        target_graph = full_task_graph.graph.transitive_closure(requested_tasks)
-        target_task_graph = TaskGraph(
-            {l: all_tasks[l] for l in target_graph.nodes},
-            target_graph,
-        )
-        yield self.verify(
-            "target_task_graph", target_task_graph, graph_config, parameters
-        )
-
-        logger.info("Generating optimized task graph")
-        existing_tasks = parameters.get("existing_tasks")
-        do_not_optimize = set(parameters.get("do_not_optimize", []))
-        if not parameters.get("optimize_target_tasks", True):
-            do_not_optimize = set(target_task_set.graph.nodes).union(do_not_optimize)
-
-        # this is used for testing experimental optimization strategies
-        strategies = os.environ.get(
-            "TASKGRAPH_OPTIMIZE_STRATEGIES", parameters.get("optimize_strategies")
-        )
-        if strategies:
-            strategies = find_object(strategies)
-
-        optimized_task_graph, label_to_taskid = optimize_task_graph(
-            target_task_graph,
-            requested_tasks,
-            parameters,
-            do_not_optimize,
-            self._decision_task_id,
-            existing_tasks=existing_tasks,
-            strategy_override=strategies,
-        )
-
-        yield self.verify(
-            "optimized_task_graph", optimized_task_graph, graph_config, parameters
-        )
-
-        morphed_task_graph, label_to_taskid = morph(
-            optimized_task_graph, label_to_taskid, parameters, graph_config
-        )
-
-        yield "label_to_taskid", label_to_taskid
-        yield self.verify(
-            "morphed_task_graph", morphed_task_graph, graph_config, parameters
-        )
-
-    def _run_until(self, name):
-        while name not in self._run_results:
-            try:
-                k, v = next(self._run)  # type: ignore
-            except StopIteration:
-                raise AttributeError(f"No such run result {name}")
-            self._run_results[k] = v
-        return self._run_results[name]
-
     def verify(self, name, *args, **kwargs):
         if self._enable_verifications:
             verifications(name, *args, **kwargs)
diff --git a/test/test_generator.py b/test/test_generator.py
index 783073b21..e86f89e74 100644
--- a/test/test_generator.py
+++ b/test/test_generator.py
@@ -38,7 +38,7 @@ def test_kind_ordering(mocker, maketgg):
             ("_fake1", {"kind-dependencies": []}),
         ]
     )
-    tgg._run_until("full_task_set")
+    tgg.full_task_set
     assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
 
 
@@ -279,9 +279,9 @@ def _get_loader(self):
 )
 def test_default_loader(config, expected_transforms):
     loader = Kind("", "", config, {})._get_loader()
-    assert loader is default_loader, (
-        "Default Kind loader should be taskgraph.loader.default.loader"
-    )
+    assert (
+        loader is default_loader
+    ), "Default Kind loader should be taskgraph.loader.default.loader"
     loader("", "", config, {}, [], False)
     assert config["transforms"] == expected_transforms