Skip to content

Commit 1d843b0

Browse files
authored
feat: collect additional counter for number of rows that are being processed for stats (#1105)
* feat/additional-counter-numofrows-processed * fix: now uses ProcessingCounters struct, to make it cummulative * refactored according to reviews * using target_kind for group identification * scope issue fixed, trackin range fixed, moved block to async closure * removed assigning current_scope as Option * minor changes as required
1 parent ef87b68 commit 1d843b0

File tree

7 files changed

+632
-21
lines changed

7 files changed

+632
-21
lines changed

src/builder/analyzer.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,14 +972,38 @@ impl AnalyzerContext {
972972
op_futs.push(self.analyze_reactive_op(op_scope, reactive_op).await?);
973973
}
974974
let collector_len = op_scope.states.lock().unwrap().collectors.len();
975+
let scope_qualifier = self.build_scope_qualifier(op_scope);
975976
let result_fut = async move {
976977
Ok(AnalyzedOpScope {
977978
reactive_ops: try_join_all(op_futs).await?,
978979
collector_len,
980+
scope_qualifier,
979981
})
980982
};
981983
Ok(result_fut)
982984
}
985+
986+
fn build_scope_qualifier(&self, op_scope: &Arc<OpScope>) -> String {
987+
let mut scope_names = Vec::new();
988+
let mut current_scope = op_scope.as_ref();
989+
990+
// Walk up the parent chain to collect scope names
991+
while let Some((parent, _)) = &current_scope.parent {
992+
scope_names.push(current_scope.name.as_str());
993+
current_scope = parent.as_ref();
994+
}
995+
996+
// Reverse to get the correct order (root to leaf)
997+
scope_names.reverse();
998+
999+
// Build the qualifier string
1000+
let mut result = String::new();
1001+
for name in scope_names {
1002+
result.push_str(&name);
1003+
result.push('.');
1004+
}
1005+
result
1006+
}
9831007
}
9841008

9851009
pub fn build_flow_instance_context(
@@ -1062,6 +1086,7 @@ pub async fn analyze_flow(
10621086
let target_factory = get_target_factory(&target_kind)?;
10631087
let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
10641088
target_factory,
1089+
target_kind: target_kind.clone(),
10651090
op_idx: op_ids.export_op_ids,
10661091
};
10671092
export_ops_futs.extend(

src/builder/plan.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ pub struct AnalyzedExportOp {
117117

118118
pub struct AnalyzedExportTargetOpGroup {
119119
pub target_factory: Arc<dyn TargetFactory + Send + Sync>,
120+
pub target_kind: String,
120121
pub op_idx: Vec<usize>,
121122
}
122123

@@ -129,6 +130,7 @@ pub enum AnalyzedReactiveOp {
129130
pub struct AnalyzedOpScope {
130131
pub reactive_ops: Vec<AnalyzedReactiveOp>,
131132
pub collector_len: usize,
133+
pub scope_qualifier: String,
132134
}
133135

134136
pub struct ExecutionPlan {

src/execution/evaluator.rs

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ async fn evaluate_child_op_scope(
322322
child_scope_entry: ScopeEntry<'_>,
323323
concurrency_controller: &concur_control::ConcurrencyController,
324324
memory: &EvaluationMemory,
325+
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
325326
) -> Result<()> {
326327
let _permit = concurrency_controller
327328
.acquire(Some(|| {
@@ -333,32 +334,46 @@ async fn evaluate_child_op_scope(
333334
.sum()
334335
}))
335336
.await?;
336-
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory)
337-
.await
338-
.with_context(|| {
339-
format!(
340-
"Evaluating in scope with key {}",
341-
match child_scope_entry.key.key() {
342-
Some(k) => k.to_string(),
343-
None => "()".to_string(),
344-
}
345-
)
346-
})
337+
evaluate_op_scope(
338+
op_scope,
339+
scoped_entries.prepend(&child_scope_entry),
340+
memory,
341+
operation_in_process_stats,
342+
)
343+
.await
344+
.with_context(|| {
345+
format!(
346+
"Evaluating in scope with key {}",
347+
match child_scope_entry.key.key() {
348+
Some(k) => k.to_string(),
349+
None => "()".to_string(),
350+
}
351+
)
352+
})
347353
}
348354

349355
async fn evaluate_op_scope(
350356
op_scope: &AnalyzedOpScope,
351357
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
352358
memory: &EvaluationMemory,
359+
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
353360
) -> Result<()> {
354361
let head_scope = *scoped_entries.head().unwrap();
355362
for reactive_op in op_scope.reactive_ops.iter() {
356363
match reactive_op {
357364
AnalyzedReactiveOp::Transform(op) => {
365+
// Track transform operation start
366+
if let Some(ref op_stats) = operation_in_process_stats {
367+
let transform_key =
368+
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
369+
op_stats.start_processing(&transform_key, 1);
370+
}
371+
358372
let mut input_values = Vec::with_capacity(op.inputs.len());
359373
input_values
360374
.extend(assemble_input_values(&op.inputs, scoped_entries).collect::<Vec<_>>());
361-
if op.function_exec_info.enable_cache {
375+
376+
let result = if op.function_exec_info.enable_cache {
362377
let output_value_cell = memory.get_cache_entry(
363378
|| {
364379
Ok(op
@@ -382,7 +397,16 @@ async fn evaluate_op_scope(
382397
.await
383398
.and_then(|v| head_scope.define_field(&op.output, &v))
384399
}
385-
.with_context(|| format!("Evaluating Transform op `{}`", op.name,))?
400+
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));
401+
402+
// Track transform operation completion
403+
if let Some(ref op_stats) = operation_in_process_stats {
404+
let transform_key =
405+
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
406+
op_stats.finish_processing(&transform_key, 1);
407+
}
408+
409+
result?
386410
}
387411

388412
AnalyzedReactiveOp::ForEach(op) => {
@@ -408,6 +432,7 @@ async fn evaluate_op_scope(
408432
),
409433
&op.concurrency_controller,
410434
memory,
435+
operation_in_process_stats,
411436
)
412437
})
413438
.collect::<Vec<_>>(),
@@ -425,6 +450,7 @@ async fn evaluate_op_scope(
425450
),
426451
&op.concurrency_controller,
427452
memory,
453+
operation_in_process_stats,
428454
)
429455
})
430456
.collect::<Vec<_>>(),
@@ -443,6 +469,7 @@ async fn evaluate_op_scope(
443469
),
444470
&op.concurrency_controller,
445471
memory,
472+
operation_in_process_stats,
446473
)
447474
})
448475
.collect::<Vec<_>>(),
@@ -509,6 +536,7 @@ pub async fn evaluate_source_entry(
509536
src_eval_ctx: &SourceRowEvaluationContext<'_>,
510537
source_value: value::FieldValues,
511538
memory: &EvaluationMemory,
539+
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
512540
) -> Result<EvaluateSourceEntryOutput> {
513541
let _permit = src_eval_ctx
514542
.import_op
@@ -556,6 +584,7 @@ pub async fn evaluate_source_entry(
556584
&src_eval_ctx.plan.op_scope,
557585
RefList::Nil.prepend(&root_scope_entry),
558586
memory,
587+
operation_in_process_stats,
559588
)
560589
.await?;
561590
let collected_values = root_scope_entry
@@ -604,6 +633,7 @@ pub async fn evaluate_transient_flow(
604633
&flow.execution_plan.op_scope,
605634
RefList::Nil.prepend(&root_scope_entry),
606635
&eval_memory,
636+
None, // No operation stats for transient flows
607637
)
608638
.await?;
609639
let output_value = assemble_value(

src/execution/live_updater.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub struct FlowLiveUpdater {
2727
flow_ctx: Arc<FlowContext>,
2828
join_set: Mutex<Option<JoinSet<Result<()>>>>,
2929
stats_per_task: Vec<Arc<stats::UpdateStats>>,
30+
/// Global tracking of in-process rows per operation
31+
pub operation_in_process_stats: Arc<stats::OperationInProcessStats>,
3032
recv_state: tokio::sync::Mutex<UpdateReceiveState>,
3133
num_remaining_tasks_rx: watch::Receiver<usize>,
3234

@@ -83,6 +85,7 @@ struct SourceUpdateTask {
8385
plan: Arc<plan::ExecutionPlan>,
8486
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
8587
source_update_stats: Arc<stats::UpdateStats>,
88+
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
8689
pool: PgPool,
8790
options: FlowLiveUpdaterOptions,
8891

@@ -137,6 +140,7 @@ impl SourceUpdateTask {
137140
let change_stream_stats = change_stream_stats.clone();
138141
let pool = self.pool.clone();
139142
let status_tx = self.status_tx.clone();
143+
let operation_in_process_stats = self.operation_in_process_stats.clone();
140144
async move {
141145
let mut change_stream = change_stream;
142146
let retry_options = retryable::RetryOptions {
@@ -203,6 +207,7 @@ impl SourceUpdateTask {
203207
},
204208
super::source_indexer::UpdateMode::Normal,
205209
update_stats.clone(),
210+
Some(operation_in_process_stats.clone()),
206211
concur_permit,
207212
Some(move || async move {
208213
SharedAckFn::ack(&shared_ack_fn).await
@@ -328,6 +333,7 @@ impl FlowLiveUpdater {
328333

329334
let mut join_set = JoinSet::new();
330335
let mut stats_per_task = Vec::new();
336+
let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default());
331337

332338
for source_idx in 0..plan.import_ops.len() {
333339
let source_update_stats = Arc::new(stats::UpdateStats::default());
@@ -337,6 +343,7 @@ impl FlowLiveUpdater {
337343
plan: plan.clone(),
338344
execution_ctx: execution_ctx.clone(),
339345
source_update_stats: source_update_stats.clone(),
346+
operation_in_process_stats: operation_in_process_stats.clone(),
340347
pool: pool.clone(),
341348
options: options.clone(),
342349
status_tx: status_tx.clone(),
@@ -345,10 +352,12 @@ impl FlowLiveUpdater {
345352
join_set.spawn(source_update_task.run());
346353
stats_per_task.push(source_update_stats);
347354
}
355+
348356
Ok(Self {
349357
flow_ctx,
350358
join_set: Mutex::new(Some(join_set)),
351359
stats_per_task,
360+
operation_in_process_stats,
352361
recv_state: tokio::sync::Mutex::new(UpdateReceiveState {
353362
status_rx,
354363
last_num_source_updates: vec![0; plan.import_ops.len()],

src/execution/row_indexer.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ pub struct RowIndexer<'a> {
183183
setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
184184
mode: super::source_indexer::UpdateMode,
185185
update_stats: &'a stats::UpdateStats,
186+
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
186187
pool: &'a PgPool,
187188

188189
source_id: i32,
@@ -201,6 +202,7 @@ impl<'a> RowIndexer<'a> {
201202
mode: super::source_indexer::UpdateMode,
202203
process_time: chrono::DateTime<chrono::Utc>,
203204
update_stats: &'a stats::UpdateStats,
205+
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
204206
pool: &'a PgPool,
205207
) -> Result<Self> {
206208
Ok(Self {
@@ -212,6 +214,7 @@ impl<'a> RowIndexer<'a> {
212214
setup_execution_ctx,
213215
mode,
214216
update_stats,
217+
operation_in_process_stats,
215218
pool,
216219
})
217220
}
@@ -311,9 +314,13 @@ impl<'a> RowIndexer<'a> {
311314
},
312315
);
313316

314-
let output =
315-
evaluate_source_entry(self.src_eval_ctx, source_value, &evaluation_memory)
316-
.await?;
317+
let output = evaluate_source_entry(
318+
self.src_eval_ctx,
319+
source_value,
320+
&evaluation_memory,
321+
self.operation_in_process_stats,
322+
)
323+
.await?;
317324
let mut stored_info = evaluation_memory.into_stored()?;
318325
if tracking_setup_state.has_fast_fingerprint_column {
319326
(Some(output), stored_info, content_version_fp)
@@ -368,9 +375,27 @@ impl<'a> RowIndexer<'a> {
368375
})
369376
.collect();
370377
(!mutations_w_ctx.is_empty()).then(|| {
371-
export_op_group
372-
.target_factory
373-
.apply_mutation(mutations_w_ctx)
378+
let export_key = format!("export/{}", export_op_group.target_kind);
379+
let operation_in_process_stats = self.operation_in_process_stats;
380+
381+
async move {
382+
// Track export operation start
383+
if let Some(ref op_stats) = operation_in_process_stats {
384+
op_stats.start_processing(&export_key, 1);
385+
}
386+
387+
let result = export_op_group
388+
.target_factory
389+
.apply_mutation(mutations_w_ctx)
390+
.await;
391+
392+
// Track export operation completion
393+
if let Some(ref op_stats) = operation_in_process_stats {
394+
op_stats.finish_processing(&export_key, 1);
395+
}
396+
397+
result
398+
}
374399
})
375400
});
376401

@@ -875,7 +900,7 @@ pub async fn evaluate_source_entry_with_memory(
875900
.ok_or_else(|| anyhow::anyhow!("value not returned"))?;
876901
let output = match source_value {
877902
interface::SourceValue::Existence(source_value) => {
878-
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?)
903+
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory, None).await?)
879904
}
880905
interface::SourceValue::NonExistence => None,
881906
};

0 commit comments

Comments
 (0)