Skip to content

Commit 03893b7

Browse files
committed
fix: now uses ProcessingCounters struct, to make it cummulative
1 parent 7715b51 commit 03893b7

File tree

1 file changed

+132
-17
lines changed

1 file changed

+132
-17
lines changed

src/execution/stats.rs

Lines changed: 132 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,55 @@ impl std::fmt::Debug for Counter {
5454
}
5555
}
5656

57+
#[derive(Debug, Serialize, Default, Clone)]
58+
pub struct ProcessingCounters {
59+
/// Total number of processing operations started.
60+
pub num_starts: Counter,
61+
/// Total number of processing operations ended.
62+
pub num_ends: Counter,
63+
}
64+
65+
impl ProcessingCounters {
66+
/// Start processing the specified number of items.
67+
pub fn start(&self, count: i64) {
68+
self.num_starts.inc(count);
69+
}
70+
71+
/// End processing the specified number of items.
72+
pub fn end(&self, count: i64) {
73+
self.num_ends.inc(count);
74+
}
75+
76+
/// Get the current number of items being processed (starts - ends).
77+
pub fn get_in_process(&self) -> i64 {
78+
self.num_starts.get() - self.num_ends.get()
79+
}
80+
81+
/// Get the total number of processing operations started.
82+
pub fn get_total_starts(&self) -> i64 {
83+
self.num_starts.get()
84+
}
85+
86+
/// Get the total number of processing operations ended.
87+
pub fn get_total_ends(&self) -> i64 {
88+
self.num_ends.get()
89+
}
90+
91+
/// Calculate the delta between this and a base ProcessingCounters.
92+
pub fn delta(&self, base: &Self) -> Self {
93+
ProcessingCounters {
94+
num_starts: self.num_starts.delta(&base.num_starts),
95+
num_ends: self.num_ends.delta(&base.num_ends),
96+
}
97+
}
98+
99+
/// Merge a delta into this ProcessingCounters.
100+
pub fn merge(&self, delta: &Self) {
101+
self.num_starts.merge(&delta.num_starts);
102+
self.num_ends.merge(&delta.num_ends);
103+
}
104+
}
105+
57106
#[derive(Debug, Serialize, Default, Clone)]
58107
pub struct UpdateStats {
59108
pub num_no_change: Counter,
@@ -64,8 +113,8 @@ pub struct UpdateStats {
64113
/// Number of source rows that were reprocessed because of logic change.
65114
pub num_reprocesses: Counter,
66115
pub num_errors: Counter,
67-
/// Number of source rows currently being processed.
68-
pub num_in_process: Counter,
116+
/// Processing counters for tracking in-process rows.
117+
pub processing: ProcessingCounters,
69118
}
70119

71120
impl UpdateStats {
@@ -77,7 +126,7 @@ impl UpdateStats {
77126
num_updates: self.num_updates.delta(&base.num_updates),
78127
num_reprocesses: self.num_reprocesses.delta(&base.num_reprocesses),
79128
num_errors: self.num_errors.delta(&base.num_errors),
80-
num_in_process: self.num_in_process.delta(&base.num_in_process),
129+
processing: self.processing.delta(&base.processing),
81130
}
82131
}
83132

@@ -88,7 +137,7 @@ impl UpdateStats {
88137
self.num_updates.merge(&delta.num_updates);
89138
self.num_reprocesses.merge(&delta.num_reprocesses);
90139
self.num_errors.merge(&delta.num_errors);
91-
self.num_in_process.merge(&delta.num_in_process);
140+
self.processing.merge(&delta.processing);
92141
}
93142

94143
pub fn has_any_change(&self) -> bool {
@@ -100,43 +149,43 @@ impl UpdateStats {
100149
}
101150

102151
/// Start processing the specified number of rows.
103-
/// Increments the in-process counter and is called when beginning row processing.
152+
/// Increments the processing start counter.
104153
pub fn start_processing(&self, count: i64) {
105-
self.num_in_process.inc(count);
154+
self.processing.start(count);
106155
}
107156

108157
/// Finish processing the specified number of rows.
109-
/// Decrements the in-process counter and is called when row processing completes.
158+
/// Increments the processing end counter.
110159
pub fn finish_processing(&self, count: i64) {
111-
self.num_in_process.inc(-count);
160+
self.processing.end(count);
112161
}
113162

114163
/// Get the current number of rows being processed.
115164
pub fn get_in_process_count(&self) -> i64 {
116-
self.num_in_process.get()
165+
self.processing.get_in_process()
117166
}
118167
}
119168

120169
/// Per-operation tracking of in-process row counts.
121170
#[derive(Debug, Default)]
122171
pub struct OperationInProcessStats {
123-
/// Maps operation names to their current in-process row counts.
124-
operation_counters: std::sync::RwLock<std::collections::HashMap<String, Counter>>,
172+
/// Maps operation names to their processing counters.
173+
operation_counters: std::sync::RwLock<std::collections::HashMap<String, ProcessingCounters>>,
125174
}
126175

127176
impl OperationInProcessStats {
128177
/// Start processing rows for the specified operation.
129178
pub fn start_processing(&self, operation_name: &str, count: i64) {
130179
let mut counters = self.operation_counters.write().unwrap();
131180
let counter = counters.entry(operation_name.to_string()).or_default();
132-
counter.inc(count);
181+
counter.start(count);
133182
}
134183

135184
/// Finish processing rows for the specified operation.
136185
pub fn finish_processing(&self, operation_name: &str, count: i64) {
137186
let counters = self.operation_counters.write().unwrap();
138187
if let Some(counter) = counters.get(operation_name) {
139-
counter.inc(-count);
188+
counter.end(count);
140189
}
141190
}
142191

@@ -145,22 +194,25 @@ impl OperationInProcessStats {
145194
let counters = self.operation_counters.read().unwrap();
146195
counters
147196
.get(operation_name)
148-
.map_or(0, |counter| counter.get())
197+
.map_or(0, |counter| counter.get_in_process())
149198
}
150199

151200
/// Get a snapshot of all operation in-process counts.
152201
pub fn get_all_operations_in_process(&self) -> std::collections::HashMap<String, i64> {
153202
let counters = self.operation_counters.read().unwrap();
154203
counters
155204
.iter()
156-
.map(|(name, counter)| (name.clone(), counter.get()))
205+
.map(|(name, counter)| (name.clone(), counter.get_in_process()))
157206
.collect()
158207
}
159208

160209
/// Get the total in-process count across all operations.
161210
pub fn get_total_in_process_count(&self) -> i64 {
162211
let counters = self.operation_counters.read().unwrap();
163-
counters.values().map(|counter| counter.get()).sum()
212+
counters
213+
.values()
214+
.map(|counter| counter.get_in_process())
215+
.sum()
164216
}
165217
}
166218

@@ -204,7 +256,7 @@ impl std::fmt::Display for UpdateStats {
204256
));
205257
}
206258

207-
let num_in_process = self.num_in_process.get();
259+
let num_in_process = self.processing.get_in_process();
208260
if num_in_process > 0 {
209261
messages.push(format!("{num_in_process} source rows IN PROCESS"));
210262
}
@@ -251,6 +303,69 @@ mod tests {
251303
use std::sync::Arc;
252304
use std::thread;
253305

306+
#[test]
307+
fn test_processing_counters() {
308+
let counters = ProcessingCounters::default();
309+
310+
// Initially should be zero
311+
assert_eq!(counters.get_in_process(), 0);
312+
assert_eq!(counters.get_total_starts(), 0);
313+
assert_eq!(counters.get_total_ends(), 0);
314+
315+
// Start processing some items
316+
counters.start(5);
317+
assert_eq!(counters.get_in_process(), 5);
318+
assert_eq!(counters.get_total_starts(), 5);
319+
assert_eq!(counters.get_total_ends(), 0);
320+
321+
// Start processing more items
322+
counters.start(3);
323+
assert_eq!(counters.get_in_process(), 8);
324+
assert_eq!(counters.get_total_starts(), 8);
325+
assert_eq!(counters.get_total_ends(), 0);
326+
327+
// End processing some items
328+
counters.end(2);
329+
assert_eq!(counters.get_in_process(), 6);
330+
assert_eq!(counters.get_total_starts(), 8);
331+
assert_eq!(counters.get_total_ends(), 2);
332+
333+
// End processing remaining items
334+
counters.end(6);
335+
assert_eq!(counters.get_in_process(), 0);
336+
assert_eq!(counters.get_total_starts(), 8);
337+
assert_eq!(counters.get_total_ends(), 8);
338+
}
339+
340+
#[test]
341+
fn test_processing_counters_delta_and_merge() {
342+
let base = ProcessingCounters::default();
343+
let current = ProcessingCounters::default();
344+
345+
// Set up base state
346+
base.start(5);
347+
base.end(2);
348+
349+
// Set up current state
350+
current.start(12);
351+
current.end(4);
352+
353+
// Calculate delta
354+
let delta = current.delta(&base);
355+
assert_eq!(delta.get_total_starts(), 7); // 12 - 5
356+
assert_eq!(delta.get_total_ends(), 2); // 4 - 2
357+
assert_eq!(delta.get_in_process(), 5); // 7 - 2
358+
359+
// Test merge
360+
let merged = ProcessingCounters::default();
361+
merged.start(10);
362+
merged.end(3);
363+
merged.merge(&delta);
364+
assert_eq!(merged.get_total_starts(), 17); // 10 + 7
365+
assert_eq!(merged.get_total_ends(), 5); // 3 + 2
366+
assert_eq!(merged.get_in_process(), 12); // 17 - 5
367+
}
368+
254369
#[test]
255370
fn test_update_stats_in_process_tracking() {
256371
let stats = UpdateStats::default();

0 commit comments

Comments
 (0)