Skip to content

Commit 8ef8ef4

Browse files
ansu86dCopilot
andauthored
feat: enhance UpdateStats display with progress bar and error handling (#1223)
* feat: enhance UpdateStats display with progress bar and error handling Fix for #343 * feat: add unicode-width package and remove indicatif dependency * feat: add atty package and implement TTY detection in UpdateStats display * refactor: streamline subprocess.run arguments and improve formatting in UpdateStats display * Apply suggestions from code review Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent cd96429 commit 8ef8ef4

File tree

5 files changed

+218
-40
lines changed

5 files changed

+218
-40
lines changed

Cargo.lock

Lines changed: 47 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/usr/bin/env python3
2+
"""Run main.py and capture output to test_output.txt"""
3+
4+
import subprocess
5+
import sys
6+
7+
# Run main.py and capture output
8+
result = subprocess.run(
9+
[sys.executable, "main.py"], capture_output=True, text=True, timeout=10
10+
)
11+
12+
# Write both stdout and stderr to test_output.txt
13+
with open("test_output.txt", "w") as f:
14+
if result.stdout:
15+
f.write(result.stdout)
16+
if result.stderr:
17+
f.write(result.stderr)
18+
19+
print(f"Output saved to test_output.txt")
20+
print(f"Return code: {result.returncode}")

src/execution/db_tracking.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ pub async fn update_source_tracking_ordinal(
403403
/// Access for the source state table
404404
////////////////////////////////////////////////////////////
405405

406+
#[allow(dead_code)]
406407
pub async fn read_source_state(
407408
source_id: i32,
408409
source_key_json: &serde_json::Value,
@@ -425,6 +426,7 @@ pub async fn read_source_state(
425426
Ok(state)
426427
}
427428

429+
#[allow(dead_code)]
428430
pub async fn upsert_source_state(
429431
source_id: i32,
430432
source_key_json: &serde_json::Value,

src/execution/stats.rs

Lines changed: 148 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
use crate::prelude::*;
2+
use owo_colors::{AnsiColors, OwoColorize};
23

34
use std::{
45
ops::AddAssign,
56
sync::atomic::{AtomicI64, Ordering::Relaxed},
67
};
78

9+
/// Check if stdout is a TTY
10+
fn is_stdout_tty() -> bool {
11+
atty::is(atty::Stream::Stdout)
12+
}
13+
814
#[derive(Default, Serialize)]
915
pub struct Counter(pub AtomicI64);
1016

@@ -107,6 +113,10 @@ pub struct UpdateStats {
107113
pub num_errors: Counter,
108114
/// Processing counters for tracking in-process rows.
109115
pub processing: ProcessingCounters,
116+
/// Cumulative total count of items (for display purposes)
117+
/// This represents the actual total after applying additions and deletions
118+
#[serde(skip)]
119+
pub cumulative_total: Counter,
110120
}
111121

112122
impl UpdateStats {
@@ -119,6 +129,7 @@ impl UpdateStats {
119129
num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses),
120130
num_errors: self.num_errors.delta(&base.num_errors),
121131
processing: self.processing.delta(&base.processing),
132+
cumulative_total: self.cumulative_total.clone(),
122133
}
123134
}
124135

@@ -130,6 +141,9 @@ impl UpdateStats {
130141
self.num_reprocesses.merge(&delta.num_reprocesses);
131142
self.num_errors.merge(&delta.num_errors);
132143
self.processing.merge(&delta.processing);
144+
// Update cumulative total: add insertions, subtract deletions
145+
let net_change = delta.num_insertions.get() - delta.num_deletions.get();
146+
self.cumulative_total.inc(net_change);
133147
}
134148

135149
pub fn has_any_change(&self) -> bool {
@@ -193,55 +207,151 @@ impl OperationInProcessStats {
193207

194208
impl std::fmt::Display for UpdateStats {
195209
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196-
let mut messages = Vec::new();
197-
let num_errors = self.num_errors.get();
198-
if num_errors > 0 {
199-
messages.push(format!("{num_errors} source rows FAILED"));
200-
}
201-
202-
let num_skipped = self.num_no_change.get();
203-
if num_skipped > 0 {
204-
messages.push(format!("{num_skipped} source rows NO CHANGE"));
205-
}
206-
210+
let mut segments = Vec::new();
207211
let num_insertions = self.num_insertions.get();
208212
let num_deletions = self.num_deletions.get();
209213
let num_updates = self.num_updates.get();
214+
let num_no_change = self.num_no_change.get();
215+
let num_errors = self.num_errors.get();
216+
let num_in_process = self.processing.get_in_process();
210217
let num_reprocesses = self.num_reprocesses.get();
211-
let num_source_rows = num_insertions + num_deletions + num_updates + num_reprocesses;
212-
if num_source_rows > 0 {
213-
let mut sub_messages = Vec::new();
218+
let total = num_insertions + num_deletions + num_updates + num_no_change + num_reprocesses;
219+
220+
// Progress bar segments
221+
if total > 0 {
214222
if num_insertions > 0 {
215-
sub_messages.push(format!("{num_insertions} ADDED"));
223+
segments.push((num_insertions, "+", format!("(+{} added)", num_insertions)));
216224
}
217225
if num_deletions > 0 {
218-
sub_messages.push(format!("{num_deletions} REMOVED"));
219-
}
220-
if num_reprocesses > 0 {
221-
sub_messages.push(format!(
222-
"{num_reprocesses} REPROCESSED on flow/logic changes or reexport"
223-
));
226+
segments.push((num_deletions, "-", format!("(-{} removed)", num_deletions)));
224227
}
225228
if num_updates > 0 {
226-
sub_messages.push(format!("{num_updates} UPDATED in source content only"));
229+
segments.push((num_updates, "~", format!("(~{} updated)", num_updates)));
230+
}
231+
if num_no_change > 0 {
232+
segments.push((num_no_change, " ", "".to_string()));
227233
}
228-
messages.push(format!(
229-
"{num_source_rows} source rows processed ({})",
230-
sub_messages.join(", "),
231-
));
232234
}
233235

234-
let num_in_process = self.processing.get_in_process();
235-
if num_in_process > 0 {
236-
messages.push(format!("{num_in_process} source rows IN PROCESS"));
236+
// Error handling
237+
if num_errors > 0 {
238+
let tty = is_stdout_tty();
239+
if tty {
240+
write!(
241+
f,
242+
"{}",
243+
format!("{} rows failed", num_errors).color(AnsiColors::White)
244+
)?;
245+
} else {
246+
write!(f, "{} rows failed", num_errors)?;
247+
}
248+
if !segments.is_empty() {
249+
write!(f, "; ")?;
250+
}
237251
}
238252

239-
if !messages.is_empty() {
240-
write!(f, "{}", messages.join("; "))?;
253+
// Progress bar
254+
if !segments.is_empty() {
255+
let mut sorted_segments = segments.clone();
256+
sorted_segments.sort_by_key(|s| s.0);
257+
sorted_segments.reverse();
258+
259+
let bar_width = 40;
260+
let mut bar = String::new();
261+
262+
let mut remaining_width = bar_width;
263+
264+
for (count, segment_type, _) in sorted_segments.iter() {
265+
let segment_width = (*count * bar_width as i64 / total as i64) as usize;
266+
let width = std::cmp::min(segment_width, remaining_width);
267+
if width > 0 {
268+
// Calculate completed and remaining portions
269+
let completed_portion =
270+
(width as f64 * (total - num_in_process) as f64 / total as f64) as usize;
271+
let remaining_portion = width - completed_portion;
272+
273+
// Add segment with appropriate characters based on type
274+
if completed_portion > 0 {
275+
let completed_char = match *segment_type {
276+
"+" => "█",
277+
"-" => "▓",
278+
"~" => "▒",
279+
_ => "░",
280+
};
281+
bar.push_str(&completed_char.repeat(completed_portion));
282+
}
283+
284+
if remaining_portion > 0 {
285+
let remaining_char = match *segment_type {
286+
"+" => "▒",
287+
"-" => "░",
288+
"~" => "░",
289+
_ => " ",
290+
};
291+
bar.push_str(&remaining_char.repeat(remaining_portion));
292+
}
293+
294+
remaining_width = remaining_width.saturating_sub(width);
295+
}
296+
}
297+
if remaining_width > 0 {
298+
bar.push_str(&" ".repeat(remaining_width));
299+
}
300+
let tty = is_stdout_tty();
301+
// Use total from current operations - this represents the actual record count
302+
if tty {
303+
write!(
304+
f,
305+
"[{}] {}/{} records ",
306+
bar.color(AnsiColors::BrightBlack),
307+
total - num_in_process,
308+
total
309+
)?;
310+
} else {
311+
write!(f, "[{}] {}/{} records ", bar, total - num_in_process, total)?;
312+
}
313+
314+
// Add segment labels with different grey shades for each segment type
315+
let mut first = true;
316+
for (_, segment_type, label) in segments.iter() {
317+
if !label.is_empty() {
318+
if !first {
319+
write!(f, " ")?;
320+
}
321+
if tty {
322+
match *segment_type {
323+
"+" => write!(f, "{}", label.color(AnsiColors::BrightBlack))?, // Lightest grey for additions
324+
"-" => write!(f, "{}", label.color(AnsiColors::White))?, // White for removals
325+
"~" => write!(f, "{}", label.color(AnsiColors::Black))?, // Dark grey for updates
326+
_ => write!(f, "{}", label.color(AnsiColors::Black))?, // Black for no-change
327+
}
328+
} else {
329+
write!(f, "{}", label)?;
330+
}
331+
first = false;
332+
}
333+
}
241334
} else {
242335
write!(f, "No changes")?;
243336
}
244337

338+
// In-process info with grey coloring
339+
if num_in_process > 0 {
340+
if !segments.is_empty() {
341+
write!(f, " ")?;
342+
}
343+
let tty = is_stdout_tty();
344+
if tty {
345+
write!(
346+
f,
347+
"{}",
348+
format!("({} in process)", num_in_process).color(AnsiColors::Black)
349+
)?;
350+
} else {
351+
write!(f, "({} in process)", num_in_process)?;
352+
}
353+
}
354+
245355
Ok(())
246356
}
247357
}
@@ -511,17 +621,19 @@ mod tests {
511621
// Test with no activity
512622
assert_eq!(format!("{}", stats), "No changes");
513623

514-
// Test with in-process rows
624+
// Test with in-process rows (no segments yet, so just shows in-process)
515625
stats.processing.start(5);
516-
assert!(format!("{}", stats).contains("5 source rows IN PROCESS"));
626+
let display = format!("{}", stats);
627+
assert!(display.contains("5 in process"));
517628

518629
// Test with mixed activity
519630
stats.num_insertions.inc(3);
520631
stats.num_errors.inc(1);
632+
stats.cumulative_total.inc(3);
521633
let display = format!("{}", stats);
522-
assert!(display.contains("1 source rows FAILED"));
523-
assert!(display.contains("3 source rows processed"));
524-
assert!(display.contains("5 source rows IN PROCESS"));
634+
assert!(display.contains("1 rows failed"));
635+
assert!(display.contains("(+3 added)"));
636+
assert!(display.contains("5 in process"));
525637
}
526638

527639
#[test]

src/ops/factory_bases.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ pub struct ResolvedOpArg {
7676

7777
pub trait ResolvedOpArgExt: Sized {
7878
fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value>;
79+
#[allow(dead_code)]
7980
fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value>;
8081
}
8182

0 commit comments

Comments
 (0)