diff --git a/crates/lib/src/cli.rs b/crates/lib/src/cli.rs index 9a04257f5..4abab1276 100644 --- a/crates/lib/src/cli.rs +++ b/crates/lib/src/cli.rs @@ -34,13 +34,14 @@ use crate::spec::ImageReference; use crate::utils::sigpolicy_from_opt; /// Shared progress options -#[derive(Debug, Parser, PartialEq, Eq)] +#[derive(Debug, Parser, PartialEq, Eq, Clone, Serialize, Deserialize)] pub(crate) struct ProgressOptions { /// File descriptor number which must refer to an open pipe (anonymous or named). /// /// Interactive progress will be written to this file descriptor as "JSON lines" /// format, where each value is separated by a newline. #[clap(long, hide = true)] + #[serde(default)] pub(crate) progress_fd: Option, } @@ -1422,4 +1423,86 @@ mod tests { ])); assert_eq!(args.as_slice(), ["container", "image", "pull"]); } + + #[test] + fn test_progress_fd_install_parsing() { + // Test install to-disk with progress-fd + let opts = Opt::try_parse_from([ + "bootc", + "install", + "to-disk", + "--progress-fd", + "3", + "/dev/sda", + ]) + .unwrap(); + + if let Opt::Install(crate::cli::InstallOpts::ToDisk(install_opts)) = opts { + assert_eq!(install_opts.progress.progress_fd.unwrap().get_raw_fd(), 3); + } else { + panic!("Expected install to-disk command"); + } + + // Test install to-filesystem with progress-fd + let opts = Opt::try_parse_from([ + "bootc", + "install", + "to-filesystem", + "--progress-fd", + "4", + "/mnt/root", + ]) + .unwrap(); + + if let Opt::Install(crate::cli::InstallOpts::ToFilesystem(install_opts)) = opts { + assert_eq!(install_opts.progress.progress_fd.unwrap().get_raw_fd(), 4); + } else { + panic!("Expected install to-filesystem command"); + } + + // Test install to-existing-root with progress-fd + let opts = + Opt::try_parse_from(["bootc", "install", "to-existing-root", "--progress-fd", "5"]) + .unwrap(); + + if let Opt::Install(crate::cli::InstallOpts::ToExistingRoot(install_opts)) = opts { + assert_eq!(install_opts.progress.progress_fd.unwrap().get_raw_fd(), 5); + } else { + panic!("Expected install to-existing-root command"); + } + } + + #[test] + fn test_progress_fd_validation() { + // Test that invalid FD values are rejected + let result = Opt::try_parse_from([ + "bootc", + "install", + "to-disk", + "--progress-fd", + "1", + "/dev/sda", + ]); + assert!(result.is_err()); + + let result = Opt::try_parse_from([ + "bootc", + "install", + "to-disk", + "--progress-fd", + "2", + "/dev/sda", + ]); + assert!(result.is_err()); + + let result = Opt::try_parse_from([ + "bootc", + "install", + "to-disk", + "--progress-fd", + "invalid", + "/dev/sda", + ]); + assert!(result.is_err()); + } } diff --git a/crates/lib/src/deploy.rs b/crates/lib/src/deploy.rs index 7be49acb9..eaad02cd8 100644 --- a/crates/lib/src/deploy.rs +++ b/crates/lib/src/deploy.rs @@ -21,7 +21,9 @@ use ostree_ext::ostree::{self, Sysroot}; use ostree_ext::sysroot::SysrootLock; use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten; +use crate::progress_aggregator::ProgressAggregatorBuilder; use crate::progress_jsonl::{Event, ProgressWriter, SubTaskBytes, SubTaskStep}; +use crate::progress_renderer::ProgressFilter; use crate::spec::ImageReference; use crate::spec::{BootOrder, HostSpec}; use crate::status::labels_of_config; @@ -138,7 +140,7 @@ fn prefix_of_progress(p: &ImportProgress) -> &'static str { } } -/// Write container fetch progress to standard output. +/// Write container fetch progress using JSON-first architecture. async fn handle_layer_progress_print( mut layers: tokio::sync::mpsc::Receiver, mut layer_bytes: tokio::sync::watch::Receiver>, @@ -152,35 +154,26 @@ async fn handle_layer_progress_print( ) -> ProgressWriter { let start = std::time::Instant::now(); let mut total_read = 0u64; - let bar = indicatif::MultiProgress::new(); - if quiet { - bar.set_draw_target(indicatif::ProgressDrawTarget::hidden()); - } - let layers_bar = bar.add(indicatif::ProgressBar::new( - n_layers_to_fetch.try_into().unwrap(), - )); - let byte_bar = bar.add(indicatif::ProgressBar::new(0)); - // let byte_bar = indicatif::ProgressBar::new(0); - // byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden()); - layers_bar.set_style( - indicatif::ProgressStyle::default_bar() - .template("{prefix} {bar} {pos}/{len} {wide_msg}") - .unwrap(), - ); - let taskname = "Fetching layers"; - layers_bar.set_prefix(taskname); - layers_bar.set_message(""); - byte_bar.set_prefix("Fetching"); - byte_bar.set_style( - indicatif::ProgressStyle::default_bar() - .template( - " └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}", - ) - .unwrap() - ); + + // Create JSON-first progress aggregator for pulling tasks + let visual_filter = if quiet { + None + } else { + Some(ProgressFilter::TasksMatching(vec!["pulling".to_string()])) + }; + + let mut aggregator = { + let mut builder = ProgressAggregatorBuilder::new().with_json(prog.clone()); + if let Some(filter) = visual_filter { + builder = builder.with_visual(filter); + } + builder.build() + }; let mut subtasks = vec![]; let mut subtask: SubTaskBytes = Default::default(); + let mut current_layer_step = 0u64; + loop { tokio::select! { // Always handle layer changes first. @@ -192,12 +185,6 @@ async fn handle_layer_progress_print( let short_digest = &layer.digest().digest()[0..21]; let layer_size = layer.size(); if l.is_starting() { - // Reset the progress bar - byte_bar.reset_elapsed(); - byte_bar.reset_eta(); - byte_bar.set_length(layer_size); - byte_bar.set_message(format!("{layer_type} {short_digest}")); - subtask = SubTaskBytes { subtask: layer_type.into(), description: format!("{layer_type}: {short_digest}").clone().into(), @@ -207,13 +194,14 @@ async fn handle_layer_progress_print( bytes_total: layer_size, }; } else { - byte_bar.set_position(layer_size); - layers_bar.inc(1); total_read = total_read.saturating_add(layer_size); + current_layer_step += 1; // Emit an event where bytes == total to signal completion. subtask.bytes = layer_size; subtasks.push(subtask.clone()); - prog.send(Event::ProgressBytes { + + // Send progress event via JSON-first aggregator + let event = Event::ProgressBytes { task: "pulling".into(), description: format!("Pulling Image: {digest}").into(), id: (*digest).into(), @@ -221,10 +209,11 @@ async fn handle_layer_progress_print( bytes: total_read, bytes_total: bytes_to_download, steps_cached: (layers_total - n_layers_to_fetch) as u64, - steps: layers_bar.position(), + steps: current_layer_step, steps_total: n_layers_to_fetch as u64, subtasks: subtasks.clone(), - }).await; + }; + let _ = aggregator.send_event(event).await; } } else { // If the receiver is disconnected, then we're done @@ -241,40 +230,42 @@ async fn handle_layer_progress_print( bytes.as_ref().cloned() }; if let Some(bytes) = bytes { - byte_bar.set_position(bytes.fetched); - subtask.bytes = byte_bar.position(); - prog.send_lossy(Event::ProgressBytes { + subtask.bytes = bytes.fetched; + + // Send lossy progress event via JSON-first aggregator + let event = Event::ProgressBytes { task: "pulling".into(), description: format!("Pulling Image: {digest}").into(), id: (*digest).into(), bytes_cached: bytes_total - bytes_to_download, - bytes: total_read + byte_bar.position(), + bytes: total_read + bytes.fetched, bytes_total: bytes_to_download, steps_cached: (layers_total - n_layers_to_fetch) as u64, - steps: layers_bar.position(), + steps: current_layer_step, steps_total: n_layers_to_fetch as u64, subtasks: subtasks.clone().into_iter().chain([subtask.clone()]).collect(), - }).await; + }; + let _ = aggregator.send_event(event).await; } } } } - byte_bar.finish_and_clear(); - layers_bar.finish_and_clear(); - if let Err(e) = bar.clear() { - tracing::warn!("clearing bar: {e}"); - } + + // Finish progress aggregator + aggregator.finish(); + let end = std::time::Instant::now(); let elapsed = end.duration_since(start); let persec = total_read as f64 / elapsed.as_secs_f64(); let persec = indicatif::HumanBytes(persec as u64); - if let Err(e) = bar.println(&format!( - "Fetched layers: {} in {} ({}/s)", - indicatif::HumanBytes(total_read), - indicatif::HumanDuration(elapsed), - persec, - )) { - tracing::warn!("writing to stdout: {e}"); + + if !quiet { + println!( + "Fetched layers: {} in {} ({}/s)", + indicatif::HumanBytes(total_read), + indicatif::HumanDuration(elapsed), + persec, + ); } // Since the progress notifier closed, we know import has started diff --git a/crates/lib/src/install.rs b/crates/lib/src/install.rs index cef60d091..a33b3967f 100644 --- a/crates/lib/src/install.rs +++ b/crates/lib/src/install.rs @@ -53,6 +53,7 @@ use serde::{Deserialize, Serialize}; #[cfg(feature = "install-to-disk")] use self::baseline::InstallBlockDeviceOpts; use crate::boundimage::{BoundImage, ResolvedBoundImage}; +use crate::cli::ProgressOptions; use crate::containerenv::ContainerExecutionInfo; use crate::deploy::{prepare_for_pull, pull_from_prepared, PreparedImportMeta, PreparedPullResult}; use crate::lsm; @@ -242,6 +243,10 @@ pub(crate) struct InstallToDiskOpts { #[clap(long)] #[serde(default)] pub(crate) via_loopback: bool, + + #[clap(flatten)] + #[serde(flatten)] + pub(crate) progress: ProgressOptions, } #[derive(ValueEnum, Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -317,6 +322,9 @@ pub(crate) struct InstallToFilesystemOpts { #[clap(flatten)] pub(crate) config_opts: InstallConfigOpts, + + #[clap(flatten)] + pub(crate) progress: ProgressOptions, } #[derive(Debug, Clone, clap::Parser, PartialEq, Eq)] @@ -348,6 +356,9 @@ pub(crate) struct InstallToExistingRootOpts { /// via e.g. `-v /:/target`. #[clap(default_value = ALONGSIDE_ROOT_MOUNT)] pub(crate) root_path: Utf8PathBuf, + + #[clap(flatten)] + pub(crate) progress: ProgressOptions, } /// Global state captured from the container. @@ -755,6 +766,7 @@ async fn install_container( root_setup: &RootSetup, sysroot: &ostree::Sysroot, has_ostree: bool, + prog: ProgressWriter, ) -> Result<(ostree::Deployment, InstallAleph)> { let sepolicy = state.load_policy()?; let sepolicy = sepolicy.as_ref(); @@ -793,15 +805,14 @@ async fn install_container( let repo = &sysroot.repo(); repo.set_disable_fsync(true); - let pulled_image = match prepare_for_pull(repo, &spec_imgref, Some(&state.target_imgref)) - .await? - { - PreparedPullResult::AlreadyPresent(existing) => existing, - PreparedPullResult::Ready(image_meta) => { - check_disk_space(root_setup.physical_root.as_fd(), &image_meta, &spec_imgref)?; - pull_from_prepared(&spec_imgref, false, ProgressWriter::default(), image_meta).await? - } - }; + let pulled_image = + match prepare_for_pull(repo, &spec_imgref, Some(&state.target_imgref)).await? { + PreparedPullResult::AlreadyPresent(existing) => existing, + PreparedPullResult::Ready(image_meta) => { + check_disk_space(root_setup.physical_root.as_fd(), &image_meta, &spec_imgref)?; + pull_from_prepared(&spec_imgref, false, prog, image_meta).await? + } + }; repo.set_disable_fsync(false); @@ -1335,10 +1346,11 @@ async fn install_with_sysroot( bound_images: BoundImages, has_ostree: bool, imgstore: &crate::imgstorage::Storage, + prog: ProgressWriter, ) -> Result<()> { // And actually set up the container in that root, returning a deployment and // the aleph state (see below). - let (_deployment, aleph) = install_container(state, rootfs, &sysroot, has_ostree).await?; + let (_deployment, aleph) = install_container(state, rootfs, &sysroot, has_ostree, prog).await?; // Write the aleph data that captures the system state at the time of provisioning for aid in future debugging. rootfs .physical_root @@ -1420,6 +1432,7 @@ async fn install_to_filesystem_impl( state: &State, rootfs: &mut RootSetup, cleanup: Cleanup, + prog: ProgressWriter, ) -> Result<()> { if matches!(state.selinux_state, SELinuxFinalState::ForceTargetDisabled) { rootfs.kargs.push("selinux=0".to_string()); @@ -1461,6 +1474,7 @@ async fn install_to_filesystem_impl( bound_images, has_ostree, &imgstore, + prog, ) .await?; @@ -1496,6 +1510,7 @@ fn installation_complete() { #[context("Installing to disk")] #[cfg(feature = "install-to-disk")] pub(crate) async fn install_to_disk(mut opts: InstallToDiskOpts) -> Result<()> { + let prog: ProgressWriter = opts.progress.try_into()?; let mut block_opts = opts.block_opts; let target_blockdev_meta = block_opts .device @@ -1538,7 +1553,7 @@ pub(crate) async fn install_to_disk(mut opts: InstallToDiskOpts) -> Result<()> { (rootfs, loopback_dev) }; - install_to_filesystem_impl(&state, &mut rootfs, Cleanup::Skip).await?; + install_to_filesystem_impl(&state, &mut rootfs, Cleanup::Skip, prog).await?; // Drop all data about the root except the bits we need to ensure any file descriptors etc. are closed. let (root_path, luksdev) = rootfs.into_storage(); @@ -1720,6 +1735,7 @@ pub(crate) async fn install_to_filesystem( targeting_host_root: bool, cleanup: Cleanup, ) -> Result<()> { + let prog: ProgressWriter = opts.progress.try_into()?; // Gather global state, destructuring the provided options. // IMPORTANT: We might re-execute the current process in this function (for SELinux among other things) // IMPORTANT: and hence anything that is done before MUST BE IDEMPOTENT. @@ -1924,7 +1940,7 @@ pub(crate) async fn install_to_filesystem( skip_finalize, }; - install_to_filesystem_impl(&state, &mut rootfs, cleanup).await?; + install_to_filesystem_impl(&state, &mut rootfs, cleanup, prog).await?; // Drop all data about the root except the path to ensure any file descriptors etc. are closed. drop(rootfs); @@ -1952,6 +1968,7 @@ pub(crate) async fn install_to_existing_root(opts: InstallToExistingRootOpts) -> source_opts: opts.source_opts, target_opts: opts.target_opts, config_opts: opts.config_opts, + progress: opts.progress, }; install_to_filesystem(opts, true, cleanup).await diff --git a/crates/lib/src/lib.rs b/crates/lib/src/lib.rs index 37bce1482..78daf74db 100644 --- a/crates/lib/src/lib.rs +++ b/crates/lib/src/lib.rs @@ -19,7 +19,9 @@ mod lints; mod lsm; pub(crate) mod metadata; mod podman; +mod progress_aggregator; mod progress_jsonl; +mod progress_renderer; mod reboot; mod reexec; pub mod spec; diff --git a/crates/lib/src/progress_aggregator.rs b/crates/lib/src/progress_aggregator.rs new file mode 100644 index 000000000..3c18a187f --- /dev/null +++ b/crates/lib/src/progress_aggregator.rs @@ -0,0 +1,162 @@ +//! Progress aggregator that implements JSON-first progress architecture +//! All progress flows through JSON events before being rendered or output + +use anyhow::Result; +use std::io::IsTerminal; + +use crate::progress_jsonl::{Event, ProgressWriter}; +use crate::progress_renderer::{ProgressFilter, ProgressRenderer}; + +/// Unified progress system that emits JSON events and optionally renders them visually +/// This implements the JSON-first architecture where indicatif becomes a consumer of JSON events +pub struct ProgressAggregator { + json_writer: Option, + renderer: Option, +} + +impl ProgressAggregator { + /// Create a new progress aggregator + pub fn new(json_writer: Option, visual_filter: Option) -> Self { + let renderer = if std::io::stderr().is_terminal() && visual_filter.is_some() { + Some(ProgressRenderer::new(visual_filter.unwrap())) + } else { + None + }; + + Self { + json_writer, + renderer, + } + } + + /// Send a progress event - this is the core method that implements JSON-first architecture + pub async fn send_event(&mut self, event: Event<'_>) -> Result<()> { + // 1. Always emit JSON first (if enabled) + if let Some(ref writer) = self.json_writer { + writer.send_lossy(event.clone()).await; + } + + // 2. Then render visually (if enabled) + if let Some(ref mut renderer) = self.renderer { + renderer.handle_event(&event)?; + } + + Ok(()) + } + + /// Finish all progress and clean up + pub fn finish(&mut self) { + if let Some(ref mut renderer) = self.renderer { + renderer.finish(); + } + } +} + +/// Helper to create progress aggregators for common use cases +pub struct ProgressAggregatorBuilder { + json_writer: Option, + visual_filter: Option, +} + +impl ProgressAggregatorBuilder { + pub fn new() -> Self { + Self { + json_writer: None, + visual_filter: None, + } + } + + /// Enable JSON output to the given writer + pub fn with_json(mut self, writer: ProgressWriter) -> Self { + self.json_writer = Some(writer); + self + } + + /// Enable visual progress with the given filter + pub fn with_visual(mut self, filter: ProgressFilter) -> Self { + self.visual_filter = Some(filter); + self + } + + /// Build the aggregator + pub fn build(self) -> ProgressAggregator { + ProgressAggregator::new(self.json_writer, self.visual_filter) + } +} + +impl Default for ProgressAggregatorBuilder { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::progress_jsonl::Event; + use std::borrow::Cow; + + #[tokio::test] + async fn test_json_first_architecture() -> Result<()> { + // Create an aggregator that outputs both JSON and visual progress + let mut aggregator = ProgressAggregatorBuilder::new() + .with_visual(ProgressFilter::All) + .build(); + + // Send a progress event + let event = Event::ProgressBytes { + task: Cow::Borrowed("test"), + description: Cow::Borrowed("Testing progress"), + id: Cow::Borrowed("test-id"), + bytes_cached: 0, + bytes: 50, + bytes_total: 100, + steps_cached: 0, + steps: 1, + steps_total: 2, + subtasks: vec![], + }; + + aggregator.send_event(event).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_task_filtering() -> Result<()> { + // Create an aggregator that only shows "pulling" tasks + let mut aggregator = ProgressAggregatorBuilder::new() + .with_visual(ProgressFilter::TasksMatching(vec!["pulling".to_string()])) + .build(); + + // Send a pulling event (should be shown) + let pulling_event = Event::ProgressBytes { + task: Cow::Borrowed("pulling"), + description: Cow::Borrowed("Pulling image"), + id: Cow::Borrowed("image:latest"), + bytes_cached: 0, + bytes: 25, + bytes_total: 100, + steps_cached: 0, + steps: 1, + steps_total: 3, + subtasks: vec![], + }; + + // Send an installing event (should be filtered out visually) + let installing_event = Event::ProgressSteps { + task: Cow::Borrowed("installing"), + description: Cow::Borrowed("Installing package"), + id: Cow::Borrowed("package"), + steps_cached: 0, + steps: 1, + steps_total: 5, + subtasks: vec![], + }; + + aggregator.send_event(pulling_event).await?; + aggregator.send_event(installing_event).await?; + + Ok(()) + } +} diff --git a/crates/lib/src/progress_jsonl.rs b/crates/lib/src/progress_jsonl.rs index 4b719b170..e49d5234d 100644 --- a/crates/lib/src/progress_jsonl.rs +++ b/crates/lib/src/progress_jsonl.rs @@ -4,7 +4,7 @@ use anyhow::Result; use canon_json::CanonJsonSerialize; use schemars::JsonSchema; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::os::fd::{FromRawFd, OwnedFd, RawFd}; use std::str::FromStr; @@ -137,7 +137,7 @@ pub enum Event<'t> { }, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub(crate) struct RawProgressFd(RawFd); impl FromStr for RawProgressFd { @@ -153,6 +153,13 @@ impl FromStr for RawProgressFd { } } +impl RawProgressFd { + #[cfg(test)] + pub(crate) fn get_raw_fd(&self) -> RawFd { + self.0 + } +} + #[derive(Debug)] struct ProgressWriterInner { /// true if we sent the initial Start message diff --git a/crates/lib/src/progress_renderer.rs b/crates/lib/src/progress_renderer.rs new file mode 100644 index 000000000..321a79125 --- /dev/null +++ b/crates/lib/src/progress_renderer.rs @@ -0,0 +1,383 @@ +//! Progress renderer that consumes JSON events and displays them via indicatif +//! This implements the JSON-first architecture where all progress flows through +//! JSON events before being rendered visually. + +use anyhow::Result; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use std::collections::{HashMap, HashSet}; +use std::time::Duration; + +use crate::progress_jsonl::{Event, SubTaskBytes, SubTaskStep}; + +#[derive(Debug, Clone)] +pub enum ProgressFilter { + /// Show all progress events + All, + /// Only show tasks matching these patterns + TasksMatching(Vec), +} + +impl Default for ProgressFilter { + fn default() -> Self { + Self::All + } +} + +/// Renders JSON progress events as indicatif progress bars +/// This bridges the gap between stateless JSON events and stateful visual display +pub struct ProgressRenderer { + multi: MultiProgress, + bars: HashMap, + subtask_bars: HashMap, + active_tasks: HashSet, + current_task_type: Option, + filter: ProgressFilter, + + // Style templates + steps_style: ProgressStyle, + subtask_style: ProgressStyle, +} + +impl ProgressRenderer { + pub fn new(filter: ProgressFilter) -> Self { + let multi = MultiProgress::new(); + + let steps_style = ProgressStyle::with_template( + "{prefix} {bar} {pos}/{len} {wide_msg}", + ) + .unwrap(); + + // Match the old indicatif styling for byte progress with indentation + let subtask_style = ProgressStyle::with_template( + " └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}", + ) + .unwrap(); + + Self { + multi, + bars: HashMap::new(), + subtask_bars: HashMap::new(), + active_tasks: HashSet::new(), + current_task_type: None, + filter, + steps_style, + subtask_style, + } + } + + /// Process a JSON progress event and update the visual display + pub fn handle_event(&mut self, event: &Event<'_>) -> Result<()> { + match event { + Event::Start { .. } => { + // Reset state on start + self.clear_all_bars(); + } + Event::ProgressBytes { + task, + description, + id, + bytes, + bytes_total, + steps, + steps_total, + subtasks, + .. + } => { + if self.should_render_task(task) { + self.ensure_clean_context(task); + self.update_bytes_progress( + task, + description, + id, + *bytes, + *bytes_total, + *steps, + *steps_total, + subtasks, + )?; + } + } + Event::ProgressSteps { + task, + description, + id, + steps, + steps_total, + subtasks, + .. + } => { + if self.should_render_task(task) { + self.ensure_clean_context(task); + self.update_steps_progress( + task, + description, + id, + *steps, + *steps_total, + subtasks, + )?; + } + } + } + Ok(()) + } + + fn should_render_task(&self, task: &str) -> bool { + match &self.filter { + ProgressFilter::All => true, + ProgressFilter::TasksMatching(patterns) => { + patterns.iter().any(|pattern| task.contains(pattern)) + } + } + } + + fn ensure_clean_context(&mut self, task: &str) { + // For all filters, just track the current task type + self.current_task_type = Some(task.to_string()); + } + + fn update_bytes_progress( + &mut self, + task: &str, + _description: &str, + id: &str, + _bytes: u64, + _bytes_total: u64, + steps: u64, + steps_total: u64, + subtasks: &[SubTaskBytes<'_>], + ) -> Result<()> { + let bar_id = format!("{}:{}", task, id); + + // Check if we need to create a new bar + let needs_new_bar = !self.bars.contains_key(&bar_id); + if needs_new_bar { + let pb = self.multi.add(ProgressBar::new(steps_total.max(1))); + pb.set_style(self.steps_style.clone()); + // Match old format: set prefix to "Fetching layers" for pulling tasks + if task == "pulling" { + pb.set_prefix("Fetching layers"); + } else { + pb.set_prefix(task.to_string()); + } + pb.set_message(""); + self.active_tasks.insert(bar_id.clone()); + self.bars.insert(bar_id.clone(), pb); + } + + // Get the bar and update it + let bar = self.bars.get(&bar_id).unwrap(); + + // Update main progress - use steps for the main bar + if steps_total > 0 { + bar.set_length(steps_total); + } + bar.set_position(steps); + + // Update or create subtask bars + self.update_subtask_bars(subtasks, &bar_id)?; + + // Mark as complete if done - match old behavior by finishing and clearing + if steps >= steps_total { + if let Some(bar) = self.bars.get(&bar_id) { + bar.finish_and_clear(); + } + } + + Ok(()) + } + + fn update_steps_progress( + &mut self, + task: &str, + description: &str, + id: &str, + steps: u64, + steps_total: u64, + subtasks: &[SubTaskStep<'_>], + ) -> Result<()> { + let bar_id = format!("{}:{}", task, id); + + // Check if we need to create a new bar + let needs_new_bar = !self.bars.contains_key(&bar_id); + if needs_new_bar { + let pb = self.multi.add(ProgressBar::new(steps_total.max(1))); + pb.set_style(self.steps_style.clone()); + pb.set_message(format!("{} ({})", description, task)); + self.active_tasks.insert(bar_id.clone()); + self.bars.insert(bar_id.clone(), pb); + } + + // Get the bar and update it + let bar = self.bars.get(&bar_id).unwrap(); + + // Update main progress + bar.set_length(steps_total); + bar.set_position(steps); + + // Update subtask display (for steps, we just show completion status) + for subtask in subtasks { + let status = if subtask.completed { "✓" } else { "◯" }; + let subtask_id = format!("{}:subtask:{}", bar_id, subtask.id); + + if let Some(subtask_bar) = self.subtask_bars.get(&subtask_id) { + if subtask.completed { + subtask_bar.finish_with_message(format!(" ✓ {}", subtask.description)); + } + } else if !subtask.completed { + // Create a spinner for active subtasks + let subtask_bar = self.multi.insert_after(&bar, ProgressBar::new_spinner()); + subtask_bar + .set_style(ProgressStyle::with_template(" {spinner:.yellow} {msg}").unwrap()); + subtask_bar.set_message(format!("{} {}", status, subtask.description)); + subtask_bar.enable_steady_tick(Duration::from_millis(120)); + self.subtask_bars.insert(subtask_id, subtask_bar); + } + } + + // Mark as complete if done - match old behavior by finishing and clearing + if steps >= steps_total { + if let Some(bar) = self.bars.get(&bar_id) { + bar.finish_and_clear(); + } + } + + Ok(()) + } + + fn update_subtask_bars( + &mut self, + subtasks: &[SubTaskBytes<'_>], + parent_id: &str, + ) -> Result<()> { + let parent_bar = self.bars.get(parent_id).cloned(); + + for subtask in subtasks { + let subtask_id = format!("{}:subtask:{}", parent_id, subtask.id); + + let subtask_bar = self + .subtask_bars + .entry(subtask_id.clone()) + .or_insert_with(|| { + let pb = if let Some(ref parent) = parent_bar { + self.multi + .insert_after(parent, ProgressBar::new(subtask.bytes_total.max(1))) + } else { + self.multi.add(ProgressBar::new(subtask.bytes_total.max(1))) + }; + pb.set_style(self.subtask_style.clone()); + // Match old format: set prefix to "Fetching" for byte progress + pb.set_prefix("Fetching"); + pb.set_message(format!("{} {}", subtask.description, subtask.id)); + pb + }); + + // Update subtask progress + if subtask.bytes_total > 0 { + subtask_bar.set_length(subtask.bytes_total); + } + subtask_bar.set_position(subtask.bytes); + + // Mark as complete if done - but don't show checkmarks, just clear like the old version + if subtask.bytes_total > 0 && subtask.bytes >= subtask.bytes_total { + subtask_bar.finish_and_clear(); + } + } + + Ok(()) + } + + fn clear_all_bars(&mut self) { + // Finish all active bars + for bar in self.bars.values() { + bar.finish_and_clear(); + } + for bar in self.subtask_bars.values() { + bar.finish_and_clear(); + } + + self.bars.clear(); + self.subtask_bars.clear(); + self.active_tasks.clear(); + } + + /// Finish and clean up all progress bars + pub fn finish(&mut self) { + self.clear_all_bars(); + // Clear the multi-progress like the old version did + if let Err(e) = self.multi.clear() { + tracing::warn!("clearing progress bars: {e}"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::progress_jsonl::{Event, SubTaskBytes}; + use std::borrow::Cow; + + #[test] + fn test_filter_matching() { + let renderer = + ProgressRenderer::new(ProgressFilter::TasksMatching(vec!["pulling".to_string()])); + + // Should render pulling tasks + assert!(renderer.should_render_task("pulling")); + assert!(renderer.should_render_task("pulling_layers")); + + // Should not render other tasks + assert!(!renderer.should_render_task("installing")); + assert!(!renderer.should_render_task("upgrading")); + } + + #[test] + fn test_all_tasks() { + let mut renderer = ProgressRenderer::new(ProgressFilter::All); + + // All tasks should be renderable + assert!(renderer.should_render_task("pulling")); + assert!(renderer.should_render_task("installing")); + + // But ensure_clean_context should track current task + renderer.ensure_clean_context("pulling"); + assert_eq!(renderer.current_task_type, Some("pulling".to_string())); + + renderer.ensure_clean_context("installing"); + assert_eq!(renderer.current_task_type, Some("installing".to_string())); + } + + #[tokio::test] + async fn test_progress_rendering() -> Result<()> { + let mut renderer = ProgressRenderer::new(ProgressFilter::All); + + // Test bytes progress + let event = Event::ProgressBytes { + task: Cow::Borrowed("pulling"), + description: Cow::Borrowed("Pulling container image"), + id: Cow::Borrowed("example.com/image:latest"), + bytes_cached: 0, + bytes: 1024, + bytes_total: 4096, + steps_cached: 0, + steps: 1, + steps_total: 3, + subtasks: vec![SubTaskBytes { + subtask: Cow::Borrowed("layer"), + description: Cow::Borrowed("Layer"), + id: Cow::Borrowed("sha256:abc123"), + bytes_cached: 0, + bytes: 512, + bytes_total: 1024, + }], + }; + + renderer.handle_event(&event)?; + + // Verify bars were created + assert!(!renderer.bars.is_empty()); + assert!(!renderer.subtask_bars.is_empty()); + + Ok(()) + } +}