Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/pixi_build_backend_passthrough/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl InMemoryBackend for PassthroughBackend {
fn conda_outputs(
&self,
params: CondaOutputsParams,
_output_stream: &(dyn BackendOutputStream + Send + 'static),
) -> Result<CondaOutputsResult, Box<CommunicationError>> {
Ok(CondaOutputsResult {
outputs: vec![CondaOutput {
Expand Down
1 change: 1 addition & 0 deletions crates/pixi_build_frontend/src/backend/in_memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub trait InMemoryBackend: Send {
/// Returns the outputs this backend can produce for the given parameters.
///
/// The default implementation panics; a backend that advertises the
/// `conda/outputs` capability is expected to override it (the caller in this
/// diff asserts `provides_conda_outputs()` before invoking the method).
///
/// * `params` - the parameters describing which outputs are requested.
/// * `output_stream` - sink for log lines the backend emits while computing
///   the outputs.
fn conda_outputs(
    &self,
    params: CondaOutputsParams,
    output_stream: &(dyn BackendOutputStream + Send + 'static),
) -> Result<CondaOutputsResult, Box<CommunicationError>> {
    // Explicitly discard the arguments so this default body compiles
    // without unused-variable warnings.
    let _ = (params, output_stream);
    unimplemented!("`conda_outputs` is not implemented by this in-memory backend")
}
Expand Down
7 changes: 4 additions & 3 deletions crates/pixi_build_frontend/src/backend/json_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::{
sync::{Mutex, oneshot},
};

use super::stderr::{stderr_buffer, stream_stderr};
use super::stderr::stream_stderr;
use crate::{
backend::BackendOutputStream,
error::BackendError,
Expand Down Expand Up @@ -334,16 +334,17 @@ impl JsonRpcBackend {
}

/// Call the `conda/outputs` method on the backend.
pub async fn conda_outputs(
pub async fn conda_outputs<W: BackendOutputStream + Send + 'static>(
&self,
request: CondaOutputsParams,
output_stream: W,
) -> Result<CondaOutputsResult, CommunicationError> {
// Capture stderr and forward each line to the provided output stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment needs to be changed as we dont discard it anymore?

let stderr = self.stderr.as_ref().map(|stderr| {
// Cancellation signal
let (cancel_tx, cancel_rx) = oneshot::channel();
// Spawn the stderr forwarding task
let handle = tokio::spawn(stderr_buffer(stderr.clone(), cancel_rx));
let handle = tokio::spawn(stream_stderr(stderr.clone(), cancel_rx, output_stream));
(cancel_tx, handle)
});

Expand Down
11 changes: 7 additions & 4 deletions crates/pixi_build_frontend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,19 +152,22 @@ impl Backend {
}

/// Returns the outputs that this backend can produce.
pub async fn conda_outputs(
pub async fn conda_outputs<W: BackendOutputStream + Send + 'static>(
&self,
params: CondaOutputsParams,
output_stream: W,
) -> Result<CondaOutputsResult, CommunicationError> {
assert!(
self.inner.capabilities().provides_conda_outputs(),
"This backend does not support the conda outputs procedure"
);
match &self.inner {
BackendImplementation::JsonRpc(json_rpc) => json_rpc.conda_outputs(params).await,
BackendImplementation::InMemory(in_memory) => {
in_memory.conda_outputs(params).map_err(|e| *e)
BackendImplementation::JsonRpc(json_rpc) => {
json_rpc.conda_outputs(params, output_stream).await
}
BackendImplementation::InMemory(in_memory) => in_memory
.conda_outputs(params, &output_stream)
.map_err(|e| *e),
}
}
}
Expand Down
29 changes: 0 additions & 29 deletions crates/pixi_build_frontend/src/backend/stderr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,6 @@ use tokio::{
sync::{Mutex, oneshot},
};

/// Captures the stderr output of the backend child process and accumulates it
/// into a single newline-joined `String`.
///
/// Reads lines from the shared `buffer` until EOF, or until a signal arrives
/// on `cancel` — whichever happens first. In both cases the lines collected so
/// far are returned; on cancellation this is a partial capture.
///
/// # Errors
///
/// Returns any I/O error raised while reading a line from the child's stderr.
pub(crate) async fn stderr_buffer(
    buffer: Arc<Mutex<Lines<BufReader<ChildStderr>>>>,
    cancel: oneshot::Receiver<()>,
) -> Result<String, std::io::Error> {
    // Future that continuously reads from the buffer and stores the lines
    // until all data has been received (EOF on the child's stderr).
    let mut lines = Vec::new();
    let read_and_buffer = async {
        // Hold the lock for the whole read loop; `lines` is borrowed
        // mutably by this future until it is dropped.
        let mut buffer = buffer.lock().await;
        while let Some(line) = buffer.next_line().await? {
            lines.push(line);
        }
        Ok(lines.join("\n"))
    };

    // Either wait until the cancel signal is received or until
    // `read_and_buffer` finishes, which means there is no more data to read.
    tokio::select! {
        _ = cancel => {
            // Reading `lines` here is sound: per `tokio::select!` semantics the
            // losing future (`read_and_buffer`) is dropped before this handler
            // runs, releasing its mutable borrow of `lines`.
            Ok(lines.join("\n"))
        }
        result = read_and_buffer => {
            result
        }
    }
}

/// Stderr stream that forwards the stderr output of the backend, line by
/// line, to the provided `BackendOutputStream` as it arrives.
pub(crate) async fn stream_stderr<W: BackendOutputStream>(
Expand Down
21 changes: 14 additions & 7 deletions crates/pixi_command_dispatcher/src/build_backend_metadata/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
hash::{Hash, Hasher},
path::PathBuf,
sync::Mutex,
};

use futures::{SinkExt, channel::mpsc::UnboundedSender};
use miette::Diagnostic;
use once_cell::sync::Lazy;
use pathdiff::diff_paths;
Expand All @@ -9,12 +17,6 @@ use pixi_record::{InputHash, PinnedSourceSpec};
use pixi_spec::{SourceAnchor, SourceSpec};
use rand::random;
use rattler_conda_types::{ChannelConfig, ChannelUrl};
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
hash::{Hash, Hasher},
path::PathBuf,
sync::Mutex,
};
use thiserror::Error;
use tracing::instrument;
use xxhash_rust::xxh3::Xxh3;
Expand Down Expand Up @@ -100,6 +102,7 @@ impl BuildBackendMetadataSpec {
pub(crate) async fn request(
self,
command_dispatcher: CommandDispatcher,
log_sink: UnboundedSender<String>,
) -> Result<BuildBackendMetadata, CommandDispatcherError<BuildBackendMetadataError>> {
// Ensure that the source is checked out before proceeding.
let source_checkout = command_dispatcher
Expand Down Expand Up @@ -195,6 +198,7 @@ impl BuildBackendMetadataSpec {
source_checkout,
backend,
additional_glob_hash,
log_sink,
)
.await?;

Expand Down Expand Up @@ -318,6 +322,7 @@ impl BuildBackendMetadataSpec {
source_checkout: SourceCheckout,
backend: Backend,
additional_glob_hash: Vec<u8>,
mut log_sink: UnboundedSender<String>,
) -> Result<CachedCondaMetadata, CommandDispatcherError<BuildBackendMetadataError>> {
let backend_identifier = backend.identifier().to_string();
let params = CondaOutputsParams {
Expand All @@ -338,7 +343,9 @@ impl BuildBackendMetadataSpec {
),
};
let outputs = backend
.conda_outputs(params)
.conda_outputs(params, move |line| {
let _err = futures::executor::block_on(log_sink.send(line));
})
.await
.map_err(BuildBackendMetadataError::Communication)
.map_err(CommandDispatcherError::Failed)?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::hash_map::Entry, sync::Arc};

use futures::FutureExt;
use futures::{FutureExt, channel::mpsc::UnboundedSender};
use tokio_util::sync::CancellationToken;

use super::{CommandDispatcherProcessor, PendingDeduplicatingTask, TaskResult};
Expand Down Expand Up @@ -68,19 +68,23 @@ impl CommandDispatcherProcessor {
.insert(source_metadata_id, reporter_id);
}

// Open a channel to receive build output.
let (tx, rx) = futures::channel::mpsc::unbounded();

if let Some((reporter, reporter_id)) = self
.reporter
.as_deref_mut()
.and_then(Reporter::as_build_backend_metadata_reporter)
.zip(reporter_id)
{
reporter.on_started(reporter_id)
reporter.on_started(reporter_id, Box::new(rx))
}

self.queue_build_backend_metadata_task(
source_metadata_id,
task.spec,
task.cancellation_token,
tx,
);
}
}
Expand All @@ -92,13 +96,16 @@ impl CommandDispatcherProcessor {
build_backend_metadata_id: BuildBackendMetadataId,
spec: BuildBackendMetadataSpec,
cancellation_token: CancellationToken,
log_sink: UnboundedSender<String>,
) {
let dispatcher = self.create_task_command_dispatcher(
CommandDispatcherContext::BuildBackendMetadata(build_backend_metadata_id),
);

// Open a channel to receive build output.
self.pending_futures.push(
cancellation_token
.run_until_cancelled_owned(spec.request(dispatcher))
.run_until_cancelled_owned(spec.request(dispatcher, log_sink))
.map(move |result| {
TaskResult::BuildBackendMetadata(
build_backend_metadata_id,
Expand Down Expand Up @@ -133,7 +140,8 @@ impl CommandDispatcherProcessor {
.and_then(Reporter::as_build_backend_metadata_reporter)
.zip(self.build_backend_metadata_reporters.remove(&id))
{
reporter.on_finished(reporter_id);
let failed = result.is_err();
reporter.on_finished(reporter_id, failed);
}

self.build_backend_metadata
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::hash_map::Entry;
use std::{collections::hash_map::Entry, sync::Arc};

use futures::FutureExt;
use tokio_util::sync::CancellationToken;
Expand All @@ -8,6 +8,7 @@ use crate::{
CommandDispatcherError, Reporter, SourceBuildError, SourceBuildResult, SourceBuildSpec,
command_dispatcher::{CommandDispatcherContext, SourceBuildId, SourceBuildTask},
};
use rattler_repodata_gateway::RunExportsReporter;

impl CommandDispatcherProcessor {
/// Constructs a new [`SourceBuildId`] for the given `task`.
Expand Down Expand Up @@ -61,15 +62,6 @@ impl CommandDispatcherProcessor {
.insert(source_build_id, reporter_id);
}

if let Some((reporter, reporter_id)) = self
.reporter
.as_deref_mut()
.and_then(Reporter::as_source_build_reporter)
.zip(reporter_id)
{
reporter.on_started(reporter_id)
}

self.queue_source_build_task(source_build_id, task.spec, task.cancellation_token);
}
}
Expand All @@ -82,19 +74,27 @@ impl CommandDispatcherProcessor {
spec: SourceBuildSpec,
cancellation_token: CancellationToken,
) {
let dispatcher = self
.create_task_command_dispatcher(CommandDispatcherContext::SourceBuild(source_build_id));

let dispatcher_context = CommandDispatcherContext::SourceBuild(source_build_id);
let dispatcher = self.create_task_command_dispatcher(dispatcher_context);

let reporter_context = self.reporter_context(dispatcher_context);
let run_exports_reporter = self
.reporter
.as_mut()
.and_then(|reporter| reporter.create_run_exports_reporter(reporter_context));
let (tx, rx) = futures::channel::mpsc::unbounded::<String>();

let mut run_exports_reporter: Option<Arc<dyn RunExportsReporter>> = None;
if let Some(reporter) = self.reporter.as_mut() {
let created = reporter.create_run_exports_reporter(reporter_context);
if let Some((source_reporter, reporter_id)) = reporter
.as_source_build_reporter()
.zip(self.source_build_reporters.get(&source_build_id).copied())
{
source_reporter.on_started(reporter_id, Box::new(rx), created.clone());
}
run_exports_reporter = created;
}

self.pending_futures.push(
cancellation_token
.run_until_cancelled_owned(spec.build(dispatcher, run_exports_reporter))
.run_until_cancelled_owned(spec.build(dispatcher, run_exports_reporter.clone(), tx))
.map(move |result| {
TaskResult::SourceBuild(
source_build_id,
Expand Down Expand Up @@ -122,7 +122,8 @@ impl CommandDispatcherProcessor {
.and_then(Reporter::as_source_build_reporter)
.zip(self.source_build_reporters.remove(&id))
{
reporter.on_finished(reporter_id);
let failed = result.as_ref().is_err();
reporter.on_finished(reporter_id, failed);
}

self.source_build
Expand Down
17 changes: 13 additions & 4 deletions crates/pixi_command_dispatcher/src/reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,14 @@ pub trait BuildBackendMetadataReporter {
) -> BuildBackendMetadataId;

/// Called when the operation has started.
fn on_started(&mut self, id: BuildBackendMetadataId);
fn on_started(
&mut self,
id: BuildBackendMetadataId,
backend_output_stream: Box<dyn Stream<Item = String> + Unpin + Send>,
);

/// Called when the operation has finished.
fn on_finished(&mut self, id: BuildBackendMetadataId);
fn on_finished(&mut self, id: BuildBackendMetadataId, failed: bool);
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
Expand Down Expand Up @@ -183,10 +187,15 @@ pub trait SourceBuildReporter {
) -> SourceBuildId;

/// Called when the operation has started.
fn on_started(&mut self, id: SourceBuildId);
fn on_started(
&mut self,
id: SourceBuildId,
backend_output_stream: Box<dyn Stream<Item = String> + Unpin + Send>,
run_exports_reporter: Option<Arc<dyn RunExportsReporter>>,
);

/// Called when the operation has finished.
fn on_finished(&mut self, id: SourceBuildId);
fn on_finished(&mut self, id: SourceBuildId, failed: bool);
}

/// A trait that is used to report the progress of a source build performed by
Expand Down
25 changes: 17 additions & 8 deletions crates/pixi_command_dispatcher/src/source_build/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
sync::Arc,
};

use futures::{SinkExt, channel::mpsc::UnboundedSender};
use miette::Diagnostic;
use pixi_build_discovery::EnabledProtocols;
use pixi_build_frontend::Backend;
Expand Down Expand Up @@ -127,6 +128,7 @@ impl SourceBuildSpec {
mut self,
command_dispatcher: CommandDispatcher,
reporter: Option<Arc<dyn RunExportsReporter>>,
log_sink: UnboundedSender<String>,
) -> Result<SourceBuildResult, CommandDispatcherError<SourceBuildError>> {
// If the output directory is not set, we want to use the build cache. Read the
// build cache in that case.
Expand Down Expand Up @@ -286,6 +288,7 @@ impl SourceBuildSpec {
work_directory,
package_build_input_hash,
reporter,
log_sink,
)
.await?;

Expand Down Expand Up @@ -405,6 +408,7 @@ impl SourceBuildSpec {
work_directory: PathBuf,
package_build_input_hash: PackageBuildInputHash,
reporter: Option<Arc<dyn RunExportsReporter>>,
mut log_sink: UnboundedSender<String>,
) -> Result<BuiltPackage, CommandDispatcherError<SourceBuildError>> {
let source_anchor = SourceAnchor::from(SourceSpec::from(self.source.clone()));
let host_platform = self.build_environment.host_platform;
Expand All @@ -413,14 +417,19 @@ impl SourceBuildSpec {
// Request the metadata from the backend.
// TODO: Can we somehow cache this metadata?
let outputs = backend
.conda_outputs(CondaOutputsParams {
host_platform,
build_platform,
variant_configuration: self.variants.clone(),
variant_files: self.variant_files.clone(),
work_directory: work_directory.clone(),
channels: self.channels.clone(),
})
.conda_outputs(
CondaOutputsParams {
host_platform,
build_platform,
variant_configuration: self.variants.clone(),
variant_files: self.variant_files.clone(),
work_directory: work_directory.clone(),
channels: self.channels.clone(),
},
move |line| {
let _err = futures::executor::block_on(log_sink.send(line));
},
)
.await
.map_err(BackendSourceBuildError::BuildError)
.map_err(SourceBuildError::from)
Expand Down
Loading
Loading