Skip to content

Commit 7c3f6bd

Browse files
committed
attempt at doing the json first
1 parent 3941437 commit 7c3f6bd

File tree

4 files changed

+596
-56
lines changed

4 files changed

+596
-56
lines changed

crates/lib/src/deploy.rs

Lines changed: 44 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use ostree_ext::ostree::{self, Sysroot};
2121
use ostree_ext::sysroot::SysrootLock;
2222
use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten;
2323

24+
use crate::progress_aggregator::ProgressAggregatorBuilder;
2425
use crate::progress_jsonl::{Event, ProgressWriter, SubTaskBytes, SubTaskStep};
26+
use crate::progress_renderer::ProgressFilter;
2527
use crate::spec::ImageReference;
2628
use crate::spec::{BootOrder, HostSpec};
2729
use crate::status::labels_of_config;
@@ -138,7 +140,7 @@ fn prefix_of_progress(p: &ImportProgress) -> &'static str {
138140
}
139141
}
140142

141-
/// Write container fetch progress to standard output.
143+
/// Write container fetch progress using JSON-first architecture.
142144
async fn handle_layer_progress_print(
143145
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
144146
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
@@ -152,35 +154,23 @@ async fn handle_layer_progress_print(
152154
) -> ProgressWriter {
153155
let start = std::time::Instant::now();
154156
let mut total_read = 0u64;
155-
let bar = indicatif::MultiProgress::new();
156-
if quiet {
157-
bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
158-
}
159-
let layers_bar = bar.add(indicatif::ProgressBar::new(
160-
n_layers_to_fetch.try_into().unwrap(),
161-
));
162-
let byte_bar = bar.add(indicatif::ProgressBar::new(0));
163-
// let byte_bar = indicatif::ProgressBar::new(0);
164-
// byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
165-
layers_bar.set_style(
166-
indicatif::ProgressStyle::default_bar()
167-
.template("{prefix} {bar} {pos}/{len} {wide_msg}")
168-
.unwrap(),
169-
);
170-
let taskname = "Fetching layers";
171-
layers_bar.set_prefix(taskname);
172-
layers_bar.set_message("");
173-
byte_bar.set_prefix("Fetching");
174-
byte_bar.set_style(
175-
indicatif::ProgressStyle::default_bar()
176-
.template(
177-
" └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}",
178-
)
179-
.unwrap()
180-
);
157+
158+
// Create JSON-first progress aggregator for pulling tasks
159+
let visual_filter = if quiet {
160+
None
161+
} else {
162+
Some(ProgressFilter::TasksMatching(vec!["pulling".to_string()]))
163+
};
164+
165+
let mut aggregator = ProgressAggregatorBuilder::new()
166+
.with_json(prog.clone())
167+
.with_visual(visual_filter.unwrap_or(ProgressFilter::TasksMatching(vec![])))
168+
.build();
181169

182170
let mut subtasks = vec![];
183171
let mut subtask: SubTaskBytes = Default::default();
172+
let mut current_layer_step = 0u64;
173+
184174
loop {
185175
tokio::select! {
186176
// Always handle layer changes first.
@@ -192,12 +182,6 @@ async fn handle_layer_progress_print(
192182
let short_digest = &layer.digest().digest()[0..21];
193183
let layer_size = layer.size();
194184
if l.is_starting() {
195-
// Reset the progress bar
196-
byte_bar.reset_elapsed();
197-
byte_bar.reset_eta();
198-
byte_bar.set_length(layer_size);
199-
byte_bar.set_message(format!("{layer_type} {short_digest}"));
200-
201185
subtask = SubTaskBytes {
202186
subtask: layer_type.into(),
203187
description: format!("{layer_type}: {short_digest}").clone().into(),
@@ -207,24 +191,26 @@ async fn handle_layer_progress_print(
207191
bytes_total: layer_size,
208192
};
209193
} else {
210-
byte_bar.set_position(layer_size);
211-
layers_bar.inc(1);
212194
total_read = total_read.saturating_add(layer_size);
195+
current_layer_step += 1;
213196
// Emit an event where bytes == total to signal completion.
214197
subtask.bytes = layer_size;
215198
subtasks.push(subtask.clone());
216-
prog.send(Event::ProgressBytes {
199+
200+
// Send progress event via JSON-first aggregator
201+
let event = Event::ProgressBytes {
217202
task: "pulling".into(),
218203
description: format!("Pulling Image: {digest}").into(),
219204
id: (*digest).into(),
220205
bytes_cached: bytes_total - bytes_to_download,
221206
bytes: total_read,
222207
bytes_total: bytes_to_download,
223208
steps_cached: (layers_total - n_layers_to_fetch) as u64,
224-
steps: layers_bar.position(),
209+
steps: current_layer_step,
225210
steps_total: n_layers_to_fetch as u64,
226211
subtasks: subtasks.clone(),
227-
}).await;
212+
};
213+
let _ = aggregator.send_event(event).await;
228214
}
229215
} else {
230216
// If the receiver is disconnected, then we're done
@@ -241,40 +227,42 @@ async fn handle_layer_progress_print(
241227
bytes.as_ref().cloned()
242228
};
243229
if let Some(bytes) = bytes {
244-
byte_bar.set_position(bytes.fetched);
245-
subtask.bytes = byte_bar.position();
246-
prog.send_lossy(Event::ProgressBytes {
230+
subtask.bytes = bytes.fetched;
231+
232+
// Send lossy progress event via JSON-first aggregator
233+
let event = Event::ProgressBytes {
247234
task: "pulling".into(),
248235
description: format!("Pulling Image: {digest}").into(),
249236
id: (*digest).into(),
250237
bytes_cached: bytes_total - bytes_to_download,
251-
bytes: total_read + byte_bar.position(),
238+
bytes: total_read + bytes.fetched,
252239
bytes_total: bytes_to_download,
253240
steps_cached: (layers_total - n_layers_to_fetch) as u64,
254-
steps: layers_bar.position(),
241+
steps: current_layer_step,
255242
steps_total: n_layers_to_fetch as u64,
256243
subtasks: subtasks.clone().into_iter().chain([subtask.clone()]).collect(),
257-
}).await;
244+
};
245+
let _ = aggregator.send_event(event).await;
258246
}
259247
}
260248
}
261249
}
262-
byte_bar.finish_and_clear();
263-
layers_bar.finish_and_clear();
264-
if let Err(e) = bar.clear() {
265-
tracing::warn!("clearing bar: {e}");
266-
}
250+
251+
// Finish progress aggregator
252+
aggregator.finish();
253+
267254
let end = std::time::Instant::now();
268255
let elapsed = end.duration_since(start);
269256
let persec = total_read as f64 / elapsed.as_secs_f64();
270257
let persec = indicatif::HumanBytes(persec as u64);
271-
if let Err(e) = bar.println(&format!(
272-
"Fetched layers: {} in {} ({}/s)",
273-
indicatif::HumanBytes(total_read),
274-
indicatif::HumanDuration(elapsed),
275-
persec,
276-
)) {
277-
tracing::warn!("writing to stdout: {e}");
258+
259+
if !quiet {
260+
println!(
261+
"Fetched layers: {} in {} ({}/s)",
262+
indicatif::HumanBytes(total_read),
263+
indicatif::HumanDuration(elapsed),
264+
persec,
265+
);
278266
}
279267

280268
// Since the progress notifier closed, we know import has started

crates/lib/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ mod lints;
1919
mod lsm;
2020
pub(crate) mod metadata;
2121
mod podman;
22+
mod progress_aggregator;
2223
mod progress_jsonl;
24+
mod progress_renderer;
2325
mod reboot;
2426
mod reexec;
2527
pub mod spec;
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
//! Progress aggregator that implements JSON-first progress architecture
2+
//! All progress flows through JSON events before being rendered or output
3+
4+
use anyhow::Result;
5+
use std::io::IsTerminal;
6+
7+
use crate::progress_jsonl::{Event, ProgressWriter};
8+
use crate::progress_renderer::{ProgressFilter, ProgressRenderer};
9+
10+
/// Unified progress system that emits JSON events and optionally renders them visually
/// This implements the JSON-first architecture where indicatif becomes a consumer of JSON events
pub struct ProgressAggregator {
    // Optional JSONL sink; when `None`, no machine-readable output is emitted.
    json_writer: Option<ProgressWriter>,
    // Optional visual renderer; `None` when no visual filter was configured
    // or stderr is not a terminal (see `ProgressAggregator::new`).
    renderer: Option<ProgressRenderer>,
}
16+
17+
impl ProgressAggregator {
18+
/// Create a new progress aggregator
19+
pub fn new(json_writer: Option<ProgressWriter>, visual_filter: Option<ProgressFilter>) -> Self {
20+
let renderer = if std::io::stderr().is_terminal() && visual_filter.is_some() {
21+
Some(ProgressRenderer::new(visual_filter.unwrap()))
22+
} else {
23+
None
24+
};
25+
26+
Self {
27+
json_writer,
28+
renderer,
29+
}
30+
}
31+
32+
/// Send a progress event - this is the core method that implements JSON-first architecture
33+
pub async fn send_event(&mut self, event: Event<'_>) -> Result<()> {
34+
// 1. Always emit JSON first (if enabled)
35+
if let Some(ref writer) = self.json_writer {
36+
writer.send_lossy(event.clone()).await;
37+
}
38+
39+
// 2. Then render visually (if enabled)
40+
if let Some(ref mut renderer) = self.renderer {
41+
renderer.handle_event(&event)?;
42+
}
43+
44+
Ok(())
45+
}
46+
47+
/// Finish all progress and clean up
48+
pub fn finish(&mut self) {
49+
if let Some(ref mut renderer) = self.renderer {
50+
renderer.finish();
51+
}
52+
}
53+
}
54+
55+
/// Helper to create progress aggregators for common use cases
pub struct ProgressAggregatorBuilder {
    // JSONL writer enabling machine-readable output; set via `with_json`.
    json_writer: Option<ProgressWriter>,
    // Filter controlling which tasks render visually; set via `with_visual`.
    visual_filter: Option<ProgressFilter>,
}
60+
61+
impl ProgressAggregatorBuilder {
62+
pub fn new() -> Self {
63+
Self {
64+
json_writer: None,
65+
visual_filter: None,
66+
}
67+
}
68+
69+
/// Enable JSON output to the given writer
70+
pub fn with_json(mut self, writer: ProgressWriter) -> Self {
71+
self.json_writer = Some(writer);
72+
self
73+
}
74+
75+
/// Enable visual progress with the given filter
76+
pub fn with_visual(mut self, filter: ProgressFilter) -> Self {
77+
self.visual_filter = Some(filter);
78+
self
79+
}
80+
81+
/// Build the aggregator
82+
pub fn build(self) -> ProgressAggregator {
83+
ProgressAggregator::new(self.json_writer, self.visual_filter)
84+
}
85+
}
86+
87+
// Delegates to `new()` so `Default::default()` and `new()` stay in sync.
impl Default for ProgressAggregatorBuilder {
    fn default() -> Self {
        Self::new()
    }
}
92+
93+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::progress_jsonl::Event;
    use std::borrow::Cow;

    #[tokio::test]
    async fn test_json_first_architecture() -> Result<()> {
        // Create an aggregator that outputs both JSON and visual progress
        let mut aggregator = ProgressAggregatorBuilder::new()
            .with_visual(ProgressFilter::All)
            .build();

        // Send a progress event
        let event = Event::ProgressBytes {
            task: Cow::Borrowed("test"),
            description: Cow::Borrowed("Testing progress"),
            id: Cow::Borrowed("test-id"),
            bytes_cached: 0,
            bytes: 50,
            bytes_total: 100,
            steps_cached: 0,
            steps: 1,
            steps_total: 2,
            subtasks: vec![],
        };

        aggregator.send_event(event).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_task_filtering() -> Result<()> {
        // Create an aggregator that only shows "pulling" tasks.
        // Fix: the builder has no `for_tasks` method (only `with_json` and
        // `with_visual` exist), so construct the task filter explicitly.
        let mut aggregator = ProgressAggregatorBuilder::new()
            .with_visual(ProgressFilter::TasksMatching(vec!["pulling".to_string()]))
            .build();

        // Send a pulling event (should be shown)
        let pulling_event = Event::ProgressBytes {
            task: Cow::Borrowed("pulling"),
            description: Cow::Borrowed("Pulling image"),
            id: Cow::Borrowed("image:latest"),
            bytes_cached: 0,
            bytes: 25,
            bytes_total: 100,
            steps_cached: 0,
            steps: 1,
            steps_total: 3,
            subtasks: vec![],
        };

        // Send an installing event (should be filtered out visually)
        let installing_event = Event::ProgressSteps {
            task: Cow::Borrowed("installing"),
            description: Cow::Borrowed("Installing package"),
            id: Cow::Borrowed("package"),
            steps_cached: 0,
            steps: 1,
            steps_total: 5,
            subtasks: vec![],
        };

        aggregator.send_event(pulling_event).await?;
        aggregator.send_event(installing_event).await?;

        Ok(())
    }
}

0 commit comments

Comments
 (0)