Skip to content

Commit 339d41d

Browse files
committed
scope issue fixed, trackin range fixed, moved block to async closure
1 parent 8421d95 commit 339d41d

File tree

5 files changed

+50
-16
lines changed

5 files changed

+50
-16
lines changed

src/builder/analyzer.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -969,14 +969,41 @@ impl AnalyzerContext {
969969
op_futs.push(self.analyze_reactive_op(op_scope, reactive_op).await?);
970970
}
971971
let collector_len = op_scope.states.lock().unwrap().collectors.len();
972+
let scope_qualifier = self.build_scope_qualifier(op_scope);
972973
let result_fut = async move {
973974
Ok(AnalyzedOpScope {
974975
reactive_ops: try_join_all(op_futs).await?,
975976
collector_len,
977+
scope_qualifier,
976978
})
977979
};
978980
Ok(result_fut)
979981
}
982+
983+
fn build_scope_qualifier(&self, op_scope: &Arc<OpScope>) -> String {
984+
let mut scope_names = Vec::new();
985+
let mut current_scope = Some(op_scope.as_ref());
986+
987+
// Walk up the parent chain to collect scope names
988+
while let Some(scope) = current_scope {
989+
if let Some((parent, _)) = &scope.parent {
990+
scope_names.push(scope.name.clone());
991+
current_scope = Some(parent.as_ref());
992+
} else {
993+
break;
994+
}
995+
}
996+
997+
// Reverse to get the correct order (root to leaf)
998+
scope_names.reverse();
999+
1000+
// Build the qualifier string: "" for root, "name." for single level, "parent.child." for nested
1001+
if scope_names.is_empty() {
1002+
String::new()
1003+
} else {
1004+
format!("{}.", scope_names.join("."))
1005+
}
1006+
}
9801007
}
9811008

9821009
pub fn build_flow_instance_context(

src/builder/plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ pub enum AnalyzedReactiveOp {
130130
pub struct AnalyzedOpScope {
131131
pub reactive_ops: Vec<AnalyzedReactiveOp>,
132132
pub collector_len: usize,
133+
pub scope_qualifier: String,
133134
}
134135

135136
pub struct ExecutionPlan {

src/execution/evaluator.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,8 @@ async fn evaluate_op_scope(
364364
AnalyzedReactiveOp::Transform(op) => {
365365
// Track transform operation start
366366
if let Some(ref op_stats) = operation_in_process_stats {
367-
let transform_key = format!("transform/{}", op.name);
367+
let transform_key =
368+
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
368369
op_stats.start_processing(&transform_key, 1);
369370
}
370371

@@ -400,7 +401,8 @@ async fn evaluate_op_scope(
400401

401402
// Track transform operation completion
402403
if let Some(ref op_stats) = operation_in_process_stats {
403-
let transform_key = format!("transform/{}", op.name);
404+
let transform_key =
405+
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
404406
op_stats.finish_processing(&transform_key, 1);
405407
}
406408

src/execution/row_indexer.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -375,16 +375,15 @@ impl<'a> RowIndexer<'a> {
375375
})
376376
.collect();
377377
(!mutations_w_ctx.is_empty()).then(|| {
378-
// Track export operation start
379-
if let Some(ref op_stats) = self.operation_in_process_stats {
380-
let export_key = format!("export/{}", export_op_group.target_kind);
381-
op_stats.start_processing(&export_key, 1);
382-
}
383-
384378
let export_key = format!("export/{}", export_op_group.target_kind);
385379
let operation_in_process_stats = self.operation_in_process_stats;
386380

387381
async move {
382+
// Track export operation start
383+
if let Some(ref op_stats) = operation_in_process_stats {
384+
op_stats.start_processing(&export_key, 1);
385+
}
386+
388387
let result = export_op_group
389388
.target_factory
390389
.apply_mutation(mutations_w_ctx)

src/execution/source_indexer.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -338,10 +338,6 @@ impl SourceIndexingContext {
338338

339339
// Track that we're starting to process this row
340340
update_stats.processing.start(1);
341-
if let Some(ref op_stats) = operation_in_process_stats {
342-
let import_key = format!("import/{}", import_op.name);
343-
op_stats.start_processing(&import_key, 1);
344-
}
345341

346342
let eval_ctx = SourceRowEvaluationContext {
347343
plan: &plan,
@@ -351,20 +347,26 @@ impl SourceIndexingContext {
351347
import_op_idx: self.source_idx,
352348
};
353349
let process_time = chrono::Utc::now();
350+
let operation_in_process_stats_cloned = operation_in_process_stats.clone();
354351
let row_indexer = row_indexer::RowIndexer::new(
355352
&eval_ctx,
356353
&self.setup_execution_ctx,
357354
mode,
358355
process_time,
359356
&update_stats,
360-
operation_in_process_stats.as_ref().map(|s| s.as_ref()),
357+
operation_in_process_stats_cloned
358+
.as_ref()
359+
.map(|s| s.as_ref()),
361360
&pool,
362361
)?;
363362

364363
let source_data = row_input.data;
365364
let mut row_state_operator =
366365
LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats);
367366
let mut ordinal_touched = false;
367+
368+
let operation_in_process_stats_for_async = operation_in_process_stats.clone();
369+
let operation_name_for_async = operation_name.clone();
368370
let result = {
369371
let row_state_operator = &mut row_state_operator;
370372
let row_key = &row_input.key;
@@ -417,6 +419,9 @@ impl SourceIndexingContext {
417419
(ordinal, source_data.content_version_fp, value)
418420
}
419421
_ => {
422+
if let Some(ref op_stats) = operation_in_process_stats_for_async {
423+
op_stats.start_processing(&operation_name_for_async, 1);
424+
}
420425
let data = import_op
421426
.executor
422427
.get_value(
@@ -433,6 +438,9 @@ impl SourceIndexingContext {
433438
},
434439
)
435440
.await?;
441+
if let Some(ref op_stats) = operation_in_process_stats_for_async {
442+
op_stats.finish_processing(&operation_name_for_async, 1);
443+
}
436444
(
437445
data.ordinal.ok_or_else(|| {
438446
anyhow::anyhow!("ordinal is not available")
@@ -496,9 +504,6 @@ impl SourceIndexingContext {
496504

497505
// Track that we're finishing processing this row (regardless of success/failure)
498506
update_stats.processing.end(1);
499-
if let Some(ref op_stats) = operation_in_process_stats {
500-
op_stats.finish_processing(&operation_name, 1);
501-
}
502507

503508
result?;
504509
if let Some(ack_fn) = ack_fn {

0 commit comments

Comments
 (0)