Skip to content

Commit d1e724f

Browse files
authored
chore: always enable stderr printing on verbose mode in the backends (#4814)
1 parent 6610c49 commit d1e724f

File tree

12 files changed

+159
-90
lines changed

12 files changed

+159
-90
lines changed

crates/pixi_build_backend_passthrough/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ impl InMemoryBackend for PassthroughBackend {
6666
fn conda_outputs(
6767
&self,
6868
params: CondaOutputsParams,
69+
_output_stream: &(dyn BackendOutputStream + Send + 'static),
6970
) -> Result<CondaOutputsResult, Box<CommunicationError>> {
7071
Ok(CondaOutputsResult {
7172
outputs: vec![CondaOutput {

crates/pixi_build_frontend/src/backend/in_memory/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ pub trait InMemoryBackend: Send {
5555
fn conda_outputs(
5656
&self,
5757
params: CondaOutputsParams,
58+
output_stream: &(dyn BackendOutputStream + Send + 'static),
5859
) -> Result<CondaOutputsResult, Box<CommunicationError>> {
5960
unimplemented!()
6061
}

crates/pixi_build_frontend/src/backend/json_rpc.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use tokio::{
3232
sync::{Mutex, oneshot},
3333
};
3434

35-
use super::stderr::{stderr_buffer, stream_stderr};
35+
use super::stderr::stream_stderr;
3636
use crate::{
3737
backend::BackendOutputStream,
3838
error::BackendError,
@@ -287,7 +287,7 @@ impl JsonRpcBackend {
287287
request: CondaBuildV1Params,
288288
output_stream: W,
289289
) -> Result<CondaBuildV1Result, CommunicationError> {
290-
// Capture all of stderr and discard it
290+
// Capture all of stderr and stream it
291291
let stderr = self.stderr.as_ref().map(|stderr| {
292292
// Cancellation signal
293293
let (cancel_tx, cancel_rx) = oneshot::channel();
@@ -334,16 +334,17 @@ impl JsonRpcBackend {
334334
}
335335

336336
/// Call the `conda/outputs` method on the backend.
337-
pub async fn conda_outputs(
337+
pub async fn conda_outputs<W: BackendOutputStream + Send + 'static>(
338338
&self,
339339
request: CondaOutputsParams,
340+
output_stream: W,
340341
) -> Result<CondaOutputsResult, CommunicationError> {
341-
// Capture all of stderr and discard it
342+
// Capture all of stderr and stream it
342343
let stderr = self.stderr.as_ref().map(|stderr| {
343344
// Cancellation signal
344345
let (cancel_tx, cancel_rx) = oneshot::channel();
345346
// Spawn the stderr forwarding task
346-
let handle = tokio::spawn(stderr_buffer(stderr.clone(), cancel_rx));
347+
let handle = tokio::spawn(stream_stderr(stderr.clone(), cancel_rx, output_stream));
347348
(cancel_tx, handle)
348349
});
349350

crates/pixi_build_frontend/src/backend/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,19 +152,22 @@ impl Backend {
152152
}
153153

154154
/// Returns the outputs that this backend can produce.
155-
pub async fn conda_outputs(
155+
pub async fn conda_outputs<W: BackendOutputStream + Send + 'static>(
156156
&self,
157157
params: CondaOutputsParams,
158+
output_stream: W,
158159
) -> Result<CondaOutputsResult, CommunicationError> {
159160
assert!(
160161
self.inner.capabilities().provides_conda_outputs(),
161162
"This backend does not support the conda outputs procedure"
162163
);
163164
match &self.inner {
164-
BackendImplementation::JsonRpc(json_rpc) => json_rpc.conda_outputs(params).await,
165-
BackendImplementation::InMemory(in_memory) => {
166-
in_memory.conda_outputs(params).map_err(|e| *e)
165+
BackendImplementation::JsonRpc(json_rpc) => {
166+
json_rpc.conda_outputs(params, output_stream).await
167167
}
168+
BackendImplementation::InMemory(in_memory) => in_memory
169+
.conda_outputs(params, &output_stream)
170+
.map_err(|e| *e),
168171
}
169172
}
170173
}

crates/pixi_build_frontend/src/backend/stderr.rs

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,6 @@ use tokio::{
77
sync::{Mutex, oneshot},
88
};
99

10-
/// Stderr stream that captures the stderr output of the backend and stores it
11-
/// in a buffer for later use.
12-
pub(crate) async fn stderr_buffer(
13-
buffer: Arc<Mutex<Lines<BufReader<ChildStderr>>>>,
14-
cancel: oneshot::Receiver<()>,
15-
) -> Result<String, std::io::Error> {
16-
// Create a future that continuously read from the buffer and stores the lines
17-
// until all data is received.
18-
let mut lines = Vec::new();
19-
let read_and_buffer = async {
20-
let mut buffer = buffer.lock().await;
21-
while let Some(line) = buffer.next_line().await? {
22-
lines.push(line);
23-
}
24-
Ok(lines.join("\n"))
25-
};
26-
27-
// Either wait until the cancel signal is received or the `read_and_buffer`
28-
// finishes which means there is no more data to read.
29-
tokio::select! {
30-
_ = cancel => {
31-
Ok(lines.join("\n"))
32-
}
33-
result = read_and_buffer => {
34-
result
35-
}
36-
}
37-
}
38-
3910
/// Stderr stream that captures the stderr output of the backend and stores it
4011
/// in a buffer for later use.
4112
pub(crate) async fn stream_stderr<W: BackendOutputStream>(

crates/pixi_command_dispatcher/src/build_backend_metadata/mod.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
use std::{
2+
collections::{BTreeMap, BTreeSet, HashSet},
3+
hash::{Hash, Hasher},
4+
path::PathBuf,
5+
sync::Mutex,
6+
};
7+
8+
use futures::{SinkExt, channel::mpsc::UnboundedSender};
19
use miette::Diagnostic;
210
use once_cell::sync::Lazy;
311
use pathdiff::diff_paths;
@@ -9,12 +17,6 @@ use pixi_record::{InputHash, PinnedSourceSpec};
917
use pixi_spec::{SourceAnchor, SourceSpec};
1018
use rand::random;
1119
use rattler_conda_types::{ChannelConfig, ChannelUrl};
12-
use std::{
13-
collections::{BTreeMap, BTreeSet, HashSet},
14-
hash::{Hash, Hasher},
15-
path::PathBuf,
16-
sync::Mutex,
17-
};
1820
use thiserror::Error;
1921
use tracing::instrument;
2022
use xxhash_rust::xxh3::Xxh3;
@@ -100,6 +102,7 @@ impl BuildBackendMetadataSpec {
100102
pub(crate) async fn request(
101103
self,
102104
command_dispatcher: CommandDispatcher,
105+
log_sink: UnboundedSender<String>,
103106
) -> Result<BuildBackendMetadata, CommandDispatcherError<BuildBackendMetadataError>> {
104107
// Ensure that the source is checked out before proceeding.
105108
let source_checkout = command_dispatcher
@@ -195,6 +198,7 @@ impl BuildBackendMetadataSpec {
195198
source_checkout,
196199
backend,
197200
additional_glob_hash,
201+
log_sink,
198202
)
199203
.await?;
200204

@@ -318,6 +322,7 @@ impl BuildBackendMetadataSpec {
318322
source_checkout: SourceCheckout,
319323
backend: Backend,
320324
additional_glob_hash: Vec<u8>,
325+
mut log_sink: UnboundedSender<String>,
321326
) -> Result<CachedCondaMetadata, CommandDispatcherError<BuildBackendMetadataError>> {
322327
let backend_identifier = backend.identifier().to_string();
323328
let params = CondaOutputsParams {
@@ -338,7 +343,9 @@ impl BuildBackendMetadataSpec {
338343
),
339344
};
340345
let outputs = backend
341-
.conda_outputs(params)
346+
.conda_outputs(params, move |line| {
347+
let _err = futures::executor::block_on(log_sink.send(line));
348+
})
342349
.await
343350
.map_err(BuildBackendMetadataError::Communication)
344351
.map_err(CommandDispatcherError::Failed)?;

crates/pixi_command_dispatcher/src/command_dispatcher_processor/build_backend_metadata.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{collections::hash_map::Entry, sync::Arc};
22

3-
use futures::FutureExt;
3+
use futures::{FutureExt, channel::mpsc::UnboundedSender};
44
use tokio_util::sync::CancellationToken;
55

66
use super::{CommandDispatcherProcessor, PendingDeduplicatingTask, TaskResult};
@@ -68,19 +68,23 @@ impl CommandDispatcherProcessor {
6868
.insert(source_metadata_id, reporter_id);
6969
}
7070

71+
// Open a channel to receive build output.
72+
let (log_sink, rx) = futures::channel::mpsc::unbounded();
73+
7174
if let Some((reporter, reporter_id)) = self
7275
.reporter
7376
.as_deref_mut()
7477
.and_then(Reporter::as_build_backend_metadata_reporter)
7578
.zip(reporter_id)
7679
{
77-
reporter.on_started(reporter_id)
80+
reporter.on_started(reporter_id, Box::new(rx))
7881
}
7982

8083
self.queue_build_backend_metadata_task(
8184
source_metadata_id,
8285
task.spec,
8386
task.cancellation_token,
87+
log_sink,
8488
);
8589
}
8690
}
@@ -92,13 +96,16 @@ impl CommandDispatcherProcessor {
9296
build_backend_metadata_id: BuildBackendMetadataId,
9397
spec: BuildBackendMetadataSpec,
9498
cancellation_token: CancellationToken,
99+
log_sink: UnboundedSender<String>,
95100
) {
96101
let dispatcher = self.create_task_command_dispatcher(
97102
CommandDispatcherContext::BuildBackendMetadata(build_backend_metadata_id),
98103
);
104+
105+
// Open a channel to receive build output.
99106
self.pending_futures.push(
100107
cancellation_token
101-
.run_until_cancelled_owned(spec.request(dispatcher))
108+
.run_until_cancelled_owned(spec.request(dispatcher, log_sink))
102109
.map(move |result| {
103110
TaskResult::BuildBackendMetadata(
104111
build_backend_metadata_id,
@@ -133,7 +140,8 @@ impl CommandDispatcherProcessor {
133140
.and_then(Reporter::as_build_backend_metadata_reporter)
134141
.zip(self.build_backend_metadata_reporters.remove(&id))
135142
{
136-
reporter.on_finished(reporter_id);
143+
let failed = result.is_err();
144+
reporter.on_finished(reporter_id, failed);
137145
}
138146

139147
self.build_backend_metadata

crates/pixi_command_dispatcher/src/command_dispatcher_processor/source_build.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::hash_map::Entry;
1+
use std::{collections::hash_map::Entry, sync::Arc};
22

33
use futures::FutureExt;
44
use tokio_util::sync::CancellationToken;
@@ -8,6 +8,7 @@ use crate::{
88
CommandDispatcherError, Reporter, SourceBuildError, SourceBuildResult, SourceBuildSpec,
99
command_dispatcher::{CommandDispatcherContext, SourceBuildId, SourceBuildTask},
1010
};
11+
use rattler_repodata_gateway::RunExportsReporter;
1112

1213
impl CommandDispatcherProcessor {
1314
/// Constructs a new [`SourceBuildId`] for the given `task`.
@@ -61,15 +62,6 @@ impl CommandDispatcherProcessor {
6162
.insert(source_build_id, reporter_id);
6263
}
6364

64-
if let Some((reporter, reporter_id)) = self
65-
.reporter
66-
.as_deref_mut()
67-
.and_then(Reporter::as_source_build_reporter)
68-
.zip(reporter_id)
69-
{
70-
reporter.on_started(reporter_id)
71-
}
72-
7365
self.queue_source_build_task(source_build_id, task.spec, task.cancellation_token);
7466
}
7567
}
@@ -82,19 +74,27 @@ impl CommandDispatcherProcessor {
8274
spec: SourceBuildSpec,
8375
cancellation_token: CancellationToken,
8476
) {
85-
let dispatcher = self
86-
.create_task_command_dispatcher(CommandDispatcherContext::SourceBuild(source_build_id));
87-
8877
let dispatcher_context = CommandDispatcherContext::SourceBuild(source_build_id);
78+
let dispatcher = self.create_task_command_dispatcher(dispatcher_context);
79+
8980
let reporter_context = self.reporter_context(dispatcher_context);
90-
let run_exports_reporter = self
91-
.reporter
92-
.as_mut()
93-
.and_then(|reporter| reporter.create_run_exports_reporter(reporter_context));
81+
let (tx, rx) = futures::channel::mpsc::unbounded::<String>();
82+
83+
let mut run_exports_reporter: Option<Arc<dyn RunExportsReporter>> = None;
84+
if let Some(reporter) = self.reporter.as_mut() {
85+
let created = reporter.create_run_exports_reporter(reporter_context);
86+
if let Some((source_reporter, reporter_id)) = reporter
87+
.as_source_build_reporter()
88+
.zip(self.source_build_reporters.get(&source_build_id).copied())
89+
{
90+
source_reporter.on_started(reporter_id, Box::new(rx));
91+
}
92+
run_exports_reporter = created;
93+
}
9494

9595
self.pending_futures.push(
9696
cancellation_token
97-
.run_until_cancelled_owned(spec.build(dispatcher, run_exports_reporter))
97+
.run_until_cancelled_owned(spec.build(dispatcher, run_exports_reporter.clone(), tx))
9898
.map(move |result| {
9999
TaskResult::SourceBuild(
100100
source_build_id,
@@ -122,7 +122,8 @@ impl CommandDispatcherProcessor {
122122
.and_then(Reporter::as_source_build_reporter)
123123
.zip(self.source_build_reporters.remove(&id))
124124
{
125-
reporter.on_finished(reporter_id);
125+
let failed = result.as_ref().is_err();
126+
reporter.on_finished(reporter_id, failed);
126127
}
127128

128129
self.source_build

crates/pixi_command_dispatcher/src/reporter.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,14 @@ pub trait BuildBackendMetadataReporter {
143143
) -> BuildBackendMetadataId;
144144

145145
/// Called when the operation has started.
146-
fn on_started(&mut self, id: BuildBackendMetadataId);
146+
fn on_started(
147+
&mut self,
148+
id: BuildBackendMetadataId,
149+
backend_output_stream: Box<dyn Stream<Item = String> + Unpin + Send>,
150+
);
147151

148152
/// Called when the operation has finished.
149-
fn on_finished(&mut self, id: BuildBackendMetadataId);
153+
fn on_finished(&mut self, id: BuildBackendMetadataId, failed: bool);
150154
}
151155

152156
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
@@ -183,10 +187,14 @@ pub trait SourceBuildReporter {
183187
) -> SourceBuildId;
184188

185189
/// Called when the operation has started.
186-
fn on_started(&mut self, id: SourceBuildId);
190+
fn on_started(
191+
&mut self,
192+
id: SourceBuildId,
193+
backend_output_stream: Box<dyn Stream<Item = String> + Unpin + Send>,
194+
);
187195

188196
/// Called when the operation has finished.
189-
fn on_finished(&mut self, id: SourceBuildId);
197+
fn on_finished(&mut self, id: SourceBuildId, failed: bool);
190198
}
191199

192200
/// A trait that is used to report the progress of a source build performed by

0 commit comments

Comments
 (0)