Skip to content

Commit 1449612

Browse files
committed
refactored according to reviews
1 parent 03893b7 commit 1449612

File tree

4 files changed

+253
-83
lines changed

4 files changed

+253
-83
lines changed

src/execution/evaluator.rs

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ async fn evaluate_child_op_scope(
322322
child_scope_entry: ScopeEntry<'_>,
323323
concurrency_controller: &concur_control::ConcurrencyController,
324324
memory: &EvaluationMemory,
325+
operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>,
325326
) -> Result<()> {
326327
let _permit = concurrency_controller
327328
.acquire(Some(|| {
@@ -333,32 +334,44 @@ async fn evaluate_child_op_scope(
333334
.sum()
334335
}))
335336
.await?;
336-
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory)
337-
.await
338-
.with_context(|| {
339-
format!(
340-
"Evaluating in scope with key {}",
341-
match child_scope_entry.key.key() {
342-
Some(k) => k.to_string(),
343-
None => "()".to_string(),
344-
}
345-
)
346-
})
337+
evaluate_op_scope(
338+
op_scope,
339+
scoped_entries.prepend(&child_scope_entry),
340+
memory,
341+
operation_in_process_stats,
342+
)
343+
.await
344+
.with_context(|| {
345+
format!(
346+
"Evaluating in scope with key {}",
347+
match child_scope_entry.key.key() {
348+
Some(k) => k.to_string(),
349+
None => "()".to_string(),
350+
}
351+
)
352+
})
347353
}
348354

349355
async fn evaluate_op_scope(
350356
op_scope: &AnalyzedOpScope,
351357
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
352358
memory: &EvaluationMemory,
359+
operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>,
353360
) -> Result<()> {
354361
let head_scope = *scoped_entries.head().unwrap();
355362
for reactive_op in op_scope.reactive_ops.iter() {
356363
match reactive_op {
357364
AnalyzedReactiveOp::Transform(op) => {
365+
// Track transform operation start
366+
if let Some(ref op_stats) = operation_in_process_stats {
367+
op_stats.start_processing(&op.name, 1);
368+
}
369+
358370
let mut input_values = Vec::with_capacity(op.inputs.len());
359371
input_values
360372
.extend(assemble_input_values(&op.inputs, scoped_entries).collect::<Vec<_>>());
361-
if op.function_exec_info.enable_cache {
373+
374+
let result = if op.function_exec_info.enable_cache {
362375
let output_value_cell = memory.get_cache_entry(
363376
|| {
364377
Ok(op
@@ -382,7 +395,14 @@ async fn evaluate_op_scope(
382395
.await
383396
.and_then(|v| head_scope.define_field(&op.output, &v))
384397
}
385-
.with_context(|| format!("Evaluating Transform op `{}`", op.name,))?
398+
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));
399+
400+
// Track transform operation completion
401+
if let Some(ref op_stats) = operation_in_process_stats {
402+
op_stats.finish_processing(&op.name, 1);
403+
}
404+
405+
result?
386406
}
387407

388408
AnalyzedReactiveOp::ForEach(op) => {
@@ -408,6 +428,7 @@ async fn evaluate_op_scope(
408428
),
409429
&op.concurrency_controller,
410430
memory,
431+
operation_in_process_stats,
411432
)
412433
})
413434
.collect::<Vec<_>>(),
@@ -425,6 +446,7 @@ async fn evaluate_op_scope(
425446
),
426447
&op.concurrency_controller,
427448
memory,
449+
operation_in_process_stats,
428450
)
429451
})
430452
.collect::<Vec<_>>(),
@@ -443,6 +465,7 @@ async fn evaluate_op_scope(
443465
),
444466
&op.concurrency_controller,
445467
memory,
468+
operation_in_process_stats,
446469
)
447470
})
448471
.collect::<Vec<_>>(),
@@ -509,6 +532,7 @@ pub async fn evaluate_source_entry(
509532
src_eval_ctx: &SourceRowEvaluationContext<'_>,
510533
source_value: value::FieldValues,
511534
memory: &EvaluationMemory,
535+
operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>,
512536
) -> Result<EvaluateSourceEntryOutput> {
513537
let _permit = src_eval_ctx
514538
.import_op
@@ -556,6 +580,7 @@ pub async fn evaluate_source_entry(
556580
&src_eval_ctx.plan.op_scope,
557581
RefList::Nil.prepend(&root_scope_entry),
558582
memory,
583+
operation_in_process_stats,
559584
)
560585
.await?;
561586
let collected_values = root_scope_entry
@@ -604,6 +629,7 @@ pub async fn evaluate_transient_flow(
604629
&flow.execution_plan.op_scope,
605630
RefList::Nil.prepend(&root_scope_entry),
606631
&eval_memory,
632+
None, // No operation stats for transient flows
607633
)
608634
.await?;
609635
let output_value = assemble_value(

src/execution/row_indexer.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ pub struct RowIndexer<'a> {
183183
setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
184184
mode: super::source_indexer::UpdateMode,
185185
update_stats: &'a stats::UpdateStats,
186+
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
186187
pool: &'a PgPool,
187188

188189
source_id: i32,
@@ -201,6 +202,7 @@ impl<'a> RowIndexer<'a> {
201202
mode: super::source_indexer::UpdateMode,
202203
process_time: chrono::DateTime<chrono::Utc>,
203204
update_stats: &'a stats::UpdateStats,
205+
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
204206
pool: &'a PgPool,
205207
) -> Result<Self> {
206208
Ok(Self {
@@ -212,6 +214,7 @@ impl<'a> RowIndexer<'a> {
212214
setup_execution_ctx,
213215
mode,
214216
update_stats,
217+
operation_in_process_stats,
215218
pool,
216219
})
217220
}
@@ -311,9 +314,13 @@ impl<'a> RowIndexer<'a> {
311314
},
312315
);
313316

314-
let output =
315-
evaluate_source_entry(self.src_eval_ctx, source_value, &evaluation_memory)
316-
.await?;
317+
let output = evaluate_source_entry(
318+
self.src_eval_ctx,
319+
source_value,
320+
&evaluation_memory,
321+
self.operation_in_process_stats,
322+
)
323+
.await?;
317324
let mut stored_info = evaluation_memory.into_stored()?;
318325
if tracking_setup_state.has_fast_fingerprint_column {
319326
(Some(output), stored_info, content_version_fp)
@@ -368,9 +375,36 @@ impl<'a> RowIndexer<'a> {
368375
})
369376
.collect();
370377
(!mutations_w_ctx.is_empty()).then(|| {
371-
export_op_group
372-
.target_factory
373-
.apply_mutation(mutations_w_ctx)
378+
// Track export operation start
379+
if let Some(ref op_stats) = self.operation_in_process_stats {
380+
for export_op_idx in &export_op_group.op_idx {
381+
let export_op = &self.src_eval_ctx.plan.export_ops[*export_op_idx];
382+
op_stats.start_processing(&export_op.name, 1);
383+
}
384+
}
385+
386+
let export_op_names: Vec<String> = export_op_group
387+
.op_idx
388+
.iter()
389+
.map(|idx| self.src_eval_ctx.plan.export_ops[*idx].name.clone())
390+
.collect();
391+
let operation_in_process_stats = self.operation_in_process_stats;
392+
393+
async move {
394+
let result = export_op_group
395+
.target_factory
396+
.apply_mutation(mutations_w_ctx)
397+
.await;
398+
399+
// Track export operation completion
400+
if let Some(ref op_stats) = operation_in_process_stats {
401+
for export_op_name in &export_op_names {
402+
op_stats.finish_processing(export_op_name, 1);
403+
}
404+
}
405+
406+
result
407+
}
374408
})
375409
});
376410

@@ -875,7 +909,7 @@ pub async fn evaluate_source_entry_with_memory(
875909
.ok_or_else(|| anyhow::anyhow!("value not returned"))?;
876910
let output = match source_value {
877911
interface::SourceValue::Existence(source_value) => {
878-
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?)
912+
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory, None).await?)
879913
}
880914
interface::SourceValue::NonExistence => None,
881915
};

src/execution/source_indexer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ impl SourceIndexingContext {
337337
let schema = &self.flow.data_schema;
338338

339339
// Track that we're starting to process this row
340-
update_stats.start_processing(1);
340+
update_stats.processing.start(1);
341341
if let Some(ref op_stats) = operation_in_process_stats {
342342
op_stats.start_processing(&import_op.name, 1);
343343
}
@@ -356,6 +356,7 @@ impl SourceIndexingContext {
356356
mode,
357357
process_time,
358358
&update_stats,
359+
operation_in_process_stats.as_ref().map(|s| s.as_ref()),
359360
&pool,
360361
)?;
361362

@@ -493,7 +494,7 @@ impl SourceIndexingContext {
493494
let result = process.await;
494495

495496
// Track that we're finishing processing this row (regardless of success/failure)
496-
update_stats.finish_processing(1);
497+
update_stats.processing.end(1);
497498
if let Some(ref op_stats) = operation_in_process_stats {
498499
op_stats.finish_processing(&operation_name, 1);
499500
}

0 commit comments

Comments
 (0)