Skip to content

Commit 5851916

Browse files
committed
perf: run expand_dev_sources concurrently
1 parent 126c753 commit 5851916

File tree

2 files changed

+171
-75
lines changed

2 files changed

+171
-75
lines changed

crates/pixi_command_dispatcher/src/executor.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,36 @@ pub enum Executor {
2222
}
2323

2424
pin_project_lite::pin_project! {
25+
/// A collection of futures that can be executed either concurrently or serially.
26+
///
27+
/// This type provides a unified interface for managing multiple futures with different
28+
/// execution strategies. The execution mode is determined by the [`Executor`] passed
29+
/// to [`ExecutorFutures::new`].
30+
///
31+
/// # Usage
32+
///
33+
/// Typically, you should obtain the executor from [`CommandDispatcher::executor()`]
34+
/// rather than hardcoding a specific executor:
35+
///
36+
/// ```ignore
37+
/// // Get executor from the command dispatcher
38+
/// let mut futures = ExecutorFutures::new(command_dispatcher.executor());
39+
///
40+
/// // Push futures into the collection
41+
/// for item in items {
42+
/// futures.push(process_item(item));
43+
/// }
44+
///
45+
/// // Collect results as they complete
46+
/// while let Some(result) = futures.next().await {
47+
/// // Handle result
48+
/// }
49+
/// ```
50+
///
51+
/// This ensures that:
52+
/// - Production code uses concurrent execution for better performance
53+
/// - Tests can use serial execution for deterministic behavior
54+
/// - The execution mode is configured in one place (the dispatcher builder)
2555
#[project = ExecutorFuturesProj]
2656
pub(crate) enum ExecutorFutures<Fut> {
2757
Concurrent { #[pin] futures: FuturesUnordered<Fut> },
@@ -30,6 +60,16 @@ pin_project_lite::pin_project! {
3060
}
3161

3262
impl<Fut> ExecutorFutures<Fut> {
63+
/// Creates a new `ExecutorFutures` with the specified execution strategy.
64+
///
65+
/// # Recommendation
66+
///
67+
/// Instead of hardcoding `Executor::Concurrent` or `Executor::Serial`, prefer
68+
/// obtaining the executor from [`CommandDispatcher::executor()`]:
69+
///
70+
/// ```ignore
71+
/// let mut futures = ExecutorFutures::new(command_dispatcher.executor());
72+
/// ```
3373
pub fn new(executor: Executor) -> Self {
3474
match executor {
3575
Executor::Concurrent => Self::Concurrent {
@@ -41,6 +81,11 @@ impl<Fut> ExecutorFutures<Fut> {
4181
}
4282
}
4383

84+
/// Adds a future to the collection.
85+
///
86+
/// The future will be executed according to the execution strategy:
87+
/// - `Concurrent`: The future may be polled in any order with other futures
88+
/// - `Serial`: The future will be polled in LIFO (last-in-first-out) order
4489
pub fn push(&mut self, fut: Fut) {
4590
match self {
4691
ExecutorFutures::Concurrent { futures } => futures.push(fut),

crates/pixi_command_dispatcher/src/expand_dev_sources/mod.rs

Lines changed: 126 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::collections::{BTreeMap, HashSet};
22

3+
use futures::StreamExt;
34
use itertools::Either;
45
use miette::Diagnostic;
56
use pixi_build_discovery::EnabledProtocols;
@@ -12,6 +13,7 @@ use tracing::instrument;
1213
use crate::{
1314
BuildEnvironment, CommandDispatcher, CommandDispatcherError, CommandDispatcherErrorResultExt,
1415
GetOutputDependenciesError, GetOutputDependenciesSpec, SourceCheckoutError,
16+
executor::ExecutorFutures,
1517
};
1618

1719
/// A source package whose dependencies should be installed without building
@@ -103,8 +105,6 @@ impl ExpandDevSourcesSpec {
103105
self,
104106
command_dispatcher: CommandDispatcher,
105107
) -> Result<ExpandedDevSources, CommandDispatcherError<ExpandDevSourcesError>> {
106-
let mut result = ExpandedDevSources::default();
107-
108108
// Create a lookup set for dev_sources by output_name
109109
// We'll use this to skip dependencies that are also dev_sources
110110
// TODO: In the future, we might want to also match by source location for more
@@ -115,87 +115,138 @@ impl ExpandDevSourcesSpec {
115115
.map(|ds| ds.output_name.clone())
116116
.collect();
117117

118-
// Process each dev_source
119-
for dev_source in self.dev_sources {
120-
// Pin and checkout the source
121-
let pinned_source = command_dispatcher
122-
.pin_and_checkout(dev_source.source.clone())
123-
.await
124-
.map_err_with(|error| ExpandDevSourcesError::SourceCheckout {
125-
name: dev_source.output_name.as_source().to_string(),
126-
error,
127-
})?;
128-
129-
// Create a SourceAnchor for resolving relative paths in dependencies
130-
let source_anchor = SourceAnchor::from(SourceSpec::from(pinned_source.pinned.clone()));
131-
132-
// Get the output dependencies
133-
let spec = GetOutputDependenciesSpec {
134-
source: pinned_source.pinned,
135-
output_name: dev_source.output_name.clone(),
136-
channel_config: self.channel_config.clone(),
137-
channels: self.channels.clone(),
138-
build_environment: self.build_environment.clone(),
139-
variants: self.variants.clone(),
140-
enabled_protocols: self.enabled_protocols.clone(),
141-
};
142-
143-
let output_deps = command_dispatcher
144-
.get_output_dependencies(spec)
145-
.await
146-
.map_err_with(|error| ExpandDevSourcesError::GetOutputDependencies {
147-
name: dev_source.output_name.clone(),
148-
error,
149-
})?;
150-
151-
// Helper to process dependencies
152-
let mut process_deps = |deps: Option<DependencyMap<PackageName, PixiSpec>>| {
153-
if let Some(deps) = deps {
154-
for (name, spec) in deps.into_specs() {
155-
// Skip dependencies that are also dev_sources
156-
// TODO: Currently matching by name only. In the future, we might want to
157-
// also check if the source location matches for more precise matching.
158-
if dev_source_names.contains(&name) {
159-
continue;
160-
}
118+
// Process each dev_source concurrently
119+
let mut futures = ExecutorFutures::new(command_dispatcher.executor());
161120

162-
// Resolve relative paths for source dependencies
163-
let resolved_spec = match spec.into_source_or_binary() {
164-
Either::Left(source) => {
165-
// Resolve the source relative to the dev_source's location
166-
PixiSpec::from(source_anchor.resolve(source))
167-
}
168-
Either::Right(binary) => {
169-
// Binary specs don't need path resolution
170-
PixiSpec::from(binary)
171-
}
172-
};
173-
result.dependencies.insert(name, resolved_spec);
174-
}
175-
}
176-
};
121+
for dev_source in self.dev_sources {
122+
futures.push(process_single_dev_source(
123+
dev_source,
124+
&command_dispatcher,
125+
&self.channel_config,
126+
&self.channels,
127+
&self.build_environment,
128+
&self.variants,
129+
&self.enabled_protocols,
130+
&dev_source_names,
131+
));
132+
}
177133

178-
// Process all dependency types
179-
process_deps(output_deps.build_dependencies);
180-
process_deps(output_deps.host_dependencies);
181-
process_deps(Some(output_deps.run_dependencies));
134+
// Collect results as they complete
135+
let mut result = ExpandedDevSources::default();
136+
while let Some(dev_source_result) = futures.next().await {
137+
let (dependencies, constraints) = dev_source_result?;
182138

183-
// Merge constraints
184-
if let Some(build_constraints) = output_deps.build_constraints {
185-
for (name, spec) in build_constraints.into_specs() {
186-
result.constraints.insert(name, spec);
187-
}
139+
// Merge dependencies and constraints into the result
140+
for (name, spec) in dependencies.into_specs() {
141+
result.dependencies.insert(name, spec);
188142
}
189-
if let Some(host_constraints) = output_deps.host_constraints {
190-
for (name, spec) in host_constraints.into_specs() {
191-
result.constraints.insert(name, spec);
192-
}
193-
}
194-
for (name, spec) in output_deps.run_constraints.into_specs() {
143+
for (name, spec) in constraints.into_specs() {
195144
result.constraints.insert(name, spec);
196145
}
197146
}
198147

199148
Ok(result)
200149
}
201150
}
151+
152+
/// Process a single dev_source: checkout, get dependencies, and filter/resolve them.
153+
async fn process_single_dev_source(
154+
dev_source: DependencyOnlySource,
155+
command_dispatcher: &CommandDispatcher,
156+
channel_config: &ChannelConfig,
157+
channels: &[ChannelUrl],
158+
build_environment: &BuildEnvironment,
159+
variants: &Option<BTreeMap<String, Vec<String>>>,
160+
enabled_protocols: &EnabledProtocols,
161+
dev_source_names: &HashSet<PackageName>,
162+
) -> Result<
163+
(
164+
DependencyMap<PackageName, PixiSpec>,
165+
DependencyMap<PackageName, BinarySpec>,
166+
),
167+
CommandDispatcherError<ExpandDevSourcesError>,
168+
> {
169+
// Pin and checkout the source
170+
let pinned_source = command_dispatcher
171+
.pin_and_checkout(dev_source.source.clone())
172+
.await
173+
.map_err_with(|error| ExpandDevSourcesError::SourceCheckout {
174+
name: dev_source.output_name.as_source().to_string(),
175+
error,
176+
})?;
177+
178+
// Create a SourceAnchor for resolving relative paths in dependencies
179+
let source_anchor = SourceAnchor::from(SourceSpec::from(pinned_source.pinned.clone()));
180+
181+
// Get the output dependencies
182+
let spec = GetOutputDependenciesSpec {
183+
source: pinned_source.pinned,
184+
output_name: dev_source.output_name.clone(),
185+
channel_config: channel_config.clone(),
186+
channels: channels.to_vec(),
187+
build_environment: build_environment.clone(),
188+
variants: variants.clone(),
189+
enabled_protocols: enabled_protocols.clone(),
190+
};
191+
192+
let output_deps = command_dispatcher
193+
.get_output_dependencies(spec)
194+
.await
195+
.map_err_with(|error| ExpandDevSourcesError::GetOutputDependencies {
196+
name: dev_source.output_name.clone(),
197+
error,
198+
})?;
199+
200+
// Process dependencies
201+
let mut dependencies = DependencyMap::default();
202+
let process_deps =
203+
|deps: Option<DependencyMap<PackageName, PixiSpec>>,
204+
dependencies: &mut DependencyMap<PackageName, PixiSpec>| {
205+
if let Some(deps) = deps {
206+
for (name, spec) in deps.into_specs() {
207+
// Skip dependencies that are also dev_sources
208+
// TODO: Currently matching by name only. In the future, we might want to
209+
// also check if the source location matches for more precise matching.
210+
if dev_source_names.contains(&name) {
211+
continue;
212+
}
213+
214+
// Resolve relative paths for source dependencies
215+
let resolved_spec = match spec.into_source_or_binary() {
216+
Either::Left(source) => {
217+
// Resolve the source relative to the dev_source's location
218+
PixiSpec::from(source_anchor.resolve(source))
219+
}
220+
Either::Right(binary) => {
221+
// Binary specs don't need path resolution
222+
PixiSpec::from(binary)
223+
}
224+
};
225+
dependencies.insert(name, resolved_spec);
226+
}
227+
}
228+
};
229+
230+
// Process all dependency types
231+
process_deps(output_deps.build_dependencies, &mut dependencies);
232+
process_deps(output_deps.host_dependencies, &mut dependencies);
233+
process_deps(Some(output_deps.run_dependencies), &mut dependencies);
234+
235+
// Collect constraints
236+
let mut constraints = DependencyMap::default();
237+
if let Some(build_constraints) = output_deps.build_constraints {
238+
for (name, spec) in build_constraints.into_specs() {
239+
constraints.insert(name, spec);
240+
}
241+
}
242+
if let Some(host_constraints) = output_deps.host_constraints {
243+
for (name, spec) in host_constraints.into_specs() {
244+
constraints.insert(name, spec);
245+
}
246+
}
247+
for (name, spec) in output_deps.run_constraints.into_specs() {
248+
constraints.insert(name, spec);
249+
}
250+
251+
Ok((dependencies, constraints))
252+
}

0 commit comments

Comments
 (0)