Skip to content

Commit 8421d95

Browse files
committed
using target_kind for group identification
1 parent 1449612 commit 8421d95

File tree

6 files changed

+18
-38
lines changed

6 files changed

+18
-38
lines changed

src/builder/analyzer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,6 +1059,7 @@ pub async fn analyze_flow(
10591059
let target_factory = get_target_factory(&target_kind)?;
10601060
let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
10611061
target_factory,
1062+
target_kind: target_kind.clone(),
10621063
op_idx: op_ids.export_op_ids,
10631064
};
10641065
export_ops_futs.extend(

src/builder/plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ pub struct AnalyzedExportOp {
117117

118118
pub struct AnalyzedExportTargetOpGroup {
119119
pub target_factory: Arc<dyn TargetFactory + Send + Sync>,
120+
pub target_kind: String,
120121
pub op_idx: Vec<usize>,
121122
}
122123

src/execution/evaluator.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ async fn evaluate_child_op_scope(
322322
child_scope_entry: ScopeEntry<'_>,
323323
concurrency_controller: &concur_control::ConcurrencyController,
324324
memory: &EvaluationMemory,
325-
operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>,
325+
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
326326
) -> Result<()> {
327327
let _permit = concurrency_controller
328328
.acquire(Some(|| {
@@ -356,15 +356,16 @@ async fn evaluate_op_scope(
356356
op_scope: &AnalyzedOpScope,
357357
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
358358
memory: &EvaluationMemory,
359-
operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>,
359+
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
360360
) -> Result<()> {
361361
let head_scope = *scoped_entries.head().unwrap();
362362
for reactive_op in op_scope.reactive_ops.iter() {
363363
match reactive_op {
364364
AnalyzedReactiveOp::Transform(op) => {
365365
// Track transform operation start
366366
if let Some(ref op_stats) = operation_in_process_stats {
367-
op_stats.start_processing(&op.name, 1);
367+
let transform_key = format!("transform/{}", op.name);
368+
op_stats.start_processing(&transform_key, 1);
368369
}
369370

370371
let mut input_values = Vec::with_capacity(op.inputs.len());
@@ -399,7 +400,8 @@ async fn evaluate_op_scope(
399400

400401
// Track transform operation completion
401402
if let Some(ref op_stats) = operation_in_process_stats {
402-
op_stats.finish_processing(&op.name, 1);
403+
let transform_key = format!("transform/{}", op.name);
404+
op_stats.finish_processing(&transform_key, 1);
403405
}
404406

405407
result?
@@ -532,7 +534,7 @@ pub async fn evaluate_source_entry(
532534
src_eval_ctx: &SourceRowEvaluationContext<'_>,
533535
source_value: value::FieldValues,
534536
memory: &EvaluationMemory,
535-
operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>,
537+
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
536538
) -> Result<EvaluateSourceEntryOutput> {
537539
let _permit = src_eval_ctx
538540
.import_op

src/execution/live_updater.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub struct FlowLiveUpdater {
2828
join_set: Mutex<Option<JoinSet<Result<()>>>>,
2929
stats_per_task: Vec<Arc<stats::UpdateStats>>,
3030
/// Global tracking of in-process rows per operation
31-
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
31+
pub operation_in_process_stats: Arc<stats::OperationInProcessStats>,
3232
recv_state: tokio::sync::Mutex<UpdateReceiveState>,
3333
num_remaining_tasks_rx: watch::Receiver<usize>,
3434

@@ -415,23 +415,6 @@ impl FlowLiveUpdater {
415415
}
416416
}
417417

418-
/// Get the total number of rows currently being processed across all operations.
419-
pub fn get_total_in_process_count(&self) -> i64 {
420-
self.operation_in_process_stats.get_total_in_process_count()
421-
}
422-
423-
/// Get the number of rows currently being processed for a specific operation.
424-
pub fn get_operation_in_process_count(&self, operation_name: &str) -> i64 {
425-
self.operation_in_process_stats
426-
.get_operation_in_process_count(operation_name)
427-
}
428-
429-
/// Get a snapshot of all operation in-process counts.
430-
pub fn get_all_operations_in_process(&self) -> std::collections::HashMap<String, i64> {
431-
self.operation_in_process_stats
432-
.get_all_operations_in_process()
433-
}
434-
435418
pub async fn next_status_updates(&self) -> Result<FlowLiveUpdaterUpdates> {
436419
let mut recv_state = self.recv_state.lock().await;
437420
let recv_state = &mut *recv_state;

src/execution/row_indexer.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -377,17 +377,11 @@ impl<'a> RowIndexer<'a> {
377377
(!mutations_w_ctx.is_empty()).then(|| {
378378
// Track export operation start
379379
if let Some(ref op_stats) = self.operation_in_process_stats {
380-
for export_op_idx in &export_op_group.op_idx {
381-
let export_op = &self.src_eval_ctx.plan.export_ops[*export_op_idx];
382-
op_stats.start_processing(&export_op.name, 1);
383-
}
380+
let export_key = format!("export/{}", export_op_group.target_kind);
381+
op_stats.start_processing(&export_key, 1);
384382
}
385383

386-
let export_op_names: Vec<String> = export_op_group
387-
.op_idx
388-
.iter()
389-
.map(|idx| self.src_eval_ctx.plan.export_ops[*idx].name.clone())
390-
.collect();
384+
let export_key = format!("export/{}", export_op_group.target_kind);
391385
let operation_in_process_stats = self.operation_in_process_stats;
392386

393387
async move {
@@ -398,9 +392,7 @@ impl<'a> RowIndexer<'a> {
398392

399393
// Track export operation completion
400394
if let Some(ref op_stats) = operation_in_process_stats {
401-
for export_op_name in &export_op_names {
402-
op_stats.finish_processing(export_op_name, 1);
403-
}
395+
op_stats.finish_processing(&export_key, 1);
404396
}
405397

406398
result

src/execution/source_indexer.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,8 @@ impl SourceIndexingContext {
326326
let operation_name = {
327327
let plan_result = self.flow.get_execution_plan().await;
328328
match plan_result {
329-
Ok(plan) => plan.import_ops[self.source_idx].name.clone(),
330-
Err(_) => "unknown".to_string(),
329+
Ok(plan) => format!("import/{}", plan.import_ops[self.source_idx].name),
330+
Err(_) => "import/unknown".to_string(),
331331
}
332332
};
333333

@@ -339,7 +339,8 @@ impl SourceIndexingContext {
339339
// Track that we're starting to process this row
340340
update_stats.processing.start(1);
341341
if let Some(ref op_stats) = operation_in_process_stats {
342-
op_stats.start_processing(&import_op.name, 1);
342+
let import_key = format!("import/{}", import_op.name);
343+
op_stats.start_processing(&import_key, 1);
343344
}
344345

345346
let eval_ctx = SourceRowEvaluationContext {

0 commit comments

Comments
 (0)