diff --git a/e2e/test_build_parallel.sh b/e2e/test_build_parallel.sh index 9a89e0ab..6593aa49 100755 --- a/e2e/test_build_parallel.sh +++ b/e2e/test_build_parallel.sh @@ -33,6 +33,7 @@ pass=true # Build everything a first time log="$OUTDIR/build-logs/${DIST}-build.log" fromager \ + --debug \ --log-file "$log" \ --work-dir "$OUTDIR/work-dir" \ --sdists-repo "$OUTDIR/sdists-repo" \ @@ -40,11 +41,7 @@ fromager \ --settings-dir="$SCRIPTDIR/build-parallel" \ build-parallel "$OUTDIR/graph.json" -if ! grep -q "cython-3.1.1: ready to build" "$log"; then - echo "Did not find message indicating build of cython would start" 1>&2 - pass=false -fi -if ! grep -q "cython: requires exclusive build" "$log"; then +if ! grep -q "cython==3.1.1: requires exclusive build" "$log"; then echo "Did not find message indicating build of cython would run on its own" 1>&2 pass=false fi diff --git a/src/fromager/commands/build.py b/src/fromager/commands/build.py index 2f365176..00a5001c 100644 --- a/src/fromager/commands/build.py +++ b/src/fromager/commands/build.py @@ -614,124 +614,90 @@ def build_parallel( logger.info("reading dependency graph from %s", graph_file) graph: dependency_graph.DependencyGraph graph = dependency_graph.DependencyGraph.from_file(graph_file) + logger.info("found %d packages to build", len(graph.nodes) - 1) + built_entries: list[BuildSequenceEntry] = [] + topo = graph.get_topological_sorter() + + exclusive_nodes: set[str] = { + node.key + for node in graph.get_all_nodes() + if node.key != dependency_graph.ROOT + and wkctx.settings.package_build_info(node.canonicalized_name).exclusive_build + } + + # Isolate the logic for building some nodes in parallel so we can + # reuse it multiple times in the loop below. + def _build_some_nodes( + nodes_to_build: list[dependency_graph.DependencyNode], + ) -> None: + futures: list[concurrent.futures.Future[BuildSequenceEntry]] = [] + reqs: list[Requirement] = [] + for node in nodes_to_build: + req = Requirement(f"{node.canonicalized_name}=={node.version}") + reqs.append(req) + future = executor.submit( + _build_parallel, + wkctx=wkctx, + resolved_version=node.version, + req=req, + source_download_url=node.download_url, + force=force, + cache_wheel_server_url=cache_wheel_server_url, + ) + future.add_done_callback(update_progressbar_cb) + futures.append(future) + + # Wait for all builds to complete + for node, future in zip(nodes_to_build, futures, strict=True): + try: + entry = future.result() + built_entries.append(entry) + # progress bar is updated in callback + except Exception as e: + logger.error(f"Failed to build {node.key}: {e}") + raise - # Track what has been built - built_node_keys: set[str] = set() - - # Get all nodes that need to be built (excluding prebuilt ones and the root node) - # Sort the nodes to build by their key one time to avoid - # redoing the sort every iteration and to make the output deterministic. - nodes_to_build: DependencyNodeList = sorted( - (n for n in graph.nodes.values() if n.key != dependency_graph.ROOT), - key=lambda n: n.key, - ) - logger.info("found %d packages to build", len(nodes_to_build)) - - # A node can be built when all of its build dependencies are built - entries: list[BuildSequenceEntry] = [] + # invalidate uv cache + wkctx.uv_clean_cache(*reqs) - with progress.progress_context(total=len(nodes_to_build)) as progressbar: + with ( + progress.progress_context(total=len(graph.nodes) - 1) as progressbar, + concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor, + ): def update_progressbar_cb(future: concurrent.futures.Future) -> None: """Immediately update the progress when when a task is done""" progressbar.update() - while nodes_to_build: - # Find nodes that can be built (all build dependencies are built) - buildable_nodes: DependencyNodeList = [] - for node in nodes_to_build: - with req_ctxvar_context( - Requirement(node.canonicalized_name), node.version - ): - # Get all build dependencies (build-system, build-backend, build-sdist) - build_deps: DependencyNodeList = [ - edge.destination_node - for edge in node.children - if edge.req_type.is_build_requirement - ] - # A node can be built when all of its build dependencies are built - unbuilt_deps: set[str] = set( - dep.key for dep in build_deps if dep.key not in built_node_keys - ) - if not unbuilt_deps: - logger.info( - "ready to build, have all build dependencies: %s", - sorted(set(dep.key for dep in build_deps)), - ) - buildable_nodes.append(node) - else: - logger.info( - "waiting for build dependencies: %s", - sorted(unbuilt_deps), - ) - - if not buildable_nodes: - # If we can't build anything but still have nodes, we have a cycle - remaining: list[str] = [n.key for n in nodes_to_build] - logger.info("have already built: %s", sorted(built_node_keys)) - raise ValueError(f"Circular dependency detected among: {remaining}") - - logger.info( - "ready to build: %s", - sorted(n.key for n in buildable_nodes), + while topo.is_active(): + # We get all of the ready nodes, and then split them into exclusive + # and non-exclusive nodes because get_ready() does not return the + # same value twice and we want to ensure we build all of the nodes + # but we have to build the exclusive nodes on their own. + buildable_nodes: list[dependency_graph.DependencyNode] = list( + topo.get_ready() ) - - # Check if any buildable node requires exclusive build (exclusive_build == True) - exclusive_nodes: DependencyNodeList = [ - node - for node in buildable_nodes - if wkctx.settings.package_build_info( - node.canonicalized_name - ).exclusive_build - ] - if exclusive_nodes: - # Only build the first exclusive node this round - buildable_nodes = [exclusive_nodes[0]] - logger.info( - f"{exclusive_nodes[0].canonicalized_name}: requires exclusive build, running it alone this round." - ) - - # Build up to max_workers nodes concurrently (or all if max_workers is None) - with concurrent.futures.ThreadPoolExecutor( - max_workers=max_workers - ) as executor: - futures: list[concurrent.futures.Future[tuple[pathlib.Path, bool]]] = [] - reqs: list[Requirement] = [] + non_exclusive_nodes: list[dependency_graph.DependencyNode] = [] + for node in buildable_nodes: + if node.key in exclusive_nodes: + logger.info( + f"{node.key}: requires exclusive build, running it alone" + ) + _build_some_nodes([node]) + else: + non_exclusive_nodes.append(node) + if non_exclusive_nodes: logger.info( - "starting to build: %s", sorted(n.key for n in buildable_nodes) + "ready to build: %s", sorted(n.key for n in non_exclusive_nodes) ) - for node in buildable_nodes: - req = Requirement(f"{node.canonicalized_name}=={node.version}") - reqs.append(req) - future = executor.submit( - _build_parallel, - wkctx=wkctx, - resolved_version=node.version, - req=req, - source_download_url=node.download_url, - force=force, - cache_wheel_server_url=cache_wheel_server_url, - ) - future.add_done_callback(update_progressbar_cb) - futures.append(future) - - # Wait for all builds to complete - for node, future in zip(buildable_nodes, futures, strict=True): - try: - entry = future.result() - entries.append(entry) - built_node_keys.add(node.key) - nodes_to_build.remove(node) - # progress bar is updated in callback - except Exception as e: - logger.error(f"Failed to build {node.key}: {e}") - raise - - # invalidate uv cache - wkctx.uv_clean_cache(*reqs) + _build_some_nodes(non_exclusive_nodes) + # Now mark all of the nodes for this iteration as done so the things + # that depend on them can be built. + for node in buildable_nodes: + topo.done(node) metrics.summarize(wkctx, "Building in parallel") - _summary(wkctx, entries) + _summary(wkctx, built_entries) build_parallel._fromager_show_build_settings = True # type: ignore diff --git a/src/fromager/dependency_graph.py b/src/fromager/dependency_graph.py index af3043d9..cbbd27c7 100644 --- a/src/fromager/dependency_graph.py +++ b/src/fromager/dependency_graph.py @@ -1,3 +1,4 @@ +import graphlib import json import logging import pathlib @@ -296,3 +297,20 @@ def _depth_first_traversal( yield from self._depth_first_traversal( edge.destination_node.children, visited, match_dep_types ) + + def get_topological_sorter(self) -> graphlib.TopologicalSorter[DependencyNode]: + """Returns a topological sorter for the build graph. + + For simplicity, we treat all dependencies as build dependencies. This + ensures that the installation dependencies of actual build dependencies + are built before something tries to install the build dependency. + """ + sorter: graphlib.TopologicalSorter[DependencyNode] = ( + graphlib.TopologicalSorter() + ) + for node in self.get_all_nodes(): + if node.key == ROOT: + continue + sorter.add(node, *(edge.destination_node for edge in node.children)) + sorter.prepare() + return sorter