7 changes: 2 additions & 5 deletions e2e/test_build_parallel.sh
@@ -33,18 +33,15 @@ pass=true
# Build everything a first time
log="$OUTDIR/build-logs/${DIST}-build.log"
fromager \
--debug \
--log-file "$log" \
--work-dir "$OUTDIR/work-dir" \
--sdists-repo "$OUTDIR/sdists-repo" \
--wheels-repo "$OUTDIR/wheels-repo" \
--settings-dir="$SCRIPTDIR/build-parallel" \
build-parallel "$OUTDIR/graph.json"

if ! grep -q "cython-3.1.1: ready to build" "$log"; then
echo "Did not find message indicating build of cython would start" 1>&2
pass=false
fi
if ! grep -q "cython: requires exclusive build" "$log"; then
if ! grep -q "cython==3.1.1: requires exclusive build" "$log"; then
echo "Did not find message indicating build of cython would run on its own" 1>&2
pass=false
fi
178 changes: 72 additions & 106 deletions src/fromager/commands/build.py
@@ -614,124 +614,90 @@ def build_parallel(
logger.info("reading dependency graph from %s", graph_file)
graph: dependency_graph.DependencyGraph
graph = dependency_graph.DependencyGraph.from_file(graph_file)
logger.info("found %d packages to build", len(graph.nodes) - 1)
built_entries: list[BuildSequenceEntry] = []
topo = graph.get_topological_sorter()

exclusive_nodes: set[str] = {
node.key
for node in graph.get_all_nodes()
if node.key != dependency_graph.ROOT
and wkctx.settings.package_build_info(node.canonicalized_name).exclusive_build
}

# Isolate the logic for building some nodes in parallel so we can
# reuse it multiple times in the loop below.
def _build_some_nodes(
nodes_to_build: list[dependency_graph.DependencyNode],
) -> None:
futures: list[concurrent.futures.Future[BuildSequenceEntry]] = []
reqs: list[Requirement] = []
for node in nodes_to_build:
req = Requirement(f"{node.canonicalized_name}=={node.version}")
reqs.append(req)
future = executor.submit(
_build_parallel,
wkctx=wkctx,
resolved_version=node.version,
req=req,
source_download_url=node.download_url,
force=force,
cache_wheel_server_url=cache_wheel_server_url,
)
future.add_done_callback(update_progressbar_cb)
futures.append(future)

# Wait for all builds to complete
for node, future in zip(nodes_to_build, futures, strict=True):
try:
entry = future.result()
built_entries.append(entry)
# progress bar is updated in callback
except Exception as e:
logger.error(f"Failed to build {node.key}: {e}")
raise

# Track what has been built
built_node_keys: set[str] = set()

# Get all nodes that need to be built (excluding prebuilt ones and the root node)
# Sort the nodes to build by their key one time to avoid
# redoing the sort every iteration and to make the output deterministic.
nodes_to_build: DependencyNodeList = sorted(
(n for n in graph.nodes.values() if n.key != dependency_graph.ROOT),
key=lambda n: n.key,
)
logger.info("found %d packages to build", len(nodes_to_build))

# A node can be built when all of its build dependencies are built
entries: list[BuildSequenceEntry] = []
# invalidate uv cache
wkctx.uv_clean_cache(*reqs)
Reviewer comment (Collaborator):
uv has published a new version that supports concurrent use of uv cache clean and uv pip install. With the new version we can move the cleanup into the build wheel process again and simplify this code.
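A rough sketch of that simplification, assuming a uv release that tolerates concurrent cache operations; the `_build_and_clean` wrapper is hypothetical, while `_build_parallel` and `wkctx.uv_clean_cache` are the existing helpers:

```python
# Hypothetical wrapper (sketch only): invalidate the uv cache entry right
# after each wheel is built, instead of batching wkctx.uv_clean_cache(*reqs)
# once per round. Assumes a uv version where `uv cache clean` can run
# concurrently with `uv pip install`.
def _build_and_clean(wkctx, node, force, cache_wheel_server_url):
    req = Requirement(f"{node.canonicalized_name}=={node.version}")
    entry = _build_parallel(
        wkctx=wkctx,
        resolved_version=node.version,
        req=req,
        source_download_url=node.download_url,
        force=force,
        cache_wheel_server_url=cache_wheel_server_url,
    )
    wkctx.uv_clean_cache(req)  # per-wheel cleanup instead of per-round
    return entry
```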


with progress.progress_context(total=len(nodes_to_build)) as progressbar:
with (
progress.progress_context(total=len(graph.nodes) - 1) as progressbar,
concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor,
):

def update_progressbar_cb(future: concurrent.futures.Future) -> None:
"""Immediately update the progress when when a task is done"""
progressbar.update()

while nodes_to_build:
# Find nodes that can be built (all build dependencies are built)
buildable_nodes: DependencyNodeList = []
for node in nodes_to_build:
with req_ctxvar_context(
Requirement(node.canonicalized_name), node.version
):
# Get all build dependencies (build-system, build-backend, build-sdist)
build_deps: DependencyNodeList = [
edge.destination_node
for edge in node.children
if edge.req_type.is_build_requirement
]
# A node can be built when all of its build dependencies are built
unbuilt_deps: set[str] = set(
dep.key for dep in build_deps if dep.key not in built_node_keys
)
if not unbuilt_deps:
logger.info(
"ready to build, have all build dependencies: %s",
sorted(set(dep.key for dep in build_deps)),
)
buildable_nodes.append(node)
else:
logger.info(
"waiting for build dependencies: %s",
sorted(unbuilt_deps),
)

if not buildable_nodes:
# If we can't build anything but still have nodes, we have a cycle
remaining: list[str] = [n.key for n in nodes_to_build]
logger.info("have already built: %s", sorted(built_node_keys))
raise ValueError(f"Circular dependency detected among: {remaining}")

logger.info(
"ready to build: %s",
sorted(n.key for n in buildable_nodes),
while topo.is_active():
# Get all of the ready nodes, then split them into exclusive and
# non-exclusive nodes: get_ready() never returns the same node twice,
# so every node it hands us must be built this round, and the
# exclusive nodes have to be built on their own.
buildable_nodes: list[dependency_graph.DependencyNode] = list(
topo.get_ready()
)

# Check if any buildable node requires exclusive build (exclusive_build == True)
exclusive_nodes: DependencyNodeList = [
node
for node in buildable_nodes
if wkctx.settings.package_build_info(
node.canonicalized_name
).exclusive_build
]
if exclusive_nodes:
# Only build the first exclusive node this round
buildable_nodes = [exclusive_nodes[0]]
logger.info(
f"{exclusive_nodes[0].canonicalized_name}: requires exclusive build, running it alone this round."
)

# Build up to max_workers nodes concurrently (or all if max_workers is None)
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
futures: list[concurrent.futures.Future[tuple[pathlib.Path, bool]]] = []
reqs: list[Requirement] = []
non_exclusive_nodes: list[dependency_graph.DependencyNode] = []
for node in buildable_nodes:
if node.key in exclusive_nodes:
logger.info(
f"{node.key}: requires exclusive build, running it alone"
)
_build_some_nodes([node])
Reviewer comment (Collaborator):
How about you call _build_parallel() here? You don't need the pool worker to build a single node in serial.
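A minimal sketch of that suggestion, reusing the call signature the executor path already uses; appending to built_entries and updating the progress bar stand in for what the pool's done-callback would otherwise do:

```python
# Sketch: build the single exclusive node inline rather than submitting
# a one-item batch to the thread pool.
req = Requirement(f"{node.canonicalized_name}=={node.version}")
entry = _build_parallel(
    wkctx=wkctx,
    resolved_version=node.version,
    req=req,
    source_download_url=node.download_url,
    force=force,
    cache_wheel_server_url=cache_wheel_server_url,
)
built_entries.append(entry)
progressbar.update()  # the executor path does this via a done-callback
```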

else:
non_exclusive_nodes.append(node)
if non_exclusive_nodes:
logger.info(
"starting to build: %s", sorted(n.key for n in buildable_nodes)
"ready to build: %s", sorted(n.key for n in non_exclusive_nodes)
)
for node in buildable_nodes:
req = Requirement(f"{node.canonicalized_name}=={node.version}")
reqs.append(req)
future = executor.submit(
_build_parallel,
wkctx=wkctx,
resolved_version=node.version,
req=req,
source_download_url=node.download_url,
force=force,
cache_wheel_server_url=cache_wheel_server_url,
)
future.add_done_callback(update_progressbar_cb)
futures.append(future)

# Wait for all builds to complete
for node, future in zip(buildable_nodes, futures, strict=True):
try:
entry = future.result()
entries.append(entry)
built_node_keys.add(node.key)
nodes_to_build.remove(node)
# progress bar is updated in callback
except Exception as e:
logger.error(f"Failed to build {node.key}: {e}")
raise

# invalidate uv cache
wkctx.uv_clean_cache(*reqs)
_build_some_nodes(non_exclusive_nodes)
# Now mark all of the nodes for this iteration as done so the things
# that depend on them can be built.
for node in buildable_nodes:
topo.done(node)

metrics.summarize(wkctx, "Building in parallel")
_summary(wkctx, entries)
_summary(wkctx, built_entries)


build_parallel._fromager_show_build_settings = True # type: ignore
18 changes: 18 additions & 0 deletions src/fromager/dependency_graph.py
@@ -1,3 +1,4 @@
import graphlib
import json
import logging
import pathlib
@@ -296,3 +297,20 @@ def _depth_first_traversal(
yield from self._depth_first_traversal(
edge.destination_node.children, visited, match_dep_types
)

def get_topological_sorter(self) -> graphlib.TopologicalSorter[DependencyNode]:
"""Returns a topological sorter for the build graph.

For simplicity, we treat all dependencies as build dependencies. This
ensures that the installation dependencies of actual build dependencies
are built before something tries to install the build dependency.
"""
sorter: graphlib.TopologicalSorter[DependencyNode] = (
graphlib.TopologicalSorter()
)
Reviewer comment (Collaborator) on lines +308 to +310:
How does this give correct values at all? graphlib requires a hashable node type, and the version of DependencyNode in this PR implements neither __eq__ nor __hash__. My PR #763 changes the classes to frozen, hashable dataclasses to get correct behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have split off the dataclass changes from PR #763 and created #780.
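A hedged sketch of what that change might look like; the fields shown here are illustrative, not the real DependencyNode definition:

```python
from dataclasses import dataclass, field

# Illustrative only: a frozen dataclass gives DependencyNode value-based
# __eq__ and __hash__, which graphlib.TopologicalSorter relies on. The
# mutable children list is excluded from comparison and hashing.
@dataclass(frozen=True)
class DependencyNode:
    canonicalized_name: str
    version: str
    children: list["DependencyEdge"] = field(default_factory=list, compare=False)

    @property
    def key(self) -> str:
        return f"{self.canonicalized_name}=={self.version}"
```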

for node in self.get_all_nodes():
if node.key == ROOT:
continue
sorter.add(node, *(edge.destination_node for edge in node.children))
sorter.prepare()
return sorter
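For context, a minimal usage sketch of the returned sorter, mirroring how build_parallel consumes it; `build_one` and the graph path are placeholders:

```python
from fromager import dependency_graph


def build_one(node) -> None:
    """Placeholder for the real wheel build step."""
    print("building", node.key)


graph = dependency_graph.DependencyGraph.from_file("graph.json")
topo = graph.get_topological_sorter()
while topo.is_active():
    # get_ready() only yields nodes whose dependencies have been marked done,
    # and never yields the same node twice.
    for node in topo.get_ready():
        build_one(node)
        topo.done(node)
```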