Skip to content

Commit 7715b51

Browse files
committed
feat/additional-counter-numofrows-processed
1 parent 9baca6e commit 7715b51

File tree

3 files changed

+316
-1
lines changed

3 files changed

+316
-1
lines changed

src/execution/live_updater.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub struct FlowLiveUpdater {
2727
flow_ctx: Arc<FlowContext>,
2828
join_set: Mutex<Option<JoinSet<Result<()>>>>,
2929
stats_per_task: Vec<Arc<stats::UpdateStats>>,
30+
/// Global tracking of in-process rows per operation
31+
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
3032
recv_state: tokio::sync::Mutex<UpdateReceiveState>,
3133
num_remaining_tasks_rx: watch::Receiver<usize>,
3234

@@ -83,6 +85,7 @@ struct SourceUpdateTask {
8385
plan: Arc<plan::ExecutionPlan>,
8486
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
8587
source_update_stats: Arc<stats::UpdateStats>,
88+
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
8689
pool: PgPool,
8790
options: FlowLiveUpdaterOptions,
8891

@@ -137,6 +140,7 @@ impl SourceUpdateTask {
137140
let change_stream_stats = change_stream_stats.clone();
138141
let pool = self.pool.clone();
139142
let status_tx = self.status_tx.clone();
143+
let operation_in_process_stats = self.operation_in_process_stats.clone();
140144
async move {
141145
let mut change_stream = change_stream;
142146
let retry_options = retryable::RetryOptions {
@@ -203,6 +207,7 @@ impl SourceUpdateTask {
203207
},
204208
super::source_indexer::UpdateMode::Normal,
205209
update_stats.clone(),
210+
Some(operation_in_process_stats.clone()),
206211
concur_permit,
207212
Some(move || async move {
208213
SharedAckFn::ack(&shared_ack_fn).await
@@ -328,6 +333,7 @@ impl FlowLiveUpdater {
328333

329334
let mut join_set = JoinSet::new();
330335
let mut stats_per_task = Vec::new();
336+
let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default());
331337

332338
for source_idx in 0..plan.import_ops.len() {
333339
let source_update_stats = Arc::new(stats::UpdateStats::default());
@@ -337,6 +343,7 @@ impl FlowLiveUpdater {
337343
plan: plan.clone(),
338344
execution_ctx: execution_ctx.clone(),
339345
source_update_stats: source_update_stats.clone(),
346+
operation_in_process_stats: operation_in_process_stats.clone(),
340347
pool: pool.clone(),
341348
options: options.clone(),
342349
status_tx: status_tx.clone(),
@@ -345,10 +352,12 @@ impl FlowLiveUpdater {
345352
join_set.spawn(source_update_task.run());
346353
stats_per_task.push(source_update_stats);
347354
}
355+
348356
Ok(Self {
349357
flow_ctx,
350358
join_set: Mutex::new(Some(join_set)),
351359
stats_per_task,
360+
operation_in_process_stats,
352361
recv_state: tokio::sync::Mutex::new(UpdateReceiveState {
353362
status_rx,
354363
last_num_source_updates: vec![0; plan.import_ops.len()],
@@ -406,6 +415,23 @@ impl FlowLiveUpdater {
406415
}
407416
}
408417

418+
/// Get the total number of rows currently being processed across all operations.
419+
pub fn get_total_in_process_count(&self) -> i64 {
420+
self.operation_in_process_stats.get_total_in_process_count()
421+
}
422+
423+
/// Get the number of rows currently being processed for a specific operation.
424+
pub fn get_operation_in_process_count(&self, operation_name: &str) -> i64 {
425+
self.operation_in_process_stats
426+
.get_operation_in_process_count(operation_name)
427+
}
428+
429+
/// Get a snapshot of all operation in-process counts.
430+
pub fn get_all_operations_in_process(&self) -> std::collections::HashMap<String, i64> {
431+
self.operation_in_process_stats
432+
.get_all_operations_in_process()
433+
}
434+
409435
pub async fn next_status_updates(&self) -> Result<FlowLiveUpdaterUpdates> {
410436
let mut recv_state = self.recv_state.lock().await;
411437
let recv_state = &mut *recv_state;

src/execution/source_indexer.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,15 +317,31 @@ impl SourceIndexingContext {
317317
row_input: ProcessSourceRowInput,
318318
mode: UpdateMode,
319319
update_stats: Arc<stats::UpdateStats>,
320+
operation_in_process_stats: Option<Arc<stats::OperationInProcessStats>>,
320321
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
321322
ack_fn: Option<AckFn>,
322323
pool: PgPool,
323324
) {
325+
// Store operation name for tracking cleanup
326+
let operation_name = {
327+
let plan_result = self.flow.get_execution_plan().await;
328+
match plan_result {
329+
Ok(plan) => plan.import_ops[self.source_idx].name.clone(),
330+
Err(_) => "unknown".to_string(),
331+
}
332+
};
333+
324334
let process = async {
325335
let plan = self.flow.get_execution_plan().await?;
326336
let import_op = &plan.import_ops[self.source_idx];
327337
let schema = &self.flow.data_schema;
328338

339+
// Track that we're starting to process this row
340+
update_stats.start_processing(1);
341+
if let Some(ref op_stats) = operation_in_process_stats {
342+
op_stats.start_processing(&import_op.name, 1);
343+
}
344+
329345
let eval_ctx = SourceRowEvaluationContext {
330346
plan: &plan,
331347
import_op,
@@ -474,7 +490,15 @@ impl SourceIndexingContext {
474490
result
475491
};
476492
let process_and_ack = async {
477-
process.await?;
493+
let result = process.await;
494+
495+
// Track that we're finishing processing this row (regardless of success/failure)
496+
update_stats.finish_processing(1);
497+
if let Some(ref op_stats) = operation_in_process_stats {
498+
op_stats.finish_processing(&operation_name, 1);
499+
}
500+
501+
result?;
478502
if let Some(ack_fn) = ack_fn {
479503
ack_fn().await?;
480504
}
@@ -603,6 +627,7 @@ impl SourceIndexingContext {
603627
},
604628
update_options.mode,
605629
update_stats.clone(),
630+
None, // operation_in_process_stats
606631
concur_permit,
607632
NO_ACK,
608633
pool.clone(),
@@ -642,6 +667,7 @@ impl SourceIndexingContext {
642667
},
643668
update_options.mode,
644669
update_stats.clone(),
670+
None, // operation_in_process_stats
645671
concur_permit,
646672
NO_ACK,
647673
pool.clone(),

0 commit comments

Comments
 (0)