2 changes: 1 addition & 1 deletion e2e/test_build_parallel.sh
@@ -40,7 +40,7 @@ fromager \
--settings-dir="$SCRIPTDIR/build-parallel" \
build-parallel "$OUTDIR/graph.json"

if ! grep -q "cython-3.1.1: ready to build" "$log"; then
if ! grep -q "ready to build: cython==3.1.1" "$log"; then
echo "Did not find message indicating build of cython would start" 1>&2
pass=false
fi
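The updated assertion targets the per-round log message that `build_parallel` now emits (see the Python diff below), where the keys of every node ready in a round are joined into one line. A minimal illustration of why the new grep pattern matches, assuming node keys use the `name==version` form shown in the diff:

```python
# Illustrative only: mimics the new log format from build.py below.
# The "round #..." prefix and the "name==version" key format are assumptions
# taken from the diff, not from running fromager itself.
working_keys = ["cython==3.1.1"]
message = "round #1, ready to build: %s" % ", ".join(working_keys)
print(message)
# -> "round #1, ready to build: cython==3.1.1"
# so `grep -q "ready to build: cython==3.1.1"` in the e2e script succeeds.
```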
180 changes: 79 additions & 101 deletions src/fromager/commands/build.py
@@ -39,6 +39,7 @@
logger = logging.getLogger(__name__)

DependencyNodeList = list[dependency_graph.DependencyNode]
DependencyNodeSet = set[dependency_graph.DependencyNode]


@dataclasses.dataclass(order=True, frozen=True)
@@ -612,126 +613,103 @@ def build_parallel(

# Load the dependency graph
logger.info("reading dependency graph from %s", graph_file)
graph: dependency_graph.DependencyGraph
graph = dependency_graph.DependencyGraph.from_file(graph_file)

# Track what has been built
built_node_keys: set[str] = set()
# topological sort graph of node -> direct build dependencies
topo = graph.get_build_topology()

# Get all nodes that need to be built (excluding prebuilt ones and the root node)
# Sort the nodes to build by their key one time to avoid
# redoing the sort every iteration and to make the output deterministic.
nodes_to_build: DependencyNodeList = sorted(
(n for n in graph.nodes.values() if n.key != dependency_graph.ROOT),
key=lambda n: n.key,
)
logger.info("found %d packages to build", len(nodes_to_build))
# count the nodes to build and collect those that require an exclusive build
topo_size: int = 0
exclusive_nodes: DependencyNodeSet = set()
for node in graph.get_all_nodes():
if node.key == dependency_graph.ROOT:
continue
topo_size += 1
if wkctx.settings.package_build_info(node.canonicalized_name).exclusive_build:
exclusive_nodes.add(node)

# A node can be built when all of its build dependencies are built
entries: list[BuildSequenceEntry] = []
logger.info("found %d packages to build", topo_size)

# Track nodes that are ready to build; `get_ready` does not return the same node twice
ready_nodes: DependencyNodeSet = set()
ready_exclusive_nodes: DependencyNodeList
working_nodes: DependencyNodeList

with progress.progress_context(total=len(nodes_to_build)) as progressbar:
built_entries: list[BuildSequenceEntry] = []
rounds: int = 0

# Build up to max_workers nodes concurrently (or all if max_workers is None)
with (
concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor,
progress.progress_context(total=topo_size) as progressbar,
):

def update_progressbar_cb(future: concurrent.futures.Future) -> None:
"""Immediately update the progress when when a task is done"""
progressbar.update()

while nodes_to_build:
# Find nodes that can be built (all build dependencies are built)
buildable_nodes: DependencyNodeList = []
for node in nodes_to_build:
with req_ctxvar_context(
Requirement(node.canonicalized_name), node.version
):
# Get all build dependencies (build-system, build-backend, build-sdist)
build_deps: DependencyNodeList = [
edge.destination_node
for edge in node.children
if edge.req_type.is_build_requirement
]
# A node can be built when all of its build dependencies are built
unbuilt_deps: set[str] = set(
dep.key for dep in build_deps if dep.key not in built_node_keys
)
if not unbuilt_deps:
logger.info(
"ready to build, have all build dependencies: %s",
sorted(set(dep.key for dep in build_deps)),
)
buildable_nodes.append(node)
else:
logger.info(
"waiting for build dependencies: %s",
sorted(unbuilt_deps),
)

if not buildable_nodes:
# If we can't build anything but still have nodes, we have a cycle
remaining: list[str] = [n.key for n in nodes_to_build]
logger.info("have already built: %s", sorted(built_node_keys))
raise ValueError(f"Circular dependency detected among: {remaining}")

logger.info(
"ready to build: %s",
sorted(n.key for n in buildable_nodes),
)
while topo.is_active():
rounds += 1
# Get new available nodes ready to build
ready_nodes.update(topo.get_ready())

# Check if any buildable node requires exclusive build (exclusive_build == True)
exclusive_nodes: DependencyNodeList = [
node
for node in buildable_nodes
if wkctx.settings.package_build_info(
node.canonicalized_name
).exclusive_build
]
if exclusive_nodes:
# Only build the first exclusive node this round
buildable_nodes = [exclusive_nodes[0]]
ready_exclusive_nodes = sorted(ready_nodes.intersection(exclusive_nodes))
if ready_exclusive_nodes:
# Only build the first exclusive node this round. The other
# nodes are kept in ready_nodes and not marked as 'done'.
working_nodes = [ready_exclusive_nodes[0]]
logger.info(
f"{exclusive_nodes[0].canonicalized_name}: requires exclusive build, running it alone this round."
f"{working_nodes[0].canonicalized_name}: requires exclusive build, running it alone this round."
)
else:
working_nodes = sorted(ready_nodes)

logger.info(
"round #%i, ready to build: %s",
rounds,
", ".join(n.key for n in working_nodes),
)

# Build up to max_workers nodes concurrently (or all if max_workers is None)
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
futures: list[concurrent.futures.Future[tuple[pathlib.Path, bool]]] = []
reqs: list[Requirement] = []
logger.info(
"starting to build: %s", sorted(n.key for n in buildable_nodes)
futures: list[concurrent.futures.Future[tuple[pathlib.Path, bool]]] = []
reqs: list[Requirement] = []

for node in working_nodes:
req = Requirement(f"{node.canonicalized_name}=={node.version}")
reqs.append(req)
future = executor.submit(
_build_parallel,
wkctx=wkctx,
resolved_version=node.version,
req=req,
source_download_url=node.download_url,
force=force,
cache_wheel_server_url=cache_wheel_server_url,
)
for node in buildable_nodes:
req = Requirement(f"{node.canonicalized_name}=={node.version}")
reqs.append(req)
future = executor.submit(
_build_parallel,
wkctx=wkctx,
resolved_version=node.version,
req=req,
source_download_url=node.download_url,
force=force,
cache_wheel_server_url=cache_wheel_server_url,
)
future.add_done_callback(update_progressbar_cb)
futures.append(future)

# Wait for all builds to complete
for node, future in zip(buildable_nodes, futures, strict=True):
try:
entry = future.result()
entries.append(entry)
built_node_keys.add(node.key)
nodes_to_build.remove(node)
# progress bar is updated in callback
except Exception as e:
logger.error(f"Failed to build {node.key}: {e}")
raise

# invalidate uv cache
wkctx.uv_clean_cache(*reqs)
future.add_done_callback(update_progressbar_cb)
futures.append(future)

# Wait for all builds to complete
for node, future in zip(working_nodes, futures, strict=True):
try:
entry = future.result()
except Exception as e:
logger.error(f"Failed to build {node.key}: {e}")
raise
else:
built_entries.append(entry)
# mark node as done
logger.info("%s: done", node.key)
topo.done(node)
ready_nodes.remove(node)
# progress bar is updated in callback

# invalidate uv cache
wkctx.uv_clean_cache(*reqs)

metrics.summarize(wkctx, "Building in parallel")
_summary(wkctx, entries)
_summary(wkctx, built_entries)


build_parallel._fromager_show_build_settings = True # type: ignore
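The rewritten loop drives builds from a topological sorter instead of rescanning the whole node list each round. A minimal sketch of the same pattern, assuming `graph.get_build_topology()` returns an object with the `graphlib.TopologicalSorter` interface (`is_active()`, `get_ready()`, `done()`); the package names and dependency data below are made up for illustration:

```python
import graphlib

# Hypothetical build graph: node key -> build dependencies that must finish first.
build_deps: dict[str, set[str]] = {
    "cython==3.1.1": set(),
    "pyyaml==6.0.2": {"cython==3.1.1"},
}

topo = graphlib.TopologicalSorter(build_deps)
topo.prepare()

# get_ready() never returns a node twice, so keep our own set of nodes that
# are ready but deferred (e.g. waiting behind an exclusive build).
ready: set[str] = set()
rounds = 0
while topo.is_active():
    rounds += 1
    ready.update(topo.get_ready())
    # An exclusive-build node would be picked out of `ready` here and run
    # alone; everything else would stay in `ready` for a later round.
    working = sorted(ready)
    print(f"round #{rounds}, ready to build: {', '.join(working)}")
    for key in working:
        # ... submit the wheel build to the executor and wait for it ...
        topo.done(key)      # unblocks nodes that depend on `key`
        ready.remove(key)
```

In the diff itself the sorter holds `DependencyNode` objects rather than string keys, so `topo.done(node)` is called with the node instance once its wheel build succeeds.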