Skip to content

Commit 47fc7f0

Browse files
committed
feat: add atty package and implement TTY detection in UpdateStats display
1 parent 8dfb4e9 commit 47fc7f0

File tree

6 files changed

+129
-18
lines changed

6 files changed

+129
-18
lines changed

Cargo.lock

Lines changed: 45 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ bytes = "1.10.1"
127127
rand = "0.9.2"
128128
indoc = "2.0.6"
129129
owo-colors = "4.2.2"
130+
atty = "0.2.14"
130131
json5 = "0.4.1"
131132
aws-config = "1.8.5"
132133
aws-sdk-s3 = "1.102.0"
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#!/usr/bin/env python3
2+
"""Run main.py and capture output to test_output.txt"""
3+
4+
import subprocess
5+
import sys
6+
7+
# Run main.py and capture output
8+
result = subprocess.run(
9+
[sys.executable, "main.py"],
10+
capture_output=True,
11+
text=True,
12+
timeout=10
13+
)
14+
15+
# Write both stdout and stderr to test_output.txt
16+
with open("test_output.txt", "w") as f:
17+
if result.stdout:
18+
f.write(result.stdout)
19+
if result.stderr:
20+
f.write(result.stderr)
21+
22+
print(f"Output saved to test_output.txt")
23+
print(f"Return code: {result.returncode}")

src/execution/db_tracking.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ pub async fn update_source_tracking_ordinal(
403403
/// Access for the source state table
404404
////////////////////////////////////////////////////////////
405405

406+
#[allow(dead_code)]
406407
pub async fn read_source_state(
407408
source_id: i32,
408409
source_key_json: &serde_json::Value,
@@ -425,6 +426,7 @@ pub async fn read_source_state(
425426
Ok(state)
426427
}
427428

429+
#[allow(dead_code)]
428430
pub async fn upsert_source_state(
429431
source_id: i32,
430432
source_key_json: &serde_json::Value,

src/execution/stats.rs

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
use crate::prelude::*;
2+
use owo_colors::{AnsiColors, OwoColorize};
23

34
use std::{
45
ops::AddAssign,
56
sync::atomic::{AtomicI64, Ordering::Relaxed},
67
};
78

9+
/// Check if stdout is a TTY
10+
fn is_stdout_tty() -> bool {
11+
atty::is(atty::Stream::Stdout)
12+
}
13+
814
#[derive(Default, Serialize)]
915
pub struct Counter(pub AtomicI64);
1016

@@ -107,6 +113,10 @@ pub struct UpdateStats {
107113
pub num_errors: Counter,
108114
/// Processing counters for tracking in-process rows.
109115
pub processing: ProcessingCounters,
116+
/// Cumulative total count of items (for display purposes)
117+
/// This represents the actual total after applying additions and deletions
118+
#[serde(skip)]
119+
pub cumulative_total: Counter,
110120
}
111121

112122
impl UpdateStats {
@@ -119,6 +129,7 @@ impl UpdateStats {
119129
num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses),
120130
num_errors: self.num_errors.delta(&base.num_errors),
121131
processing: self.processing.delta(&base.processing),
132+
cumulative_total: self.cumulative_total.clone(),
122133
}
123134
}
124135

@@ -130,6 +141,9 @@ impl UpdateStats {
130141
self.num_reprocesses.merge(&delta.num_reprocesses);
131142
self.num_errors.merge(&delta.num_errors);
132143
self.processing.merge(&delta.processing);
144+
// Update cumulative total: add insertions, subtract deletions
145+
let net_change = delta.num_insertions.get() - delta.num_deletions.get();
146+
self.cumulative_total.inc(net_change);
133147
}
134148

135149
pub fn has_any_change(&self) -> bool {
@@ -205,13 +219,13 @@ impl std::fmt::Display for UpdateStats {
205219
// Progress bar segments
206220
if total > 0 {
207221
if num_insertions > 0 {
208-
segments.push((num_insertions, "+", format!("\x1B[90m(+{} added)\x1B[0m", num_insertions)));
222+
segments.push((num_insertions, "+", format!("(+{} added)", num_insertions)));
209223
}
210224
if num_deletions > 0 {
211-
segments.push((num_deletions, "-", format!("\x1B[90m(-{} removed)\x1B[0m", num_deletions)));
225+
segments.push((num_deletions, "-", format!("(-{} removed)", num_deletions)));
212226
}
213227
if num_updates > 0 {
214-
segments.push((num_updates, "~", format!("\x1B[90m(~{} updated)\x1B[0m", num_updates)));
228+
segments.push((num_updates, "~", format!("(~{} updated)", num_updates)));
215229
}
216230
if num_no_change > 0 {
217231
segments.push((num_no_change, " ", "".to_string()));
@@ -220,7 +234,12 @@ impl std::fmt::Display for UpdateStats {
220234

221235
// Error handling
222236
if num_errors > 0 {
223-
write!(f, "{} rows failed", num_errors)?;
237+
let tty = is_stdout_tty();
238+
if tty {
239+
write!(f, "{}", format!("{} rows failed", num_errors).color(AnsiColors::White))?;
240+
} else {
241+
write!(f, "{} rows failed", num_errors)?;
242+
}
224243
if !segments.is_empty() {
225244
write!(f, "; ")?;
226245
}
@@ -235,7 +254,7 @@ impl std::fmt::Display for UpdateStats {
235254
let bar_width = 40;
236255
let mut bar = String::new();
237256

238-
let percentage = ((total - num_in_process) as f64 / total as f64 * 100.0) as i64;
257+
let _percentage = ((total - num_in_process) as f64 / total as f64 * 100.0) as i64;
239258
let mut remaining_width = bar_width;
240259

241260
for (count, segment_type, _) in sorted_segments.iter() {
@@ -273,29 +292,49 @@ impl std::fmt::Display for UpdateStats {
273292
if remaining_width > 0 {
274293
bar.push_str(&" ".repeat(remaining_width));
275294
}
276-
write!(f, "[{}] {}/{} records ", bar, total - num_in_process, total)?;
295+
let tty = is_stdout_tty();
296+
// Use total from current operations - this represents the actual record count
297+
if tty {
298+
write!(f, "[{}] {}/{} records ", bar.color(AnsiColors::BrightBlack), total - num_in_process, total)?;
299+
} else {
300+
write!(f, "[{}] {}/{} records ", bar, total - num_in_process, total)?;
301+
}
277302

278-
// Add segment labels
303+
// Add segment labels with different grey shades for each segment type
279304
let mut first = true;
280-
for (_, _, label) in segments.iter() {
305+
for (_, segment_type, label) in segments.iter() {
281306
if !label.is_empty() {
282307
if !first {
283308
write!(f, " ")?;
284309
}
285-
write!(f, "{}", label)?;
310+
if tty {
311+
match *segment_type {
312+
"+" => write!(f, "{}", label.color(AnsiColors::BrightBlack))?, // Lightest grey for additions
313+
"-" => write!(f, "{}", label.color(AnsiColors::White))?, // White for removals
314+
"~" => write!(f, "{}", label.color(AnsiColors::Black))?, // Dark grey for updates
315+
_ => write!(f, "{}", label.color(AnsiColors::Black))?, // Black for no-change
316+
}
317+
} else {
318+
write!(f, "{}", label)?;
319+
}
286320
first = false;
287321
}
288322
}
289323
} else {
290324
write!(f, "No changes")?;
291325
}
292326

293-
// In-process info
327+
// In-process info with grey coloring
294328
if num_in_process > 0 {
295329
if !segments.is_empty() {
296330
write!(f, " ")?;
297331
}
298-
write!(f, "({} in process)", num_in_process)?;
332+
let tty = is_stdout_tty();
333+
if tty {
334+
write!(f, "{}", format!("({} in process)", num_in_process).color(AnsiColors::Black))?;
335+
} else {
336+
write!(f, "({} in process)", num_in_process)?;
337+
}
299338
}
300339

301340
Ok(())
@@ -567,17 +606,19 @@ mod tests {
567606
// Test with no activity
568607
assert_eq!(format!("{}", stats), "No changes");
569608

570-
// Test with in-process rows
609+
// Test with in-process rows (no segments yet, so just shows in-process)
571610
stats.processing.start(5);
572-
assert!(format!("{}", stats).contains("5 source rows IN PROCESS"));
611+
let display = format!("{}", stats);
612+
assert!(display.contains("5 in process"));
573613

574614
// Test with mixed activity
575615
stats.num_insertions.inc(3);
576616
stats.num_errors.inc(1);
617+
stats.cumulative_total.inc(3);
577618
let display = format!("{}", stats);
578-
assert!(display.contains("1 source rows FAILED"));
579-
assert!(display.contains("3 source rows processed"));
580-
assert!(display.contains("5 source rows IN PROCESS"));
619+
assert!(display.contains("1 rows failed"));
620+
assert!(display.contains("(+3 added)"));
621+
assert!(display.contains("5 in process"));
581622
}
582623

583624
#[test]

src/ops/factory_bases.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub struct ResolvedOpArg {
7272

7373
pub trait ResolvedOpArgExt: Sized {
7474
fn value<'a>(&self, args: &'a [value::Value]) -> Result<&'a value::Value>;
75+
#[allow(dead_code)]
7576
fn take_value(&self, args: &mut [value::Value]) -> Result<value::Value>;
7677
}
7778

0 commit comments

Comments
 (0)