diff --git a/Cargo.lock b/Cargo.lock index 6bc85e2a7d1f7..58b5ec0e64064 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1937,11 +1937,14 @@ dependencies = [ "clap 4.5.48", "ctor", "datafusion", + "datafusion-common", + "datafusion-common-runtime", "dirs", "env_logger", "futures", "insta", "insta-cmd", + "is-terminal", "log", "mimalloc", "object_store", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index fda9309a5d068..643d79892d3f9 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -55,9 +55,12 @@ datafusion = { workspace = true, features = [ "sql", "unicode_expressions", ] } +datafusion-common = { workspace = true } +datafusion-common-runtime = { workspace = true } dirs = "6.0.0" env_logger = { workspace = true } futures = { workspace = true } +is-terminal = "0.4" log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "gcp", "http"] } diff --git a/datafusion-cli/examples/cli-session-context.rs b/datafusion-cli/examples/cli-session-context.rs index bd2dbb736781f..dfaeab2047009 100644 --- a/datafusion-cli/examples/cli-session-context.rs +++ b/datafusion-cli/examples/cli-session-context.rs @@ -91,6 +91,7 @@ pub async fn main() { quiet: false, maxrows: datafusion_cli::print_options::MaxRows::Unlimited, color: true, + progress: datafusion_cli::progress::ProgressConfig::default(), instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()), }; diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index 48fb37e8a8880..008725a95f52a 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -287,6 +287,7 @@ mod tests { InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, }, print_options::MaxRows, + progress::ProgressConfig, }; use super::*; @@ -300,6 +301,7 @@ mod tests { quiet: false, maxrows: MaxRows::Unlimited, color: true, + progress: ProgressConfig::default(), instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()), }; diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index d079a88a6440e..25bf2e8e0e2fe 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -25,6 +25,7 @@ use crate::{ helper::CliHelper, object_storage::get_object_store, print_options::{MaxRows, PrintOptions}, + progress::ProgressReporter, }; use datafusion::common::instant::Instant; use datafusion::common::{plan_datafusion_err, plan_err}; @@ -265,6 +266,16 @@ impl StatementExecutor { let task_ctx = ctx.task_ctx(); let options = task_ctx.session_config().options(); + // Start progress reporter if enabled + let progress_reporter = if print_options.progress.should_show_progress() { + Some( + ProgressReporter::start(&physical_plan, print_options.progress.clone()) + .await?, + ) + } else { + None + }; + // Track memory usage for the query result if it's bounded let mut reservation = MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool()); @@ -279,9 +290,16 @@ impl StatementExecutor { // As the input stream comes, we can generate results. // However, memory safety is not guaranteed. let stream = execute_stream(physical_plan, task_ctx.clone())?; - print_options + let result = print_options .print_stream(stream, now, &options.format) - .await?; + .await; + + // Stop progress reporter before returning + if let Some(reporter) = &progress_reporter { + reporter.stop().await; + } + + result?; } else { // Bounded stream; collected results size is limited by the maxrows option let schema = physical_plan.schema(); @@ -314,6 +332,11 @@ impl StatementExecutor { reservation.free(); } + // Stop progress reporter + if let Some(reporter) = progress_reporter { + reporter.stop().await; + } + Ok(()) } diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs index f0b0bc23fd73d..d76fbd930948f 100644 --- a/datafusion-cli/src/lib.rs +++ b/datafusion-cli/src/lib.rs @@ -34,3 +34,4 @@ pub mod object_storage; pub mod pool_type; pub mod print_format; pub mod print_options; +pub mod progress; diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 3dbe839d3c9b3..9740f8f7d65d2 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -40,6 +40,7 @@ use datafusion_cli::{ pool_type::PoolType, print_format::PrintFormat, print_options::{MaxRows, PrintOptions}, + progress::{ProgressConfig, ProgressMode, ProgressStyle}, DATAFUSION_CLI_VERSION, }; @@ -149,6 +150,19 @@ struct Args { )] disk_limit: Option, + #[clap(long, value_enum, default_value_t = ProgressMode::Auto, help = "Progress bar mode")] + progress: ProgressMode, + + #[clap(long, value_enum, default_value_t = ProgressStyle::Bar, help = "Progress bar style")] + progress_style: ProgressStyle, + + #[clap( + long, + default_value = "200", + help = "Progress update interval in milliseconds" + )] + progress_interval: u64, + #[clap( long, help = "Specify the default object_store_profiling mode, defaults to 'disabled'.\n[possible values: disabled, enabled]", @@ -244,11 +258,18 @@ async fn main_inner() -> Result<()> { )), ); + let progress_config = ProgressConfig { + mode: args.progress, + style: args.progress_style, + interval_ms: args.progress_interval, + }; + let mut print_options = PrintOptions { format: args.format, quiet: args.quiet, maxrows: args.maxrows, color: args.color, + progress: progress_config, instrumented_registry: Arc::clone(&instrumented_registry), }; diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 0d5e1e3e6fa90..cc4eb64752767 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -25,6 +25,7 @@ use crate::object_storage::instrumented::{ InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, }; use crate::print_format::PrintFormat; +use crate::progress::ProgressConfig; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -79,6 +80,7 @@ pub struct PrintOptions { pub quiet: bool, pub maxrows: MaxRows, pub color: bool, + pub progress: ProgressConfig, pub instrumented_registry: Arc, } @@ -225,6 +227,7 @@ mod tests { quiet: true, maxrows: MaxRows::Unlimited, color: true, + progress: ProgressConfig::default(), instrumented_registry: Arc::clone(&instrumented_registry), }; diff --git a/datafusion-cli/src/progress/config.rs b/datafusion-cli/src/progress/config.rs new file mode 100644 index 0000000000000..047696bfed30f --- /dev/null +++ b/datafusion-cli/src/progress/config.rs @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Configuration for progress reporting + +use clap::ValueEnum; +use is_terminal::IsTerminal; +use std::io; + +/// Configuration for progress reporting +#[derive(Debug, Clone)] +pub struct ProgressConfig { + pub mode: ProgressMode, + pub style: ProgressStyle, + pub interval_ms: u64, +} + +impl ProgressConfig { + /// Create a new progress configuration with default values + pub fn new() -> Self { + Self::default() + } + + /// Check whether progress should be shown based on the configuration and environment + pub fn should_show_progress(&self) -> bool { + match self.mode { + ProgressMode::On => true, + ProgressMode::Off => false, + ProgressMode::Auto => io::stdout().is_terminal(), + } + } +} + +impl Default for ProgressConfig { + fn default() -> Self { + Self { + mode: ProgressMode::Auto, + style: ProgressStyle::Bar, + interval_ms: 200, + } + } +} + +/// Progress bar display mode +#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)] +pub enum ProgressMode { + /// Show progress bar on TTY, off otherwise + Auto, + /// Always show progress bar + On, + /// Never show progress bar + Off, +} + +impl Default for ProgressMode { + fn default() -> Self { + Self::Auto + } +} + +/// Progress bar visual style +#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)] +pub enum ProgressStyle { + /// Show a progress bar when percent is known + Bar, + /// Show a spinner with counters + Spinner, +} + +impl Default for ProgressStyle { + fn default() -> Self { + Self::Bar + } +} diff --git a/datafusion-cli/src/progress/display.rs b/datafusion-cli/src/progress/display.rs new file mode 100644 index 0000000000000..bce4989b0bdbf --- /dev/null +++ b/datafusion-cli/src/progress/display.rs @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Progress bar display functionality + +use crate::progress::{ProgressInfo, ProgressStyle, ProgressUnit}; +use datafusion_common::instant::Instant; +use std::io::{self, Write}; +use std::time::Duration; + +/// Displays progress information to the terminal +pub struct ProgressDisplay { + style: ProgressStyle, + start_time: Instant, + last_display: Option, +} + +impl ProgressDisplay { + pub fn new(style: ProgressStyle) -> Self { + Self { + style, + start_time: Instant::now(), + last_display: None, + } + } + + /// Update the progress display + pub fn update(&mut self, progress: &ProgressInfo, eta: Option) { + let display_text = + match progress.percent.is_some() && self.style == ProgressStyle::Bar { + true => self.format_progress_bar(progress, eta), + false => self.format_spinner(progress), + }; + + // Only update if the display text has changed + if self.last_display.as_ref() != Some(&display_text) { + self.print_line(&display_text); + self.last_display = Some(display_text); + } + } + + /// Finish the progress display and clean up + pub fn finish(&mut self) { + if self.last_display.is_some() { + // Clear the progress line and move to next line + print!("\r\x1b[K"); + let _ = io::stdout().flush(); + self.last_display = None; + } + } + + /// Format a progress bar with percentage + fn format_progress_bar( + &self, + progress: &ProgressInfo, + eta: Option, + ) -> String { + let percent = progress.percent.unwrap_or(0.0); + let bar = self.create_bar(percent); + + let current_formatted = progress.unit.format_value(progress.current); + let total_formatted = progress + .total + .map(|t| progress.unit.format_value(t)) + .unwrap_or_else(|| "?".to_string()); + + let throughput = self.calculate_throughput(progress); + let eta_text = eta + .map(format_duration) + .unwrap_or_else(|| "??:??".to_string()); + let elapsed = format_duration(self.start_time.elapsed()); + + let base_format = format!( + "\r{} {:5.1}% {} / {} β€’ {} β€’ ETA {} / {}", + bar, + percent, + current_formatted, + total_formatted, + throughput, + eta_text, + elapsed + ); + + // Add phase information for better user experience + match progress.phase { + crate::progress::ExecutionPhase::Reading => base_format, + crate::progress::ExecutionPhase::Processing => { + format!("{} πŸ”„ sorting/joining data", base_format) + } + crate::progress::ExecutionPhase::Finalizing => { + format!("{} ✨ finalizing results", base_format) + } + } + } + + /// Format a spinner without percentage + fn format_spinner(&self, progress: &ProgressInfo) -> String { + let spinner = self.get_spinner_char(); + let current_formatted = progress.unit.format_value(progress.current); + let elapsed = format_duration(self.start_time.elapsed()); + + let base_format = format!( + "\r{} {}: {} elapsed: {}", + spinner, + match progress.unit { + ProgressUnit::Bytes => "bytes", + ProgressUnit::Rows => "rows", + }, + current_formatted, + elapsed + ); + + // Add phase information for spinner mode too + match progress.phase { + crate::progress::ExecutionPhase::Reading => base_format, + crate::progress::ExecutionPhase::Processing => { + format!("{} (sorting/joining)", base_format) + } + crate::progress::ExecutionPhase::Finalizing => { + format!("{} (finalizing)", base_format) + } + } + } + + /// Create a visual progress bar + fn create_bar(&self, percent: f64) -> String { + const BAR_WIDTH: usize = 20; + let filled = ((percent / 100.0) * BAR_WIDTH as f64) as usize; + let empty = BAR_WIDTH - filled; + + let mut bar = String::with_capacity(BAR_WIDTH); + + // Full blocks + for _ in 0..filled { + bar.push('β–‰'); + } + + // Partial block if needed + if filled < BAR_WIDTH { + let partial_progress = (percent / 100.0) * BAR_WIDTH as f64 - filled as f64; + if partial_progress > 0.0 { + let partial_char = match (partial_progress * 8.0) as usize { + 0 => '▏', + 1 => 'β–Ž', + 2 => '▍', + 3 => 'β–Œ', + 4 => 'β–‹', + 5 => 'β–Š', + 6 => 'β–‰', + _ => 'β–‰', + }; + bar.push(partial_char); + } + } + + // Empty blocks + for _ in 0..empty.saturating_sub(if filled < BAR_WIDTH && percent > 0.0 { + 1 + } else { + 0 + }) { + bar.push('β–‘'); + } + + bar + } + + /// Get the current spinner character + fn get_spinner_char(&self) -> char { + const SPINNER_CHARS: &[char] = + &['β ‹', 'β ™', 'β Ή', 'β Έ', 'β Ό', 'β ΄', 'β ¦', 'β §', 'β ‡', '⠏']; + let index = + (self.start_time.elapsed().as_millis() / 100) as usize % SPINNER_CHARS.len(); + SPINNER_CHARS[index] + } + + /// Calculate throughput string + fn calculate_throughput(&self, progress: &ProgressInfo) -> String { + let elapsed = self.start_time.elapsed().as_secs_f64(); + if elapsed < 0.1 { + return "-- /s".to_string(); + } + + let rate = progress.current as f64 / elapsed; + match progress.unit { + ProgressUnit::Bytes => format!("{}/s", format_bytes(rate as usize)), + ProgressUnit::Rows => format!("{} rows/s", format_number(rate as usize)), + } + } + + /// Print a line, overwriting the previous one + fn print_line(&self, text: &str) { + print!("{}", text); + let _ = io::stdout().flush(); + } +} + +impl Drop for ProgressDisplay { + fn drop(&mut self) { + self.finish(); + } +} + +/// Format a duration as MM:SS +fn format_duration(duration: Duration) -> String { + let total_seconds = duration.as_secs(); + let minutes = total_seconds / 60; + let seconds = total_seconds % 60; + format!("{:02}:{:02}", minutes, seconds) +} + +/// Format bytes in human-readable form (helper function) +fn format_bytes(bytes: usize) -> String { + const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"]; + let mut size = bytes as f64; + let mut unit_index = 0; + + while size >= 1024.0 && unit_index < UNITS.len() - 1 { + size /= 1024.0; + unit_index += 1; + } + + if unit_index == 0 { + format!("{} {}", bytes, UNITS[unit_index]) + } else { + format!("{:.1} {}", size, UNITS[unit_index]) + } +} + +/// Format a number with thousands separators (helper function) +fn format_number(num: usize) -> String { + let s = num.to_string(); + let mut result = String::new(); + + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.insert(0, ','); + } + result.insert(0, c); + } + + result +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_duration() { + assert_eq!(format_duration(Duration::from_secs(0)), "00:00"); + assert_eq!(format_duration(Duration::from_secs(30)), "00:30"); + assert_eq!(format_duration(Duration::from_secs(90)), "01:30"); + assert_eq!(format_duration(Duration::from_secs(3665)), "61:05"); + } + + #[test] + fn test_create_bar() { + let display = ProgressDisplay::new(ProgressStyle::Bar); + + let bar_0 = display.create_bar(0.0); + assert!(bar_0.chars().all(|c| c == 'β–‘')); + + let bar_100 = display.create_bar(100.0); + assert!(bar_100.chars().all(|c| c == 'β–‰')); + + let bar_50 = display.create_bar(50.0); + assert!(bar_50.contains('β–‰') && bar_50.contains('β–‘')); + } +} diff --git a/datafusion-cli/src/progress/estimator.rs b/datafusion-cli/src/progress/estimator.rs new file mode 100644 index 0000000000000..033dfadacae20 --- /dev/null +++ b/datafusion-cli/src/progress/estimator.rs @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ETA estimation using alpha filter (exponential moving average) + +use crate::progress::ProgressInfo; +use datafusion_common::instant::Instant; +use std::time::Duration; + +/// Estimates time to completion based on progress using an alpha filter +/// (exponential moving average). +/// +/// The alpha filter provides a good balance between responsiveness and +/// smoothness for ETA estimation in a TUI environment. +/// +/// The smoothed rate is calculated as: +/// `smoothed_rate = alpha * new_rate + (1 - alpha) * previous_smoothed_rate` +/// +/// Where alpha is the smoothing factor: +/// - Higher alpha (closer to 1.0): More responsive to changes, less smooth +/// - Lower alpha (closer to 0.0): Less responsive, more smooth +/// - Default value of 0.3 provides good balance +pub struct ProgressEstimator { + start_time: Instant, + // Smoothing factor (0.0 to 1.0) + alpha: f64, + // Smoothed progress rate (percent per second) + smoothed_rate: Option, + // Previous time for calculating rate + last_time: Option, + // Previous progress percentage + last_progress: Option, + // Number of observations + observations: usize, +} + +impl ProgressEstimator { + pub fn new() -> Self { + Self { + start_time: Instant::now(), + alpha: 0.3, // Good balance between responsiveness and smoothness + smoothed_rate: None, + last_time: None, + last_progress: None, + observations: 0, + } + } + + /// Update the estimator with new progress and return ETA + pub fn update(&mut self, progress: ProgressInfo) -> Option { + let elapsed = self.start_time.elapsed(); + + // Need at least some progress and time to estimate + let percent = progress.percent?; + if percent <= 0.0 || elapsed.as_secs_f64() < 1.0 { + return None; + } + + self.observations += 1; + + // Need at least two observations to calculate rate + if let (Some(last_time), Some(last_progress)) = + (self.last_time, self.last_progress) + { + let dt = (elapsed - last_time).as_secs_f64(); + if dt > 0.0 { + let measured_rate = (percent - last_progress) / dt; + + // Apply exponential moving average + match self.smoothed_rate { + Some(prev_rate) => { + self.smoothed_rate = Some( + self.alpha * measured_rate + (1.0 - self.alpha) * prev_rate, + ); + } + None => { + // First rate measurement, use it directly + self.smoothed_rate = Some(measured_rate); + } + } + } + } + + self.last_time = Some(elapsed); + self.last_progress = Some(percent); + + // Don't provide estimate until we have enough observations and progress + if self.observations < 2 || percent <= 5.0 { + return None; + } + + self.calculate_eta(percent) + } + + /// Calculate ETA based on smoothed rate + fn calculate_eta(&self, current_percent: f64) -> Option { + let remaining_percent = 100.0 - current_percent; + + // Use smoothed rate if available + let current_rate = self.smoothed_rate?; + + if current_rate <= 0.0 { + return None; // No progress or going backwards + } + + // Simple linear projection: time = remaining_percent / rate + let eta_seconds = remaining_percent / current_rate; + + // Cap the estimate at something reasonable (24 hours) + if eta_seconds > 0.0 && eta_seconds < 86400.0 { + Some(Duration::from_secs_f64(eta_seconds)) + } else { + None + } + } +} + +impl Default for ProgressEstimator { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_progress_estimator_needs_observations() { + let mut estimator = ProgressEstimator::new(); + + // Wait a bit to ensure elapsed time > 1 second + std::thread::sleep(std::time::Duration::from_millis(1100)); + + // First observation should return None (need at least 2 observations) + let progress1 = ProgressInfo { + current: 500, + total: Some(10000), + unit: crate::progress::ProgressUnit::Rows, + percent: Some(5.0), + has_blocking_operators: false, + phase: crate::progress::ExecutionPhase::Reading, + }; + assert!(estimator.update(progress1).is_none()); + + // Second observation should provide estimate + std::thread::sleep(std::time::Duration::from_millis(200)); + let progress2 = ProgressInfo { + current: 1000, + total: Some(10000), + unit: crate::progress::ProgressUnit::Rows, + percent: Some(10.0), + has_blocking_operators: false, + phase: crate::progress::ExecutionPhase::Reading, + }; + let eta = estimator.update(progress2); + assert!(eta.is_some()); + } + + #[test] + fn test_progress_estimator_smoothing() { + let mut estimator = ProgressEstimator::new(); + + // Wait for elapsed time > 1 second + std::thread::sleep(std::time::Duration::from_millis(1100)); + + // First measurement + let progress1 = ProgressInfo { + current: 1000, + total: Some(10000), + unit: crate::progress::ProgressUnit::Rows, + percent: Some(10.0), + has_blocking_operators: false, + phase: crate::progress::ExecutionPhase::Reading, + }; + estimator.update(progress1); + + // Second measurement with progress + std::thread::sleep(std::time::Duration::from_millis(200)); + let progress2 = ProgressInfo { + current: 3000, + total: Some(10000), + unit: crate::progress::ProgressUnit::Rows, + percent: Some(30.0), + has_blocking_operators: false, + phase: crate::progress::ExecutionPhase::Reading, + }; + let eta = estimator.update(progress2); + assert!(eta.is_some()); + + // ETA should be reasonable (not too long since we're making progress) + let eta_secs = eta.unwrap().as_secs(); + assert!(eta_secs < 60); // Should be less than a minute at this rate + } +} diff --git a/datafusion-cli/src/progress/metrics_poll.rs b/datafusion-cli/src/progress/metrics_poll.rs new file mode 100644 index 0000000000000..aef7587a45992 --- /dev/null +++ b/datafusion-cli/src/progress/metrics_poll.rs @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Live metrics polling from physical plans + +use datafusion::physical_plan::metrics::{MetricValue, MetricsSet}; +use datafusion::physical_plan::{ + visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor, +}; +use std::sync::Arc; + +/// Polls live metrics from a physical plan +pub struct MetricsPoller { + plan: Arc, +} + +impl MetricsPoller { + pub fn new(plan: &Arc) -> Self { + Self { + plan: Arc::clone(plan), + } + } + + /// Poll current metrics from the plan + pub fn poll(&mut self) -> LiveMetrics { + let mut visitor = MetricsVisitor::new(); + // Handle potential errors in plan visitation + if let Err(e) = visit_execution_plan(self.plan.as_ref(), &mut visitor) { + // Log the error but continue with default metrics + eprintln!( + "Warning: Failed to collect metrics for progress tracking: {}", + e + ); + } + visitor.into_metrics() + } +} + +/// Live metrics collected from plan execution +#[derive(Debug, Clone, Default)] +pub struct LiveMetrics { + pub bytes_scanned: usize, + pub rows_processed: usize, + pub batches_processed: usize, +} + +/// Visitor to collect live metrics from plan nodes +struct MetricsVisitor { + metrics: LiveMetrics, +} + +impl MetricsVisitor { + fn new() -> Self { + Self { + metrics: LiveMetrics::default(), + } + } + + fn into_metrics(self) -> LiveMetrics { + self.metrics + } +} + +impl ExecutionPlanVisitor for MetricsVisitor { + type Error = datafusion::error::DataFusionError; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { + let metrics_set = plan.metrics(); + self.accumulate_metrics(&metrics_set); + + // Continue visiting children + Ok(true) + } +} + +impl MetricsVisitor { + /// Accumulate metrics from a metrics set + fn accumulate_metrics(&mut self, metrics_set: &Option) { + if let Some(metrics) = metrics_set { + for metric in metrics.iter() { + // Get metric name from the metric itself + let name = match metric.value() { + MetricValue::Count { name, .. } => name, + MetricValue::Gauge { name, .. } => name, + MetricValue::Time { name, .. } => name, + MetricValue::Custom { name, .. } => name, + // For predefined variants, use their standard names + MetricValue::OutputRows(_) => "output_rows", + MetricValue::ElapsedCompute(_) => "elapsed_compute", + MetricValue::SpillCount(_) => "spill_count", + MetricValue::SpilledBytes(_) => "spilled_bytes", + MetricValue::SpilledRows(_) => "spilled_rows", + MetricValue::CurrentMemoryUsage(_) => "memory_usage", + MetricValue::StartTimestamp(_) => "start_timestamp", + MetricValue::EndTimestamp(_) => "end_timestamp", + }; + self.process_metric(name, metric.value()); + } + } + } + + /// Process an individual metric + fn process_metric(&mut self, name: &str, value: &MetricValue) { + match name { + "bytes_scanned" => { + if let Some(count) = self.extract_count_value(value) { + self.metrics.bytes_scanned += count; + } + } + "output_rows" => { + if let Some(count) = self.extract_count_value(value) { + self.metrics.rows_processed += count; + } + } + "output_batches" => { + if let Some(count) = self.extract_count_value(value) { + self.metrics.batches_processed += count; + } + } + _ => { + // Check for common patterns in metric names + if name.contains("bytes") && name.contains("scan") { + if let Some(count) = self.extract_count_value(value) { + self.metrics.bytes_scanned += count; + } + } else if name.contains("rows") + && (name.contains("output") || name.contains("produce")) + { + if let Some(count) = self.extract_count_value(value) { + self.metrics.rows_processed += count; + } + } + } + } + } + + /// Extract a count value from a metric value + fn extract_count_value(&self, value: &MetricValue) -> Option { + match value { + MetricValue::Count { count, .. } => Some(count.value()), + MetricValue::Gauge { gauge, .. } => Some(gauge.value()), + MetricValue::OutputRows(count) => Some(count.value()), + MetricValue::SpillCount(count) => Some(count.value()), + MetricValue::SpilledBytes(count) => Some(count.value()), + MetricValue::SpilledRows(count) => Some(count.value()), + MetricValue::CurrentMemoryUsage(gauge) => Some(gauge.value()), + _ => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::physical_plan::empty::EmptyExec; + + #[test] + fn test_metrics_poller() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + + let empty_exec = EmptyExec::new(schema); + let plan: Arc = Arc::new(empty_exec); + + let mut poller = MetricsPoller::new(&plan); + let metrics = poller.poll(); + + // EmptyExec should have zero metrics initially + assert_eq!(metrics.bytes_scanned, 0); + assert_eq!(metrics.rows_processed, 0); + assert_eq!(metrics.batches_processed, 0); + } +} diff --git a/datafusion-cli/src/progress/mod.rs b/datafusion-cli/src/progress/mod.rs new file mode 100644 index 0000000000000..6af83a459f6fa --- /dev/null +++ b/datafusion-cli/src/progress/mod.rs @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Progress reporting for DataFusion CLI queries +//! +//! This module provides comprehensive progress tracking for DataFusion queries, including: +//! +//! # Features +//! - Real-time progress bars with percentage completion +//! - ETA estimation using alpha filter (exponential moving average) +//! - Automatic detection of pipeline-breaking operators (sorts, joins, aggregates) +//! - Automatic switch to spinner mode when exact progress cannot be determined +//! - TTY auto-detection for seamless terminal integration +//! +//! # Usage +//! ```bash +//! # Basic progress bar (auto-enabled in TTY) +//! datafusion-cli --progress auto +//! +//! # Force progress bar on +//! datafusion-cli --progress on +//! +//! # Spinner mode for unknown progress +//! datafusion-cli --progress on --progress-style spinner +//! ``` +//! +//! # Implementation Notes +//! The progress system addresses review feedback from the DataFusion community: +//! - Uses robust ExecutionPlan analysis instead of brittle string matching +//! - Alpha filter provides smooth ETA estimates with good responsiveness +//! - When blocking operators are detected and input is consumed, switches to spinner +//! to avoid showing misleading progress (e.g., stuck at 100% during sort) +//! +//! # Limitations +//! - Progress accuracy depends on DataFusion's metrics availability +//! - Complex queries with blocking operators will show spinner during processing phase +//! - Very fast queries may not show progress bars due to update intervals + +mod config; +mod display; +mod estimator; +mod metrics_poll; +mod plan_introspect; + +pub use config::{ProgressConfig, ProgressMode, ProgressStyle}; + +use datafusion::error::Result; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_common_runtime::SpawnedTask; +use std::sync::Arc; +use std::time::Duration; + +/// Main progress reporter that coordinates metrics collection, ETA estimation, and display +pub struct ProgressReporter { + _handle: SpawnedTask<()>, +} + +impl ProgressReporter { + /// Start a new progress reporter for the given physical plan + pub async fn start( + physical_plan: &Arc, + config: ProgressConfig, + ) -> Result { + // Clone the plan for the background task + let plan = Arc::clone(physical_plan); + + let _handle = SpawnedTask::spawn(async move { + let reporter = ProgressReporterInner::new(plan, config); + reporter.run().await; + }); + + Ok(Self { _handle }) + } + + /// Stop the progress reporter + /// Note: The task is automatically aborted when this struct is dropped + pub async fn stop(&self) { + // Task will be aborted automatically when this struct is dropped + } +} + +/// Internal implementation of the progress reporter +struct ProgressReporterInner { + plan: Arc, + config: ProgressConfig, +} + +impl ProgressReporterInner { + fn new(plan: Arc, config: ProgressConfig) -> Self { + Self { plan, config } + } + + async fn run(self) { + // Early exit if progress is disabled + if !self.config.should_show_progress() { + return; + } + + let introspector = plan_introspect::PlanIntrospector::new(&self.plan); + let totals = introspector.get_totals(); + + let mut poller = metrics_poll::MetricsPoller::new(&self.plan); + let mut estimator = estimator::ProgressEstimator::new(); + let mut display = display::ProgressDisplay::new(self.config.style); + + let interval = Duration::from_millis(self.config.interval_ms); + let mut ticker = tokio::time::interval(interval); + + loop { + ticker.tick().await; + let metrics = poller.poll(); + let progress = self.calculate_progress(&totals, &metrics); + + let eta = estimator.update(progress.clone()); + display.update(&progress, eta); + + // Check for completion - when we have exact totals and current >= total + if let (Some(total), current) = (progress.total, progress.current) { + if current >= total && (totals.has_exact_bytes || totals.has_exact_rows) { + // Query has completed, exit the progress loop + display.finish(); + break; + } + } + + // For queries without known totals, rely on task termination when query completes + } + } + + fn calculate_progress( + &self, + totals: &plan_introspect::PlanTotals, + metrics: &metrics_poll::LiveMetrics, + ) -> ProgressInfo { + let (current, total, unit) = + if totals.total_bytes > 0 && metrics.bytes_scanned > 0 { + ( + metrics.bytes_scanned, + totals.total_bytes, + ProgressUnit::Bytes, + ) + } else if totals.total_rows > 0 && metrics.rows_processed > 0 { + ( + metrics.rows_processed, + totals.total_rows, + ProgressUnit::Rows, + ) + } else { + return ProgressInfo { + current: metrics.rows_processed, + total: None, + unit: ProgressUnit::Rows, + percent: None, + has_blocking_operators: totals.has_blocking_operators, + phase: ExecutionPhase::Reading, + }; + }; + + let raw_percent = if total > 0 { + ((current as f64 / total as f64) * 100.0).min(100.0) + } else { + 0.0 + }; + + // Determine execution phase and whether to show progress bar or spinner + let (percent, phase) = + self.determine_execution_phase(raw_percent, totals.has_blocking_operators); + + ProgressInfo { + current, + total: Some(total), + unit, + percent, + has_blocking_operators: totals.has_blocking_operators, + phase, + } + } + + /// Determine which execution phase we're in and whether to show a progress bar. + /// + /// When blocking operators (sorts, joins, aggregates) are detected and input + /// has been mostly consumed (>95%), we switch to spinner mode by returning + /// `percent: None`. This is more honest than showing a misleading progress bar + /// stuck at 100% while the blocking operator processes data. + fn determine_execution_phase( + &self, + raw_percent: f64, + has_blocking_operators: bool, + ) -> (Option, ExecutionPhase) { + if !has_blocking_operators { + // No blocking operators, simple linear progress + return (Some(raw_percent), ExecutionPhase::Reading); + } + + // With blocking operators, we need to be honest about what we know + if raw_percent < 95.0 { + // Still reading data, can show accurate progress + (Some(raw_percent), ExecutionPhase::Reading) + } else { + // Input mostly consumed, but blocking operator is likely still processing. + // Switch to spinner mode to avoid showing misleading "stuck at 100%" progress. + (None, ExecutionPhase::Processing) + } + } +} + +/// Information about current progress +#[derive(Debug, Clone)] +pub struct ProgressInfo { + pub current: usize, + pub total: Option, + pub unit: ProgressUnit, + pub percent: Option, + pub has_blocking_operators: bool, + pub phase: ExecutionPhase, +} + +/// Tracks which phase of execution we're in +#[derive(Debug, Clone, PartialEq)] +pub enum ExecutionPhase { + /// Reading and processing data from sources + Reading, + /// Pipeline-breaking operation (sort, join, aggregate) + Processing, + /// Writing final output + Finalizing, +} + +/// Unit of measurement for progress +#[derive(Debug, Clone, Copy)] +pub enum ProgressUnit { + Bytes, + Rows, +} + +impl ProgressUnit { + pub fn format_value(&self, value: usize) -> String { + match self { + ProgressUnit::Bytes => format_bytes(value), + ProgressUnit::Rows => format_number(value), + } + } +} + +/// Format a byte count in human-readable form +fn format_bytes(bytes: usize) -> String { + const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"]; + let mut size = bytes as f64; + let mut unit_index = 0; + + while size >= 1024.0 && unit_index < UNITS.len() - 1 { + size /= 1024.0; + unit_index += 1; + } + + if unit_index == 0 { + format!("{} {}", bytes, UNITS[unit_index]) + } else { + format!("{:.1} {}", size, UNITS[unit_index]) + } +} + +/// Format a number with appropriate separators +fn format_number(num: usize) -> String { + let s = num.to_string(); + let mut result = String::new(); + + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.insert(0, ','); + } + result.insert(0, c); + } + + result +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_bytes() { + assert_eq!(format_bytes(500), "500 B"); + assert_eq!(format_bytes(1024), "1.0 KB"); + assert_eq!(format_bytes(1536), "1.5 KB"); + assert_eq!(format_bytes(1048576), "1.0 MB"); + assert_eq!(format_bytes(1073741824), "1.0 GB"); + } + + #[test] + fn test_format_number() { + assert_eq!(format_number(100), "100"); + assert_eq!(format_number(1000), "1,000"); + assert_eq!(format_number(1000000), "1,000,000"); + assert_eq!(format_number(1234567), "1,234,567"); + } +} diff --git a/datafusion-cli/src/progress/plan_introspect.rs b/datafusion-cli/src/progress/plan_introspect.rs new file mode 100644 index 0000000000000..36b8bbb16e342 --- /dev/null +++ b/datafusion-cli/src/progress/plan_introspect.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Plan introspection for extracting total size estimates + +use datafusion::common::Statistics; +use datafusion::physical_plan::{ + visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor, +}; +use std::sync::Arc; + +/// Extracts total size estimates from a physical plan +pub struct PlanIntrospector { + plan: Arc, +} + +impl PlanIntrospector { + pub fn new(plan: &Arc) -> Self { + Self { + plan: Arc::clone(plan), + } + } + + /// Extract total bytes and rows from the plan's statistics + pub fn get_totals(&self) -> PlanTotals { + let mut visitor = TotalsVisitor::new(); + // Handle potential errors in plan visitation + if let Err(e) = visit_execution_plan(self.plan.as_ref(), &mut visitor) { + // Log the error but continue with default totals + eprintln!( + "Warning: Failed to analyze execution plan for progress tracking: {}", + e + ); + } + visitor.into_totals() + } +} + +/// Accumulated totals from plan statistics +#[derive(Debug, Clone)] +pub struct PlanTotals { + pub total_bytes: usize, + pub total_rows: usize, + pub has_exact_bytes: bool, + pub has_exact_rows: bool, + pub has_blocking_operators: bool, +} + +impl PlanTotals { + fn new() -> Self { + Self { + total_bytes: 0, + total_rows: 0, + has_exact_bytes: false, + has_exact_rows: false, + has_blocking_operators: false, + } + } +} + +/// Visitor to collect statistics from plan nodes +struct TotalsVisitor { + totals: PlanTotals, +} + +impl TotalsVisitor { + fn new() -> Self { + Self { + totals: PlanTotals::new(), + } + } + + fn into_totals(self) -> PlanTotals { + self.totals + } +} + +impl ExecutionPlanVisitor for TotalsVisitor { + type Error = datafusion::error::DataFusionError; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { + // Focus on leaf nodes that actually read data + if self.is_leaf_node(plan) { + if let Ok(stats) = plan.partition_statistics(None) { + self.accumulate_statistics(&stats); + } + } + + // Check for pipeline-breaking (blocking) operators + if self.is_blocking_operator(plan) { + self.totals.has_blocking_operators = true; + } + + // Continue visiting children + Ok(true) + } +} + +impl TotalsVisitor { + /// Check if this plan node is a leaf node (no children) + /// Leaf nodes are typically data sources that actually read data + fn is_leaf_node(&self, plan: &dyn ExecutionPlan) -> bool { + plan.children().is_empty() + } + + /// Check if this plan node is a blocking/pipeline-breaking operator + /// These operators consume all input before producing any output + fn is_blocking_operator(&self, plan: &dyn ExecutionPlan) -> bool { + let name = plan.name(); + + // Check for known blocking operators using multiple strategies for robustness + self.is_known_blocking_operator(name) + || self.has_blocking_characteristics(plan) + || self.uses_blocking_patterns(name) + } + + /// Check for explicitly known blocking operators + fn is_known_blocking_operator(&self, name: &str) -> bool { + matches!( + name, + "SortExec" | "SortMergeJoinExec" | "HashJoinExec" | + "AggregateExec" | "WindowAggExec" | "GlobalLimitExec" | + "SortPreservingMergeExec" | "CoalescePartitionsExec" | + "SortPreservingRepartitionExec" | "RepartitionExec" | + // Additional known blocking operators from real usage + "SortPreservingMergeSort" | "PartialSortExec" | "TopKExec" | + "UnionExec" | "CrossJoinExec" | "NestedLoopJoinExec" | + "SymmetricHashJoinExec" | "BoundedWindowAggExec" + ) + } + + /// Check if plan has characteristics that suggest blocking behavior + fn has_blocking_characteristics(&self, plan: &dyn ExecutionPlan) -> bool { + // Operators that require full input to determine output ordering are typically blocking + // This is a heuristic that may need refinement + let properties = plan.properties(); + + // If an operator has a specific output ordering that differs significantly + // from input ordering, it's likely blocking + if let Some(output_ordering) = properties.output_ordering() { + if !output_ordering.is_empty() { + // If we have multiple children with potentially different orderings, + // and we need to produce a specific order, likely blocking + return plan.children().len() > 1; + } + } + + false + } + + /// Check for blocking patterns in operator names + fn uses_blocking_patterns(&self, name: &str) -> bool { + // Pattern-based detection for operators we might not know about + name.ends_with("SortExec") + || name.ends_with("JoinExec") + || name.ends_with("AggregateExec") + || name.ends_with("WindowExec") + || name.contains("Sort") && name.contains("Exec") + || name.contains("Merge") && name.contains("Exec") + || name.contains("Union") && name.contains("Exec") + } + + /// Accumulate statistics from a plan node + fn accumulate_statistics(&mut self, stats: &Statistics) { + // Accumulate byte sizes + if let Some(bytes) = stats.total_byte_size.get_value() { + self.totals.total_bytes += *bytes; + } + if stats.total_byte_size.is_exact().unwrap_or(false) { + self.totals.has_exact_bytes = true; + } + + // Accumulate row counts + if let Some(rows) = stats.num_rows.get_value() { + self.totals.total_rows += *rows; + } + if stats.num_rows.is_exact().unwrap_or(false) { + self.totals.has_exact_rows = true; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::physical_plan::empty::EmptyExec; + + #[test] + fn test_plan_introspector() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + + let empty_exec = EmptyExec::new(schema); + let plan: Arc = Arc::new(empty_exec); + + let introspector = PlanIntrospector::new(&plan); + let totals = introspector.get_totals(); + + // EmptyExec should have zero totals + assert_eq!(totals.total_bytes, 0); + assert_eq!(totals.total_rows, 0); + } +} diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index cdb442c47f49c..5f8fad15baef9 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -338,6 +338,201 @@ SELECT COUNT(*) FROM hits; /// that triggers error. /// Example: /// RUST_BACKTRACE=1 cargo run --features backtrace -- -c 'select pow(1,'foo');' +#[rstest] +#[case("off")] +#[case("on")] +#[case("auto")] +fn test_progress_bar_mode(#[case] mode: &str) { + let mut settings = make_settings(); + settings.set_snapshot_suffix(mode); + settings.add_filter(r"β–‰+β–Ž*", "[PROGRESS_BAR]"); + settings.add_filter(r"[β”€β”β”Œβ”˜β””β”€β”‚β”¬β”΄β”Ό]+", "[SPINNER]"); + settings.add_filter(r"\d+\.\d+ [KMGT]?B", "[SIZE]"); + settings.add_filter(r"\d+,?\d* rows", "[ROWS]"); + settings.add_filter(r"\d+\.\d+%", "[PERCENT]"); + settings.add_filter(r"ETA \d{2}:\d{2} / \d{2}:\d{2}", "[ETA]"); + settings.add_filter(r"elapsed: \d{2}:\d{2}", "[ELAPSED]"); + settings.add_filter(r"rows: \d+,?\d*", "rows: [COUNT]"); + settings.add_filter(r"\d+\.\d+", "[DECIMAL]"); + // Remove ALL progress bar output traces for deterministic tests + settings.add_filter(r"β ‹ rows: \[COUNT\] \[ELAPSED\]", ""); + settings.add_filter(r"\[PROGRESS_LINE\]", ""); + settings.add_filter(r"[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]", ""); + settings.add_filter(r"␍", ""); + settings.add_filter(r"␛\[K", ""); + settings.add_filter(r"\[K\]", ""); + // Remove all carriage returns and escape sequences completely + settings.add_filter(r"\r", ""); + settings.add_filter(r"\x1b\[K", ""); + settings.add_filter(r"\x0d", ""); + let _bound = settings.bind_to_scope(); + + let mut cmd = cli(); + cmd.args([ + "--progress", + mode, + "--command", + "SELECT COUNT(*) FROM generate_series(1, 1000)", + "-q", + ]); + + assert_cmd_snapshot!(cmd); +} + +#[rstest] +#[case("bar")] +#[case("spinner")] +fn test_progress_bar_style(#[case] style: &str) { + let mut settings = make_settings(); + settings.set_snapshot_suffix(style); + settings.add_filter(r"β–‰+β–Ž*", "[PROGRESS_BAR]"); + settings.add_filter(r"[β”€β”β”Œβ”˜β””β”€β”‚β”¬β”΄β”Ό]+", "[SPINNER]"); + settings.add_filter(r"\d+\.\d+ [KMGT]?B", "[SIZE]"); + settings.add_filter(r"\d+,?\d* rows", "[ROWS]"); + settings.add_filter(r"\d+\.\d+%", "[PERCENT]"); + settings.add_filter(r"ETA \d{2}:\d{2} / \d{2}:\d{2}", "[ETA]"); + settings.add_filter(r"elapsed: \d{2}:\d{2}", "[ELAPSED]"); + settings.add_filter(r"rows: \d+,?\d*", "rows: [COUNT]"); + settings.add_filter(r"\d+\.\d+", "[DECIMAL]"); + // Remove ALL progress bar output traces for deterministic tests + settings.add_filter(r"β ‹ rows: \[COUNT\] \[ELAPSED\]", ""); + settings.add_filter(r"\[PROGRESS_LINE\]", ""); + settings.add_filter(r"[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]", ""); + settings.add_filter(r"␍", ""); + settings.add_filter(r"␛\[K", ""); + settings.add_filter(r"\[K\]", ""); + // Remove all carriage returns and escape sequences completely + settings.add_filter(r"\r", ""); + settings.add_filter(r"\x1b\[K", ""); + settings.add_filter(r"\x0d", ""); + let _bound = settings.bind_to_scope(); + + let mut cmd = cli(); + cmd.args([ + "--progress", + "on", + "--progress-style", + style, + "--command", + "SELECT COUNT(*) FROM generate_series(1, 1000)", + "-q", + ]); + + assert_cmd_snapshot!(cmd); +} + +#[test] +fn test_progress_bar_with_blocking_operators() { + let mut settings = make_settings(); + settings.add_filter(r"β–‰+β–Ž*", "[PROGRESS_BAR]"); + settings.add_filter(r"[β”€β”β”Œβ”˜β””β”€β”‚β”¬β”΄β”Ό]+", "[SPINNER]"); + settings.add_filter(r"\d+\.\d+ [KMGT]?B", "[SIZE]"); + settings.add_filter(r"\d+,?\d* rows", "[ROWS]"); + settings.add_filter(r"\d+\.\d+%", "[PERCENT]"); + settings.add_filter(r"ETA \d{2}:\d{2} / \d{2}:\d{2}", "[ETA]"); + settings.add_filter(r"elapsed: \d{2}:\d{2}", "[ELAPSED]"); + settings.add_filter(r"rows: \d+,?\d*", "rows: [COUNT]"); + settings.add_filter(r"\d+\.\d+", "[DECIMAL]"); + // Remove ALL progress bar output traces for deterministic tests + settings.add_filter(r"β ‹ rows: \[COUNT\] \[ELAPSED\]", ""); + settings.add_filter(r"\[PROGRESS_LINE\]", ""); + settings.add_filter(r"[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]", ""); + settings.add_filter(r"␍", ""); + settings.add_filter(r"␛\[K", ""); + settings.add_filter(r"\[K\]", ""); + // Remove all carriage returns and escape sequences completely + settings.add_filter(r"\r", ""); + settings.add_filter(r"\x1b\[K", ""); + settings.add_filter(r"\x0d", ""); + let _bound = settings.bind_to_scope(); + + let mut cmd = cli(); + cmd.args([ + "--progress", + "on", + "--command", + "SELECT * FROM generate_series(1, 1000) ORDER BY 1 DESC", + "-q", + ]); + + assert_cmd_snapshot!(cmd); +} + +#[test] +fn test_progress_bar_with_aggregation() { + let mut settings = make_settings(); + settings.add_filter(r"β–‰+β–Ž*", "[PROGRESS_BAR]"); + settings.add_filter(r"[β”€β”β”Œβ”˜β””β”€β”‚β”¬β”΄β”Ό]+", "[SPINNER]"); + settings.add_filter(r"\d+\.\d+ [KMGT]?B", "[SIZE]"); + settings.add_filter(r"\d+,?\d* rows", "[ROWS]"); + settings.add_filter(r"\d+\.\d+%", "[PERCENT]"); + settings.add_filter(r"ETA \d{2}:\d{2} / \d{2}:\d{2}", "[ETA]"); + settings.add_filter(r"elapsed: \d{2}:\d{2}", "[ELAPSED]"); + settings.add_filter(r"rows: \d+,?\d*", "rows: [COUNT]"); + settings.add_filter(r"\d+\.\d+", "[DECIMAL]"); + // Remove ALL progress bar output traces for deterministic tests + settings.add_filter(r"β ‹ rows: \[COUNT\] \[ELAPSED\]", ""); + settings.add_filter(r"\[PROGRESS_LINE\]", ""); + settings.add_filter(r"[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]", ""); + settings.add_filter(r"␍", ""); + settings.add_filter(r"␛\[K", ""); + settings.add_filter(r"\[K\]", ""); + // Remove all carriage returns and escape sequences completely + settings.add_filter(r"\r", ""); + settings.add_filter(r"\x1b\[K", ""); + settings.add_filter(r"\x0d", ""); + settings.add_filter(r"πŸ”„|✨", "[PHASE_EMOJI]"); + let _bound = settings.bind_to_scope(); + + let mut cmd = cli(); + cmd.args([ + "--progress", + "on", + "--command", + "SELECT COUNT(*), AVG(value) FROM generate_series(1, 5000) GROUP BY value % 100", + "-q", + ]); + + assert_cmd_snapshot!(cmd); +} + +#[test] +fn test_progress_bar_with_large_dataset() { + let mut settings = make_settings(); + settings.add_filter(r"β–‰+β–Ž*", "[PROGRESS_BAR]"); + settings.add_filter(r"[β”€β”β”Œβ”˜β””β”€β”‚β”¬β”΄β”Ό]+", "[SPINNER]"); + settings.add_filter(r"\d+\.\d+ [KMGT]?B", "[SIZE]"); + settings.add_filter(r"\d+,?\d* rows", "[ROWS]"); + settings.add_filter(r"\d+\.\d+%", "[PERCENT]"); + settings.add_filter(r"ETA \d{2}:\d{2} / \d{2}:\d{2}", "[ETA]"); + settings.add_filter(r"elapsed: \d{2}:\d{2}", "[ELAPSED]"); + settings.add_filter(r"rows: \d+,?\d*", "rows: [COUNT]"); + settings.add_filter(r"\d+\.\d+", "[DECIMAL]"); + // Remove ALL progress bar output traces for deterministic tests + settings.add_filter(r"β ‹ rows: \[COUNT\] \[ELAPSED\]", ""); + settings.add_filter(r"\[PROGRESS_LINE\]", ""); + settings.add_filter(r"[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏]", ""); + settings.add_filter(r"␍", ""); + settings.add_filter(r"␛\[K", ""); + settings.add_filter(r"\[K\]", ""); + // Remove all carriage returns and escape sequences completely + settings.add_filter(r"\r", ""); + settings.add_filter(r"\x1b\[K", ""); + settings.add_filter(r"\x0d", ""); + let _bound = settings.bind_to_scope(); + + let mut cmd = cli(); + cmd.args([ + "--progress", + "on", + "--command", + "SELECT COUNT(*) FROM generate_series(1, 50000)", + "-q", + ]); + + assert_cmd_snapshot!(cmd); +} + #[rstest] #[case("SELECT pow(1,'foo')")] #[case("SELECT CAST('not_a_number' AS INTEGER);")] diff --git a/datafusion-cli/tests/snapshots/progress_bar_mode@auto.snap b/datafusion-cli/tests/snapshots/progress_bar_mode@auto.snap new file mode 100644 index 0000000000000..7c32132e195f7 --- /dev/null +++ b/datafusion-cli/tests/snapshots/progress_bar_mode@auto.snap @@ -0,0 +1,21 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--progress" + - auto + - "--command" + - "SELECT COUNT(*) FROM generate_series(1, 1000)" + - "-q" +--- +success: true +exit_code: 0 +----- stdout ----- ++----------+ +| count(*) | ++----------+ +| 1000 | ++----------+ + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/progress_bar_mode@off.snap b/datafusion-cli/tests/snapshots/progress_bar_mode@off.snap new file mode 100644 index 0000000000000..85a219355f062 --- /dev/null +++ b/datafusion-cli/tests/snapshots/progress_bar_mode@off.snap @@ -0,0 +1,21 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--progress" + - "off" + - "--command" + - "SELECT COUNT(*) FROM generate_series(1, 1000)" + - "-q" +--- +success: true +exit_code: 0 +----- stdout ----- ++----------+ +| count(*) | ++----------+ +| 1000 | ++----------+ + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/progress_bar_mode@on.snap b/datafusion-cli/tests/snapshots/progress_bar_mode@on.snap new file mode 100644 index 0000000000000..b933cc539e3ae --- /dev/null +++ b/datafusion-cli/tests/snapshots/progress_bar_mode@on.snap @@ -0,0 +1,21 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--progress" + - "on" + - "--command" + - "SELECT COUNT(*) FROM generate_series(1, 1000)" + - "-q" +--- +success: true +exit_code: 0 +----- stdout ----- ++----------+ +| count(*) | ++----------+ +| 1000 | ++----------+ + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/progress_bar_style@bar.snap b/datafusion-cli/tests/snapshots/progress_bar_style@bar.snap new file mode 100644 index 0000000000000..766d761fa5187 --- /dev/null +++ b/datafusion-cli/tests/snapshots/progress_bar_style@bar.snap @@ -0,0 +1,23 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--progress" + - "on" + - "--progress-style" + - bar + - "--command" + - "SELECT COUNT(*) FROM generate_series(1, 1000)" + - "-q" +--- +success: true +exit_code: 0 +----- stdout ----- ++----------+ +| count(*) | ++----------+ +| 1000 | ++----------+ + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/progress_bar_style@spinner.snap b/datafusion-cli/tests/snapshots/progress_bar_style@spinner.snap new file mode 100644 index 0000000000000..015b9dbb73bdb --- /dev/null +++ b/datafusion-cli/tests/snapshots/progress_bar_style@spinner.snap @@ -0,0 +1,23 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--progress" + - "on" + - "--progress-style" + - spinner + - "--command" + - "SELECT COUNT(*) FROM generate_series(1, 1000)" + - "-q" +--- +success: true +exit_code: 0 +----- stdout ----- ++----------+ +| count(*) | ++----------+ +| 1000 | ++----------+ + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/progress_bar_with_aggregation.snap b/datafusion-cli/tests/snapshots/progress_bar_with_aggregation.snap new file mode 100644 index 0000000000000..ac4a9e4c787a4 --- /dev/null +++ b/datafusion-cli/tests/snapshots/progress_bar_with_aggregation.snap @@ -0,0 +1,63 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--progress" + - "on" + - "--command" + - "SELECT COUNT(*), AVG(value) FROM generate_series(1, 5000) GROUP BY value % 100" + - "-q" +--- +success: true +exit_code: 0 +----- stdout ----- ++----------+------------------------------+ +| count(*) | avg(generate_series().value) | ++----------+------------------------------+ +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| 50 | [DECIMAL] | +| . | +| . | +| . | ++----------+------------------------------+ + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/progress_bar_with_blocking_operators.snap b/datafusion-cli/tests/snapshots/progress_bar_with_blocking_operators.snap new file mode 100644 index 0000000000000..741528144fb83 --- /dev/null +++ b/datafusion-cli/tests/snapshots/progress_bar_with_blocking_operators.snap @@ -0,0 +1,63 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--progress" + - "on" + - "--command" + - "SELECT * FROM generate_series(1, 1000) ORDER BY 1 DESC" + - "-q" +--- +success: true +exit_code: 0 +----- stdout ----- ++-------+ +| value | ++-------+ +| 1000 | +| 999 | +| 998 | +| 997 | +| 996 | +| 995 | +| 994 | +| 993 | +| 992 | +| 991 | +| 990 | +| 989 | +| 988 | +| 987 | +| 986 | +| 985 | +| 984 | +| 983 | +| 982 | +| 981 | +| 980 | +| 979 | +| 978 | +| 977 | +| 976 | +| 975 | +| 974 | +| 973 | +| 972 | +| 971 | +| 970 | +| 969 | +| 968 | +| 967 | +| 966 | +| 965 | +| 964 | +| 963 | +| 962 | +| 961 | +| . | +| . | +| . | ++-------+ + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/progress_bar_with_large_dataset.snap b/datafusion-cli/tests/snapshots/progress_bar_with_large_dataset.snap new file mode 100644 index 0000000000000..3997a1179a412 --- /dev/null +++ b/datafusion-cli/tests/snapshots/progress_bar_with_large_dataset.snap @@ -0,0 +1,21 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "--progress" + - "on" + - "--command" + - "SELECT COUNT(*) FROM generate_series(1, 50000)" + - "-q" +--- +success: true +exit_code: 0 +----- stdout ----- ++----------+ +| count(*) | ++----------+ +| 50000 | ++----------+ + +----- stderr -----