2 changes: 1 addition & 1 deletion e2e/test_build_parallel.sh
@@ -44,7 +44,7 @@ if ! grep -q "cython-3.1.1: ready to build" "$log"; then
echo "Did not find message indicating build of cython would start" 1>&2
pass=false
fi
if ! grep -q "cython: requires exclusive build" "$log"; then
if ! grep -q "cython-3.1.1: requires exclusive build" "$log"; then
echo "Did not find message indicating build of cython would run on its own" 1>&2
pass=false
fi
173 changes: 73 additions & 100 deletions src/fromager/commands/build.py
@@ -58,6 +58,12 @@ def dict_factory(x):
}


BuildSequenceEntryFuture: typing.TypeAlias = concurrent.futures.Future[
BuildSequenceEntry
]
DependencyNodeSet: typing.TypeAlias = set[dependency_graph.DependencyNode]


@click.command()
@click.option(
"--wheel-server-url",
@@ -550,6 +556,10 @@ def _build_parallel(
)


def _nodes_to_string(nodes: typing.Iterable[dependency_graph.DependencyNode]) -> str:
return ", ".join(sorted(node.key for node in nodes))


@click.command()
@click.option(
"-f",
@@ -607,123 +617,86 @@ def build_parallel(

# Load the dependency graph
logger.info("reading dependency graph from %s", graph_file)
graph: dependency_graph.DependencyGraph
graph = dependency_graph.DependencyGraph.from_file(graph_file)

# Track what has been built
built_node_keys: set[str] = set()

# Get all nodes that need to be built (excluding prebuilt ones and the root node)
# Sort the nodes to build by their key one time to avoid
# redoing the sort every iteration and to make the output deterministic.
nodes_to_build: DependencyNodeList = sorted(
(n for n in graph.nodes.values() if n.key != dependency_graph.ROOT),
key=lambda n: n.key,
logger.info("found %i packages to build", len(graph))

topo = graph.get_build_topology(context=wkctx)
topo.prepare()
logger.info(
"build topology has %i exclusive nodes: %s",
len(topo.exclusive_nodes),
_nodes_to_string(topo.exclusive_nodes),
)
logger.info(
"build topology has %i build requirements: %s",
len(topo.dependency_nodes),
_nodes_to_string(topo.dependency_nodes),
)
logger.info("found %d packages to build", len(nodes_to_build))

# A node can be built when all of its build dependencies are built
entries: list[BuildSequenceEntry] = []
future2node: dict[BuildSequenceEntryFuture, dependency_graph.DependencyNode] = {}
built_entries: list[BuildSequenceEntry] = []
rounds: int = 0

with progress.progress_context(total=len(nodes_to_build)) as progressbar:
with (
progress.progress_context(total=len(graph)) as progressbar,
concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor,
):

def update_progressbar_cb(future: concurrent.futures.Future) -> None:
"""Immediately update the progress when when a task is done"""
def done_cb(future: concurrent.futures.Future) -> None:
"""Immediately update the progress and mark node as done"""
progressbar.update()

while nodes_to_build:
# Find nodes that can be built (all build dependencies are built)
buildable_nodes: DependencyNodeList = []
for node in nodes_to_build:
with req_ctxvar_context(
Requirement(node.canonicalized_name), node.version
):
# Get all build dependencies (build-system, build-backend, build-sdist)
build_deps: DependencyNodeList = [
edge.destination_node
for edge in node.children
if edge.req_type.is_build_requirement
]
# A node can be built when all of its build dependencies are built
unbuilt_deps: set[str] = set(
dep.key for dep in build_deps if dep.key not in built_node_keys
)
if not unbuilt_deps:
logger.info(
"ready to build, have all build dependencies: %s",
sorted(set(dep.key for dep in build_deps)),
)
buildable_nodes.append(node)
else:
logger.info(
"waiting for build dependencies: %s",
sorted(unbuilt_deps),
)

if not buildable_nodes:
# If we can't build anything but still have nodes, we have a cycle
remaining: list[str] = [n.key for n in nodes_to_build]
logger.info("have already built: %s", sorted(built_node_keys))
raise ValueError(f"Circular dependency detected among: {remaining}")

while topo.is_active():
rounds += 1
nodes_to_build = topo.get_available()
logger.info(
"ready to build: %s",
sorted(n.key for n in buildable_nodes),
"round %i: starting to build %i node(s): %s",
rounds,
len(nodes_to_build),
_nodes_to_string(nodes_to_build),
)

# Check if any buildable node requires exclusive build (exclusive_build == True)
exclusive_nodes: DependencyNodeList = [
node
for node in buildable_nodes
if wkctx.settings.package_build_info(
node.canonicalized_name
).exclusive_build
]
if exclusive_nodes:
# Only build the first exclusive node this round
buildable_nodes = [exclusive_nodes[0]]
logger.info(
f"{exclusive_nodes[0].canonicalized_name}: requires exclusive build, running it alone this round."
)

# Build up to max_workers nodes concurrently (or all if max_workers is None)
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
futures: list[concurrent.futures.Future[tuple[pathlib.Path, bool]]] = []
reqs: list[Requirement] = []
logger.info(
"starting to build: %s", sorted(n.key for n in buildable_nodes)
for node in nodes_to_build:
with req_ctxvar_context(node.requirement, node.version):
if node in topo.exclusive_nodes:
logger.info("requires exclusive build")
if len(nodes_to_build) != 1:
# self-check, should never trigger
raise RuntimeError(
f"{nodes_to_build=} has exclusive node and > 1 nodes"
)

logger.info("ready to build")

future = executor.submit(
_build_parallel,
wkctx=wkctx,
resolved_version=node.version,
req=node.requirement,
source_download_url=node.download_url,
force=force,
cache_wheel_server_url=cache_wheel_server_url,
)
for node in buildable_nodes:
req = Requirement(f"{node.canonicalized_name}=={node.version}")
reqs.append(req)
future = executor.submit(
_build_parallel,
wkctx=wkctx,
resolved_version=node.version,
req=req,
source_download_url=node.download_url,
force=force,
cache_wheel_server_url=cache_wheel_server_url,
)
future.add_done_callback(update_progressbar_cb)
futures.append(future)
future.add_done_callback(done_cb)
future2node[future] = node

# Wait for all builds to complete
for node, future in zip(buildable_nodes, futures, strict=True):
# Wait for all builds to complete
for future in concurrent.futures.as_completed(future2node):
node = future2node.pop(future)
with req_ctxvar_context(node.requirement, node.version):
try:
entry = future.result()
entries.append(entry)
built_node_keys.add(node.key)
nodes_to_build.remove(node)
# progress bar is updated in callback
except Exception as e:
logger.error(f"Failed to build {node.key}: {e}")
logger.error("failed to build: %s", e)
raise
else:
# success
built_entries.append(entry)
finally:
# mark node as done, progress bar is updated in callback.
topo.done(node)

metrics.summarize(wkctx, "Building in parallel")
_summary(wkctx, entries)
_summary(wkctx, built_entries)


build_parallel._fromager_show_build_settings = True # type: ignore
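
Note on the scheduling interface: the rewritten loop above delegates ordering to the object returned by graph.get_build_topology(), which is not part of this diff. Below is a minimal sketch of the behavior the loop appears to rely on, assuming a thin wrapper around graphlib.TopologicalSorter; the class name, constructor, and the deferred-batch handling are assumptions, not the actual fromager implementation.

# A minimal sketch, assuming the build topology wraps graphlib.TopologicalSorter.
# Names and details here are guesses based on how build_parallel() uses the object.
import graphlib


class BuildTopologySketch:
    def __init__(self, exclusive_nodes: set, dependency_nodes: set) -> None:
        self.exclusive_nodes = exclusive_nodes    # packages flagged exclusive_build in settings
        self.dependency_nodes = dependency_nodes  # packages that other packages build-depend on
        self._sorter: graphlib.TopologicalSorter = graphlib.TopologicalSorter()
        self._deferred: list = []                 # ready nodes held back by an exclusive round

    def add(self, node, *build_deps) -> None:
        self._sorter.add(node, *build_deps)

    def prepare(self) -> None:
        # graphlib raises CycleError here, which replaces the old manual cycle check.
        self._sorter.prepare()

    def is_active(self) -> bool:
        return self._sorter.is_active()

    def get_available(self) -> list:
        # Collect every node whose build dependencies are done, then hand back
        # either a single exclusive node or the whole non-exclusive batch.
        self._deferred.extend(self._sorter.get_ready())
        exclusive = [n for n in self._deferred if n in self.exclusive_nodes]
        if exclusive:
            node = exclusive[0]
            self._deferred.remove(node)
            return [node]
        batch, self._deferred = self._deferred, []
        return batch

    def done(self, *nodes) -> None:
        self._sorter.done(*nodes)

With an interface along these lines, the while topo.is_active() / get_available() / done() loop in build_parallel() and the new graph build-graph command can share the same round-based scheduling logic.
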
33 changes: 33 additions & 0 deletions src/fromager/commands/graph.py
@@ -406,3 +406,36 @@ def migrate_graph(
graph.serialize(f)
else:
graph.serialize(sys.stdout)


@graph.command()
@click.argument(
"graph-file",
type=clickext.ClickPath(),
)
@click.pass_obj
def build_graph(
wkctx: context.WorkContext,
graph_file: pathlib.Path,
):
"Print build graph steps for parallel-build"
graph = DependencyGraph.from_file(graph_file)
topo = graph.get_build_topology(context=wkctx)
topo.prepare()

def n2s(nodes: typing.Iterable[DependencyNode]) -> str:
return ", ".join(sorted(node.key for node in nodes))

print(f"Build dependencies ({len(topo.dependency_nodes)}):", n2s(topo.dependency_nodes))
if topo.exclusive_nodes:
print(f"Exclusive builds ({len(topo.exclusive_nodes)}):", n2s(topo.exclusive_nodes))

print("\nBuild rounds:")
rounds: int = 0
while topo.is_active():
rounds += 1
nodes_to_build = topo.get_available()
print(f"- #{rounds}:", n2s(nodes_to_build))
topo.done(*nodes_to_build)

print(f"\nBuilding {len(graph)} packages in {rounds} rounds.")