diff --git a/crates/pixi_build_backend_passthrough/src/lib.rs b/crates/pixi_build_backend_passthrough/src/lib.rs index 045aada3b5..651eed79c3 100644 --- a/crates/pixi_build_backend_passthrough/src/lib.rs +++ b/crates/pixi_build_backend_passthrough/src/lib.rs @@ -66,6 +66,7 @@ impl InMemoryBackend for PassthroughBackend { fn conda_outputs( &self, params: CondaOutputsParams, + _output_stream: &(dyn BackendOutputStream + Send + 'static), ) -> Result> { Ok(CondaOutputsResult { outputs: vec![CondaOutput { diff --git a/crates/pixi_build_frontend/src/backend/in_memory/mod.rs b/crates/pixi_build_frontend/src/backend/in_memory/mod.rs index a2e4d61057..195c2c210c 100644 --- a/crates/pixi_build_frontend/src/backend/in_memory/mod.rs +++ b/crates/pixi_build_frontend/src/backend/in_memory/mod.rs @@ -55,6 +55,7 @@ pub trait InMemoryBackend: Send { fn conda_outputs( &self, params: CondaOutputsParams, + output_stream: &(dyn BackendOutputStream + Send + 'static), ) -> Result> { unimplemented!() } diff --git a/crates/pixi_build_frontend/src/backend/json_rpc.rs b/crates/pixi_build_frontend/src/backend/json_rpc.rs index db00e50d33..121a30eba2 100644 --- a/crates/pixi_build_frontend/src/backend/json_rpc.rs +++ b/crates/pixi_build_frontend/src/backend/json_rpc.rs @@ -32,7 +32,7 @@ use tokio::{ sync::{Mutex, oneshot}, }; -use super::stderr::{stderr_buffer, stream_stderr}; +use super::stderr::stream_stderr; use crate::{ backend::BackendOutputStream, error::BackendError, @@ -334,16 +334,17 @@ impl JsonRpcBackend { } /// Call the `conda/outputs` method on the backend. - pub async fn conda_outputs( + pub async fn conda_outputs( &self, request: CondaOutputsParams, + output_stream: W, ) -> Result { // Capture all of stderr and discard it let stderr = self.stderr.as_ref().map(|stderr| { // Cancellation signal let (cancel_tx, cancel_rx) = oneshot::channel(); // Spawn the stderr forwarding task - let handle = tokio::spawn(stderr_buffer(stderr.clone(), cancel_rx)); + let handle = tokio::spawn(stream_stderr(stderr.clone(), cancel_rx, output_stream)); (cancel_tx, handle) }); diff --git a/crates/pixi_build_frontend/src/backend/mod.rs b/crates/pixi_build_frontend/src/backend/mod.rs index 64511826bf..59e911cc11 100644 --- a/crates/pixi_build_frontend/src/backend/mod.rs +++ b/crates/pixi_build_frontend/src/backend/mod.rs @@ -152,19 +152,22 @@ impl Backend { } /// Returns the outputs that this backend can produce. - pub async fn conda_outputs( + pub async fn conda_outputs( &self, params: CondaOutputsParams, + output_stream: W, ) -> Result { assert!( self.inner.capabilities().provides_conda_outputs(), "This backend does not support the conda outputs procedure" ); match &self.inner { - BackendImplementation::JsonRpc(json_rpc) => json_rpc.conda_outputs(params).await, - BackendImplementation::InMemory(in_memory) => { - in_memory.conda_outputs(params).map_err(|e| *e) + BackendImplementation::JsonRpc(json_rpc) => { + json_rpc.conda_outputs(params, output_stream).await } + BackendImplementation::InMemory(in_memory) => in_memory + .conda_outputs(params, &output_stream) + .map_err(|e| *e), } } } diff --git a/crates/pixi_build_frontend/src/backend/stderr.rs b/crates/pixi_build_frontend/src/backend/stderr.rs index a87dfbd155..050078ebfe 100644 --- a/crates/pixi_build_frontend/src/backend/stderr.rs +++ b/crates/pixi_build_frontend/src/backend/stderr.rs @@ -7,35 +7,6 @@ use tokio::{ sync::{Mutex, oneshot}, }; -/// Stderr stream that captures the stderr output of the backend and stores it -/// in a buffer for later use. -pub(crate) async fn stderr_buffer( - buffer: Arc>>>, - cancel: oneshot::Receiver<()>, -) -> Result { - // Create a future that continuously read from the buffer and stores the lines - // until all data is received. - let mut lines = Vec::new(); - let read_and_buffer = async { - let mut buffer = buffer.lock().await; - while let Some(line) = buffer.next_line().await? { - lines.push(line); - } - Ok(lines.join("\n")) - }; - - // Either wait until the cancel signal is received or the `read_and_buffer` - // finishes which means there is no more data to read. - tokio::select! { - _ = cancel => { - Ok(lines.join("\n")) - } - result = read_and_buffer => { - result - } - } -} - /// Stderr stream that captures the stderr output of the backend and stores it /// in a buffer for later use. pub(crate) async fn stream_stderr( diff --git a/crates/pixi_command_dispatcher/src/build_backend_metadata/mod.rs b/crates/pixi_command_dispatcher/src/build_backend_metadata/mod.rs index 78fb2b6cda..e3eeb6873c 100644 --- a/crates/pixi_command_dispatcher/src/build_backend_metadata/mod.rs +++ b/crates/pixi_command_dispatcher/src/build_backend_metadata/mod.rs @@ -1,3 +1,11 @@ +use std::{ + collections::{BTreeMap, BTreeSet, HashSet}, + hash::{Hash, Hasher}, + path::PathBuf, + sync::Mutex, +}; + +use futures::{SinkExt, channel::mpsc::UnboundedSender}; use miette::Diagnostic; use once_cell::sync::Lazy; use pathdiff::diff_paths; @@ -9,12 +17,6 @@ use pixi_record::{InputHash, PinnedSourceSpec}; use pixi_spec::{SourceAnchor, SourceSpec}; use rand::random; use rattler_conda_types::{ChannelConfig, ChannelUrl}; -use std::{ - collections::{BTreeMap, BTreeSet, HashSet}, - hash::{Hash, Hasher}, - path::PathBuf, - sync::Mutex, -}; use thiserror::Error; use tracing::instrument; use xxhash_rust::xxh3::Xxh3; @@ -100,6 +102,7 @@ impl BuildBackendMetadataSpec { pub(crate) async fn request( self, command_dispatcher: CommandDispatcher, + log_sink: UnboundedSender, ) -> Result> { // Ensure that the source is checked out before proceeding. let source_checkout = command_dispatcher @@ -195,6 +198,7 @@ impl BuildBackendMetadataSpec { source_checkout, backend, additional_glob_hash, + log_sink, ) .await?; @@ -318,6 +322,7 @@ impl BuildBackendMetadataSpec { source_checkout: SourceCheckout, backend: Backend, additional_glob_hash: Vec, + mut log_sink: UnboundedSender, ) -> Result> { let backend_identifier = backend.identifier().to_string(); let params = CondaOutputsParams { @@ -338,7 +343,9 @@ impl BuildBackendMetadataSpec { ), }; let outputs = backend - .conda_outputs(params) + .conda_outputs(params, move |line| { + let _err = futures::executor::block_on(log_sink.send(line)); + }) .await .map_err(BuildBackendMetadataError::Communication) .map_err(CommandDispatcherError::Failed)?; diff --git a/crates/pixi_command_dispatcher/src/command_dispatcher_processor/build_backend_metadata.rs b/crates/pixi_command_dispatcher/src/command_dispatcher_processor/build_backend_metadata.rs index c21ae60239..3b6f2487dc 100644 --- a/crates/pixi_command_dispatcher/src/command_dispatcher_processor/build_backend_metadata.rs +++ b/crates/pixi_command_dispatcher/src/command_dispatcher_processor/build_backend_metadata.rs @@ -1,6 +1,6 @@ use std::{collections::hash_map::Entry, sync::Arc}; -use futures::FutureExt; +use futures::{FutureExt, channel::mpsc::UnboundedSender}; use tokio_util::sync::CancellationToken; use super::{CommandDispatcherProcessor, PendingDeduplicatingTask, TaskResult}; @@ -68,19 +68,23 @@ impl CommandDispatcherProcessor { .insert(source_metadata_id, reporter_id); } + // Open a channel to receive build output. + let (tx, rx) = futures::channel::mpsc::unbounded(); + if let Some((reporter, reporter_id)) = self .reporter .as_deref_mut() .and_then(Reporter::as_build_backend_metadata_reporter) .zip(reporter_id) { - reporter.on_started(reporter_id) + reporter.on_started(reporter_id, Box::new(rx)) } self.queue_build_backend_metadata_task( source_metadata_id, task.spec, task.cancellation_token, + tx, ); } } @@ -92,13 +96,16 @@ impl CommandDispatcherProcessor { build_backend_metadata_id: BuildBackendMetadataId, spec: BuildBackendMetadataSpec, cancellation_token: CancellationToken, + log_sink: UnboundedSender, ) { let dispatcher = self.create_task_command_dispatcher( CommandDispatcherContext::BuildBackendMetadata(build_backend_metadata_id), ); + + // Open a channel to receive build output. self.pending_futures.push( cancellation_token - .run_until_cancelled_owned(spec.request(dispatcher)) + .run_until_cancelled_owned(spec.request(dispatcher, log_sink)) .map(move |result| { TaskResult::BuildBackendMetadata( build_backend_metadata_id, @@ -133,7 +140,8 @@ impl CommandDispatcherProcessor { .and_then(Reporter::as_build_backend_metadata_reporter) .zip(self.build_backend_metadata_reporters.remove(&id)) { - reporter.on_finished(reporter_id); + let failed = result.is_err(); + reporter.on_finished(reporter_id, failed); } self.build_backend_metadata diff --git a/crates/pixi_command_dispatcher/src/command_dispatcher_processor/source_build.rs b/crates/pixi_command_dispatcher/src/command_dispatcher_processor/source_build.rs index 6307200ee6..99634ad901 100644 --- a/crates/pixi_command_dispatcher/src/command_dispatcher_processor/source_build.rs +++ b/crates/pixi_command_dispatcher/src/command_dispatcher_processor/source_build.rs @@ -1,4 +1,4 @@ -use std::collections::hash_map::Entry; +use std::{collections::hash_map::Entry, sync::Arc}; use futures::FutureExt; use tokio_util::sync::CancellationToken; @@ -8,6 +8,7 @@ use crate::{ CommandDispatcherError, Reporter, SourceBuildError, SourceBuildResult, SourceBuildSpec, command_dispatcher::{CommandDispatcherContext, SourceBuildId, SourceBuildTask}, }; +use rattler_repodata_gateway::RunExportsReporter; impl CommandDispatcherProcessor { /// Constructs a new [`SourceBuildId`] for the given `task`. @@ -61,15 +62,6 @@ impl CommandDispatcherProcessor { .insert(source_build_id, reporter_id); } - if let Some((reporter, reporter_id)) = self - .reporter - .as_deref_mut() - .and_then(Reporter::as_source_build_reporter) - .zip(reporter_id) - { - reporter.on_started(reporter_id) - } - self.queue_source_build_task(source_build_id, task.spec, task.cancellation_token); } } @@ -82,19 +74,27 @@ impl CommandDispatcherProcessor { spec: SourceBuildSpec, cancellation_token: CancellationToken, ) { - let dispatcher = self - .create_task_command_dispatcher(CommandDispatcherContext::SourceBuild(source_build_id)); - let dispatcher_context = CommandDispatcherContext::SourceBuild(source_build_id); + let dispatcher = self.create_task_command_dispatcher(dispatcher_context); + let reporter_context = self.reporter_context(dispatcher_context); - let run_exports_reporter = self - .reporter - .as_mut() - .and_then(|reporter| reporter.create_run_exports_reporter(reporter_context)); + let (tx, rx) = futures::channel::mpsc::unbounded::(); + + let mut run_exports_reporter: Option> = None; + if let Some(reporter) = self.reporter.as_mut() { + let created = reporter.create_run_exports_reporter(reporter_context); + if let Some((source_reporter, reporter_id)) = reporter + .as_source_build_reporter() + .zip(self.source_build_reporters.get(&source_build_id).copied()) + { + source_reporter.on_started(reporter_id, Box::new(rx), created.clone()); + } + run_exports_reporter = created; + } self.pending_futures.push( cancellation_token - .run_until_cancelled_owned(spec.build(dispatcher, run_exports_reporter)) + .run_until_cancelled_owned(spec.build(dispatcher, run_exports_reporter.clone(), tx)) .map(move |result| { TaskResult::SourceBuild( source_build_id, @@ -122,7 +122,8 @@ impl CommandDispatcherProcessor { .and_then(Reporter::as_source_build_reporter) .zip(self.source_build_reporters.remove(&id)) { - reporter.on_finished(reporter_id); + let failed = result.as_ref().is_err(); + reporter.on_finished(reporter_id, failed); } self.source_build diff --git a/crates/pixi_command_dispatcher/src/reporter.rs b/crates/pixi_command_dispatcher/src/reporter.rs index cea2d504de..5c488b9009 100644 --- a/crates/pixi_command_dispatcher/src/reporter.rs +++ b/crates/pixi_command_dispatcher/src/reporter.rs @@ -143,10 +143,14 @@ pub trait BuildBackendMetadataReporter { ) -> BuildBackendMetadataId; /// Called when the operation has started. - fn on_started(&mut self, id: BuildBackendMetadataId); + fn on_started( + &mut self, + id: BuildBackendMetadataId, + backend_output_stream: Box + Unpin + Send>, + ); /// Called when the operation has finished. - fn on_finished(&mut self, id: BuildBackendMetadataId); + fn on_finished(&mut self, id: BuildBackendMetadataId, failed: bool); } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)] @@ -183,10 +187,15 @@ pub trait SourceBuildReporter { ) -> SourceBuildId; /// Called when the operation has started. - fn on_started(&mut self, id: SourceBuildId); + fn on_started( + &mut self, + id: SourceBuildId, + backend_output_stream: Box + Unpin + Send>, + run_exports_reporter: Option>, + ); /// Called when the operation has finished. - fn on_finished(&mut self, id: SourceBuildId); + fn on_finished(&mut self, id: SourceBuildId, failed: bool); } /// A trait that is used to report the progress of a source build performed by diff --git a/crates/pixi_command_dispatcher/src/source_build/mod.rs b/crates/pixi_command_dispatcher/src/source_build/mod.rs index 986fcb3f15..a7d97c5b6e 100644 --- a/crates/pixi_command_dispatcher/src/source_build/mod.rs +++ b/crates/pixi_command_dispatcher/src/source_build/mod.rs @@ -4,6 +4,7 @@ use std::{ sync::Arc, }; +use futures::{SinkExt, channel::mpsc::UnboundedSender}; use miette::Diagnostic; use pixi_build_discovery::EnabledProtocols; use pixi_build_frontend::Backend; @@ -127,6 +128,7 @@ impl SourceBuildSpec { mut self, command_dispatcher: CommandDispatcher, reporter: Option>, + log_sink: UnboundedSender, ) -> Result> { // If the output directory is not set, we want to use the build cache. Read the // build cache in that case. @@ -286,6 +288,7 @@ impl SourceBuildSpec { work_directory, package_build_input_hash, reporter, + log_sink, ) .await?; @@ -405,6 +408,7 @@ impl SourceBuildSpec { work_directory: PathBuf, package_build_input_hash: PackageBuildInputHash, reporter: Option>, + mut log_sink: UnboundedSender, ) -> Result> { let source_anchor = SourceAnchor::from(SourceSpec::from(self.source.clone())); let host_platform = self.build_environment.host_platform; @@ -413,14 +417,19 @@ impl SourceBuildSpec { // Request the metadata from the backend. // TODO: Can we somehow cache this metadata? let outputs = backend - .conda_outputs(CondaOutputsParams { - host_platform, - build_platform, - variant_configuration: self.variants.clone(), - variant_files: self.variant_files.clone(), - work_directory: work_directory.clone(), - channels: self.channels.clone(), - }) + .conda_outputs( + CondaOutputsParams { + host_platform, + build_platform, + variant_configuration: self.variants.clone(), + variant_files: self.variant_files.clone(), + work_directory: work_directory.clone(), + channels: self.channels.clone(), + }, + move |line| { + let _err = futures::executor::block_on(log_sink.send(line)); + }, + ) .await .map_err(BackendSourceBuildError::BuildError) .map_err(SourceBuildError::from) diff --git a/crates/pixi_command_dispatcher/tests/integration/event_reporter.rs b/crates/pixi_command_dispatcher/tests/integration/event_reporter.rs index 8fb6cf855e..beb05b42e1 100644 --- a/crates/pixi_command_dispatcher/tests/integration/event_reporter.rs +++ b/crates/pixi_command_dispatcher/tests/integration/event_reporter.rs @@ -17,6 +17,7 @@ use pixi_command_dispatcher::{ }, }; use pixi_git::resolver::RepositoryReference; +use rattler_repodata_gateway::RunExportsReporter; use serde::Serialize; #[derive(Debug, Serialize)] @@ -381,13 +382,23 @@ impl BuildBackendMetadataReporter for EventReporter { next_id } - fn on_started(&mut self, id: BuildBackendMetadataId) { + fn on_started( + &mut self, + id: BuildBackendMetadataId, + mut backend_output_stream: Box + Unpin + Send>, + ) { let event = Event::BuildBackendMetadataStarted { id }; eprintln!("{}", serde_json::to_string_pretty(&event).unwrap()); self.events.push(event); + + tokio::spawn(async move { + while let Some(line) = backend_output_stream.next().await { + eprintln!("{line}"); + } + }); } - fn on_finished(&mut self, id: BuildBackendMetadataId) { + fn on_finished(&mut self, id: BuildBackendMetadataId, _failed: bool) { let event = Event::BuildBackendMetadataFinished { id }; eprintln!("{}", serde_json::to_string_pretty(&event).unwrap()); self.events.push(event); @@ -477,13 +488,25 @@ impl SourceBuildReporter for EventReporter { next_id } - fn on_started(&mut self, id: SourceBuildId) { + fn on_started( + &mut self, + id: SourceBuildId, + backend_output_stream: Box + Unpin + Send>, + _run_exports_reporter: Option>, + ) { let event = Event::SourceBuildStarted { id }; eprintln!("{}", serde_json::to_string_pretty(&event).unwrap()); self.events.push(event); + + tokio::spawn(async move { + let mut output_stream = backend_output_stream; + while let Some(line) = output_stream.next().await { + eprintln!("{line}"); + } + }); } - fn on_finished(&mut self, id: SourceBuildId) { + fn on_finished(&mut self, id: SourceBuildId, _failed: bool) { let event = Event::SourceBuildFinished { id }; eprintln!("{}", serde_json::to_string_pretty(&event).unwrap()); self.events.push(event); diff --git a/crates/pixi_reporters/src/sync_reporter.rs b/crates/pixi_reporters/src/sync_reporter.rs index e4219fc404..6422df2e27 100644 --- a/crates/pixi_reporters/src/sync_reporter.rs +++ b/crates/pixi_reporters/src/sync_reporter.rs @@ -12,6 +12,7 @@ use pixi_command_dispatcher::{ use pixi_progress::ProgressBarPlacement; use rattler::{install::Transaction, package_cache::CacheReporter}; use rattler_conda_types::{PrefixRecord, RepoDataRecord}; +use rattler_repodata_gateway::RunExportsReporter; use tokio::sync::mpsc::UnboundedReceiver; use crate::{ @@ -44,6 +45,8 @@ impl SyncReporter { let mut inner = self.combined_inner.lock(); inner.preparing_progress_bar.clear(); inner.install_progress_bar.clear(); + inner.build_output_receiver = None; + inner.active_run_exports_reporter = None; } /// Creates a new InstallReporter that shares this SyncReporter instance @@ -154,15 +157,55 @@ impl SourceBuildReporter for SyncReporter { SourceBuildId(id) } - fn on_started(&mut self, id: SourceBuildId) { + fn on_started( + &mut self, + id: SourceBuildId, + mut backend_output_stream: Box + Unpin + Send>, + run_exports_reporter: Option>, + ) { // Notify the progress bar that the build has started. - let mut inner = self.combined_inner.lock(); - inner.preparing_progress_bar.on_build_start(id.0); + let print_backend_output = tracing::event_enabled!(tracing::Level::WARN); + let progress_bar = self.multi_progress.clone(); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); + + { + let mut inner = self.combined_inner.lock(); + inner.preparing_progress_bar.on_build_start(id.0); + inner.active_run_exports_reporter = run_exports_reporter; + if !print_backend_output { + inner.build_output_receiver = Some(rx); + } + } + + tokio::spawn(async move { + while let Some(line) = backend_output_stream.next().await { + if print_backend_output { + progress_bar.suspend(|| eprintln!("{line}")); + } else if tx.send(line).is_err() { + break; + } + } + }); } - fn on_finished(&mut self, id: SourceBuildId) { - let mut inner = self.combined_inner.lock(); - inner.preparing_progress_bar.on_build_finished(id.0); + fn on_finished(&mut self, id: SourceBuildId, failed: bool) { + let build_output_receiver = { + let mut inner = self.combined_inner.lock(); + inner.preparing_progress_bar.on_build_finished(id.0); + inner.active_run_exports_reporter = None; + inner.build_output_receiver.take() + }; + + if failed { + let progress_bar = self.multi_progress.clone(); + if let Some(mut build_output_receiver) = build_output_receiver { + tokio::spawn(async move { + while let Some(line) = build_output_receiver.recv().await { + progress_bar.suspend(|| eprintln!("{line}")); + } + }); + } + } } } @@ -176,6 +219,7 @@ pub struct CombinedInstallReporterInner { install_progress_bar: MainProgressBar, build_output_receiver: Option>, + active_run_exports_reporter: Option>, } #[derive(PartialEq, Eq)] @@ -229,6 +273,7 @@ impl CombinedInstallReporterInner { operation_link_id: HashMap::new(), cache_entry_id: HashMap::new(), build_output_receiver: None, + active_run_exports_reporter: None, } }