Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 39 additions & 13 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ async fn evaluate_child_op_scope(
child_scope_entry: ScopeEntry<'_>,
concurrency_controller: &concur_control::ConcurrencyController,
memory: &EvaluationMemory,
operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>,
) -> Result<()> {
let _permit = concurrency_controller
.acquire(Some(|| {
Expand All @@ -333,32 +334,44 @@ async fn evaluate_child_op_scope(
.sum()
}))
.await?;
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory)
.await
.with_context(|| {
format!(
"Evaluating in scope with key {}",
match child_scope_entry.key.key() {
Some(k) => k.to_string(),
None => "()".to_string(),
}
)
})
evaluate_op_scope(
op_scope,
scoped_entries.prepend(&child_scope_entry),
memory,
operation_in_process_stats,
)
.await
.with_context(|| {
format!(
"Evaluating in scope with key {}",
match child_scope_entry.key.key() {
Some(k) => k.to_string(),
None => "()".to_string(),
}
)
})
}

async fn evaluate_op_scope(
op_scope: &AnalyzedOpScope,
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
memory: &EvaluationMemory,
operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>,
) -> Result<()> {
let head_scope = *scoped_entries.head().unwrap();
for reactive_op in op_scope.reactive_ops.iter() {
match reactive_op {
AnalyzedReactiveOp::Transform(op) => {
// Track transform operation start
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.start_processing(&op.name, 1);
}

let mut input_values = Vec::with_capacity(op.inputs.len());
input_values
.extend(assemble_input_values(&op.inputs, scoped_entries).collect::<Vec<_>>());
if op.function_exec_info.enable_cache {

let result = if op.function_exec_info.enable_cache {
let output_value_cell = memory.get_cache_entry(
|| {
Ok(op
Expand All @@ -382,7 +395,14 @@ async fn evaluate_op_scope(
.await
.and_then(|v| head_scope.define_field(&op.output, &v))
}
.with_context(|| format!("Evaluating Transform op `{}`", op.name,))?
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));

// Track transform operation completion
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.finish_processing(&op.name, 1);
}

result?
}

AnalyzedReactiveOp::ForEach(op) => {
Expand All @@ -408,6 +428,7 @@ async fn evaluate_op_scope(
),
&op.concurrency_controller,
memory,
operation_in_process_stats,
)
})
.collect::<Vec<_>>(),
Expand All @@ -425,6 +446,7 @@ async fn evaluate_op_scope(
),
&op.concurrency_controller,
memory,
operation_in_process_stats,
)
})
.collect::<Vec<_>>(),
Expand All @@ -443,6 +465,7 @@ async fn evaluate_op_scope(
),
&op.concurrency_controller,
memory,
operation_in_process_stats,
)
})
.collect::<Vec<_>>(),
Expand Down Expand Up @@ -509,6 +532,7 @@ pub async fn evaluate_source_entry(
src_eval_ctx: &SourceRowEvaluationContext<'_>,
source_value: value::FieldValues,
memory: &EvaluationMemory,
operation_in_process_stats: Option<&crate::execution::stats::OperationInProcessStats>,
) -> Result<EvaluateSourceEntryOutput> {
let _permit = src_eval_ctx
.import_op
Expand Down Expand Up @@ -556,6 +580,7 @@ pub async fn evaluate_source_entry(
&src_eval_ctx.plan.op_scope,
RefList::Nil.prepend(&root_scope_entry),
memory,
operation_in_process_stats,
)
.await?;
let collected_values = root_scope_entry
Expand Down Expand Up @@ -604,6 +629,7 @@ pub async fn evaluate_transient_flow(
&flow.execution_plan.op_scope,
RefList::Nil.prepend(&root_scope_entry),
&eval_memory,
None, // No operation stats for transient flows
)
.await?;
let output_value = assemble_value(
Expand Down
26 changes: 26 additions & 0 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct FlowLiveUpdater {
flow_ctx: Arc<FlowContext>,
join_set: Mutex<Option<JoinSet<Result<()>>>>,
stats_per_task: Vec<Arc<stats::UpdateStats>>,
/// Global tracking of in-process rows per operation
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
recv_state: tokio::sync::Mutex<UpdateReceiveState>,
num_remaining_tasks_rx: watch::Receiver<usize>,

Expand Down Expand Up @@ -83,6 +85,7 @@ struct SourceUpdateTask {
plan: Arc<plan::ExecutionPlan>,
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
source_update_stats: Arc<stats::UpdateStats>,
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
pool: PgPool,
options: FlowLiveUpdaterOptions,

Expand Down Expand Up @@ -137,6 +140,7 @@ impl SourceUpdateTask {
let change_stream_stats = change_stream_stats.clone();
let pool = self.pool.clone();
let status_tx = self.status_tx.clone();
let operation_in_process_stats = self.operation_in_process_stats.clone();
async move {
let mut change_stream = change_stream;
let retry_options = retryable::RetryOptions {
Expand Down Expand Up @@ -203,6 +207,7 @@ impl SourceUpdateTask {
},
super::source_indexer::UpdateMode::Normal,
update_stats.clone(),
Some(operation_in_process_stats.clone()),
concur_permit,
Some(move || async move {
SharedAckFn::ack(&shared_ack_fn).await
Expand Down Expand Up @@ -328,6 +333,7 @@ impl FlowLiveUpdater {

let mut join_set = JoinSet::new();
let mut stats_per_task = Vec::new();
let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default());

for source_idx in 0..plan.import_ops.len() {
let source_update_stats = Arc::new(stats::UpdateStats::default());
Expand All @@ -337,6 +343,7 @@ impl FlowLiveUpdater {
plan: plan.clone(),
execution_ctx: execution_ctx.clone(),
source_update_stats: source_update_stats.clone(),
operation_in_process_stats: operation_in_process_stats.clone(),
pool: pool.clone(),
options: options.clone(),
status_tx: status_tx.clone(),
Expand All @@ -345,10 +352,12 @@ impl FlowLiveUpdater {
join_set.spawn(source_update_task.run());
stats_per_task.push(source_update_stats);
}

Ok(Self {
flow_ctx,
join_set: Mutex::new(Some(join_set)),
stats_per_task,
operation_in_process_stats,
recv_state: tokio::sync::Mutex::new(UpdateReceiveState {
status_rx,
last_num_source_updates: vec![0; plan.import_ops.len()],
Expand Down Expand Up @@ -406,6 +415,23 @@ impl FlowLiveUpdater {
}
}

/// Get the total number of rows currently being processed across all operations.
pub fn get_total_in_process_count(&self) -> i64 {
self.operation_in_process_stats.get_total_in_process_count()
}

/// Get the number of rows currently being processed for a specific operation.
pub fn get_operation_in_process_count(&self, operation_name: &str) -> i64 {
self.operation_in_process_stats
.get_operation_in_process_count(operation_name)
}

/// Get a snapshot of all operation in-process counts.
pub fn get_all_operations_in_process(&self) -> std::collections::HashMap<String, i64> {
self.operation_in_process_stats
.get_all_operations_in_process()
}

pub async fn next_status_updates(&self) -> Result<FlowLiveUpdaterUpdates> {
let mut recv_state = self.recv_state.lock().await;
let recv_state = &mut *recv_state;
Expand Down
48 changes: 41 additions & 7 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ pub struct RowIndexer<'a> {
setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
mode: super::source_indexer::UpdateMode,
update_stats: &'a stats::UpdateStats,
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
pool: &'a PgPool,

source_id: i32,
Expand All @@ -201,6 +202,7 @@ impl<'a> RowIndexer<'a> {
mode: super::source_indexer::UpdateMode,
process_time: chrono::DateTime<chrono::Utc>,
update_stats: &'a stats::UpdateStats,
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
pool: &'a PgPool,
) -> Result<Self> {
Ok(Self {
Expand All @@ -212,6 +214,7 @@ impl<'a> RowIndexer<'a> {
setup_execution_ctx,
mode,
update_stats,
operation_in_process_stats,
pool,
})
}
Expand Down Expand Up @@ -311,9 +314,13 @@ impl<'a> RowIndexer<'a> {
},
);

let output =
evaluate_source_entry(self.src_eval_ctx, source_value, &evaluation_memory)
.await?;
let output = evaluate_source_entry(
self.src_eval_ctx,
source_value,
&evaluation_memory,
self.operation_in_process_stats,
)
.await?;
let mut stored_info = evaluation_memory.into_stored()?;
if tracking_setup_state.has_fast_fingerprint_column {
(Some(output), stored_info, content_version_fp)
Expand Down Expand Up @@ -368,9 +375,36 @@ impl<'a> RowIndexer<'a> {
})
.collect();
(!mutations_w_ctx.is_empty()).then(|| {
export_op_group
.target_factory
.apply_mutation(mutations_w_ctx)
// Track export operation start
if let Some(ref op_stats) = self.operation_in_process_stats {
for export_op_idx in &export_op_group.op_idx {
let export_op = &self.src_eval_ctx.plan.export_ops[*export_op_idx];
op_stats.start_processing(&export_op.name, 1);
}
}

let export_op_names: Vec<String> = export_op_group
.op_idx
.iter()
.map(|idx| self.src_eval_ctx.plan.export_ops[*idx].name.clone())
.collect();
let operation_in_process_stats = self.operation_in_process_stats;

async move {
let result = export_op_group
.target_factory
.apply_mutation(mutations_w_ctx)
.await;

// Track export operation completion
if let Some(ref op_stats) = operation_in_process_stats {
for export_op_name in &export_op_names {
op_stats.finish_processing(export_op_name, 1);
}
}

result
}
})
});

Expand Down Expand Up @@ -875,7 +909,7 @@ pub async fn evaluate_source_entry_with_memory(
.ok_or_else(|| anyhow::anyhow!("value not returned"))?;
let output = match source_value {
interface::SourceValue::Existence(source_value) => {
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?)
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory, None).await?)
}
interface::SourceValue::NonExistence => None,
};
Expand Down
29 changes: 28 additions & 1 deletion src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,31 @@ impl SourceIndexingContext {
row_input: ProcessSourceRowInput,
mode: UpdateMode,
update_stats: Arc<stats::UpdateStats>,
operation_in_process_stats: Option<Arc<stats::OperationInProcessStats>>,
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
ack_fn: Option<AckFn>,
pool: PgPool,
) {
// Store operation name for tracking cleanup
let operation_name = {
let plan_result = self.flow.get_execution_plan().await;
match plan_result {
Ok(plan) => plan.import_ops[self.source_idx].name.clone(),
Err(_) => "unknown".to_string(),
}
};

let process = async {
let plan = self.flow.get_execution_plan().await?;
let import_op = &plan.import_ops[self.source_idx];
let schema = &self.flow.data_schema;

// Track that we're starting to process this row
update_stats.processing.start(1);
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.start_processing(&import_op.name, 1);
}

let eval_ctx = SourceRowEvaluationContext {
plan: &plan,
import_op,
Expand All @@ -340,6 +356,7 @@ impl SourceIndexingContext {
mode,
process_time,
&update_stats,
operation_in_process_stats.as_ref().map(|s| s.as_ref()),
&pool,
)?;

Expand Down Expand Up @@ -474,7 +491,15 @@ impl SourceIndexingContext {
result
};
let process_and_ack = async {
process.await?;
let result = process.await;

// Track that we're finishing processing this row (regardless of success/failure)
update_stats.processing.end(1);
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.finish_processing(&operation_name, 1);
}

result?;
if let Some(ack_fn) = ack_fn {
ack_fn().await?;
}
Expand Down Expand Up @@ -603,6 +628,7 @@ impl SourceIndexingContext {
},
update_options.mode,
update_stats.clone(),
None, // operation_in_process_stats
concur_permit,
NO_ACK,
pool.clone(),
Expand Down Expand Up @@ -642,6 +668,7 @@ impl SourceIndexingContext {
},
update_options.mode,
update_stats.clone(),
None, // operation_in_process_stats
concur_permit,
NO_ACK,
pool.clone(),
Expand Down
Loading