1 change: 1 addition & 0 deletions src/builder/analyzer.rs
@@ -1059,6 +1059,7 @@ pub async fn analyze_flow(
let target_factory = get_target_factory(&target_kind)?;
let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
target_factory,
target_kind: target_kind.clone(),
op_idx: op_ids.export_op_ids,
};
export_ops_futs.extend(
1 change: 1 addition & 0 deletions src/builder/plan.rs
@@ -117,6 +117,7 @@ pub struct AnalyzedExportOp {

pub struct AnalyzedExportTargetOpGroup {
pub target_factory: Arc<dyn TargetFactory + Send + Sync>,
pub target_kind: String,
pub op_idx: Vec<usize>,
}
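
Every file in this diff takes a dependency on `stats::OperationInProcessStats`, whose definition lives in `src/execution/stats.rs` and is not shown here. From the call sites (`start_processing`/`finish_processing` with keys like `import/<source>`, `transform/<op>`, and `export/<target_kind>`), a minimal sketch of the assumed shape — not the actual implementation — could look like:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for `stats::OperationInProcessStats`: a gauge of
/// in-process row counts keyed by operation ("import/...", "transform/...",
/// "export/..."). The real type is defined outside this diff.
#[derive(Default)]
pub struct OperationInProcessStats {
    counts: Mutex<HashMap<String, i64>>,
}

impl OperationInProcessStats {
    /// Increment the gauge when `delta` rows enter an operation.
    pub fn start_processing(&self, key: &str, delta: i64) {
        *self.counts.lock().unwrap().entry(key.to_string()).or_insert(0) += delta;
    }

    /// Decrement the gauge when `delta` rows leave an operation.
    pub fn finish_processing(&self, key: &str, delta: i64) {
        *self.counts.lock().unwrap().entry(key.to_string()).or_insert(0) -= delta;
    }

    /// Read the current gauge for one operation key (accessor assumed here).
    pub fn in_process(&self, key: &str) -> i64 {
        self.counts.lock().unwrap().get(key).copied().unwrap_or(0)
    }
}
```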

54 changes: 41 additions & 13 deletions src/execution/evaluator.rs
@@ -322,6 +322,7 @@ async fn evaluate_child_op_scope(
child_scope_entry: ScopeEntry<'_>,
concurrency_controller: &concur_control::ConcurrencyController,
memory: &EvaluationMemory,
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
) -> Result<()> {
let _permit = concurrency_controller
.acquire(Some(|| {
@@ -333,32 +334,45 @@
.sum()
}))
.await?;
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), memory)
.await
.with_context(|| {
format!(
"Evaluating in scope with key {}",
match child_scope_entry.key.key() {
Some(k) => k.to_string(),
None => "()".to_string(),
}
)
})
evaluate_op_scope(
op_scope,
scoped_entries.prepend(&child_scope_entry),
memory,
operation_in_process_stats,
)
.await
.with_context(|| {
format!(
"Evaluating in scope with key {}",
match child_scope_entry.key.key() {
Some(k) => k.to_string(),
None => "()".to_string(),
}
)
})
}

async fn evaluate_op_scope(
op_scope: &AnalyzedOpScope,
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
memory: &EvaluationMemory,
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
) -> Result<()> {
let head_scope = *scoped_entries.head().unwrap();
for reactive_op in op_scope.reactive_ops.iter() {
match reactive_op {
AnalyzedReactiveOp::Transform(op) => {
// Track transform operation start
if let Some(ref op_stats) = operation_in_process_stats {
let transform_key = format!("transform/{}", op.name);
op_stats.start_processing(&transform_key, 1);
}

let mut input_values = Vec::with_capacity(op.inputs.len());
input_values
.extend(assemble_input_values(&op.inputs, scoped_entries).collect::<Vec<_>>());
if op.function_exec_info.enable_cache {

let result = if op.function_exec_info.enable_cache {
let output_value_cell = memory.get_cache_entry(
|| {
Ok(op
@@ -382,7 +396,15 @@ async fn evaluate_op_scope(
.await
.and_then(|v| head_scope.define_field(&op.output, &v))
}
.with_context(|| format!("Evaluating Transform op `{}`", op.name,))?
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));

// Track transform operation completion
if let Some(ref op_stats) = operation_in_process_stats {
let transform_key = format!("transform/{}", op.name);
op_stats.finish_processing(&transform_key, 1);
}

result?
}

AnalyzedReactiveOp::ForEach(op) => {
@@ -408,6 +430,7 @@
),
&op.concurrency_controller,
memory,
operation_in_process_stats,
)
})
.collect::<Vec<_>>(),
@@ -425,6 +448,7 @@
),
&op.concurrency_controller,
memory,
operation_in_process_stats,
)
})
.collect::<Vec<_>>(),
@@ -443,6 +467,7 @@
),
&op.concurrency_controller,
memory,
operation_in_process_stats,
)
})
.collect::<Vec<_>>(),
@@ -509,6 +534,7 @@ pub async fn evaluate_source_entry(
src_eval_ctx: &SourceRowEvaluationContext<'_>,
source_value: value::FieldValues,
memory: &EvaluationMemory,
operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
) -> Result<EvaluateSourceEntryOutput> {
let _permit = src_eval_ctx
.import_op
@@ -556,6 +582,7 @@
&src_eval_ctx.plan.op_scope,
RefList::Nil.prepend(&root_scope_entry),
memory,
operation_in_process_stats,
)
.await?;
let collected_values = root_scope_entry
@@ -604,6 +631,7 @@ pub async fn evaluate_transient_flow(
&flow.execution_plan.op_scope,
RefList::Nil.prepend(&root_scope_entry),
&eval_memory,
None, // No operation stats for transient flows
)
.await?;
let output_value = assemble_value(
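
In the Transform arm above, the evaluation outcome is bound to `result` and the `start`/`finish` calls bracket it, so `finish_processing` runs before `result?` propagates an error. An alternative that gives the same guarantee — including across early returns and panics — is an RAII guard; a sketch against the hypothetical API above (the PR itself does not use a guard):

```rust
/// Hypothetical RAII guard: increments the gauge on construction and
/// decrements it in Drop, so the decrement also runs on `?` and on panics.
struct InProcessGuard<'a> {
    stats: &'a OperationInProcessStats,
    key: String,
}

impl<'a> InProcessGuard<'a> {
    fn new(stats: &'a OperationInProcessStats, key: String) -> Self {
        stats.start_processing(&key, 1);
        Self { stats, key }
    }
}

impl Drop for InProcessGuard<'_> {
    fn drop(&mut self) {
        self.stats.finish_processing(&self.key, 1);
    }
}
```

With such a guard, the Transform arm could shrink to `let _guard = operation_in_process_stats.map(|s| InProcessGuard::new(s, format!("transform/{}", op.name)));` followed by the original evaluation code.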
9 changes: 9 additions & 0 deletions src/execution/live_updater.rs
@@ -27,6 +27,8 @@ pub struct FlowLiveUpdater {
flow_ctx: Arc<FlowContext>,
join_set: Mutex<Option<JoinSet<Result<()>>>>,
stats_per_task: Vec<Arc<stats::UpdateStats>>,
/// Global tracking of in-process rows per operation
pub operation_in_process_stats: Arc<stats::OperationInProcessStats>,
recv_state: tokio::sync::Mutex<UpdateReceiveState>,
num_remaining_tasks_rx: watch::Receiver<usize>,

@@ -83,6 +85,7 @@ struct SourceUpdateTask {
plan: Arc<plan::ExecutionPlan>,
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
source_update_stats: Arc<stats::UpdateStats>,
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
pool: PgPool,
options: FlowLiveUpdaterOptions,

@@ -137,6 +140,7 @@ impl SourceUpdateTask {
let change_stream_stats = change_stream_stats.clone();
let pool = self.pool.clone();
let status_tx = self.status_tx.clone();
let operation_in_process_stats = self.operation_in_process_stats.clone();
async move {
let mut change_stream = change_stream;
let retry_options = retryable::RetryOptions {
@@ -203,6 +207,7 @@
},
super::source_indexer::UpdateMode::Normal,
update_stats.clone(),
Some(operation_in_process_stats.clone()),
concur_permit,
Some(move || async move {
SharedAckFn::ack(&shared_ack_fn).await
@@ -328,6 +333,7 @@ impl FlowLiveUpdater {

let mut join_set = JoinSet::new();
let mut stats_per_task = Vec::new();
let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default());

for source_idx in 0..plan.import_ops.len() {
let source_update_stats = Arc::new(stats::UpdateStats::default());
@@ -337,6 +343,7 @@
plan: plan.clone(),
execution_ctx: execution_ctx.clone(),
source_update_stats: source_update_stats.clone(),
operation_in_process_stats: operation_in_process_stats.clone(),
pool: pool.clone(),
options: options.clone(),
status_tx: status_tx.clone(),
@@ -345,10 +352,12 @@
join_set.spawn(source_update_task.run());
stats_per_task.push(source_update_stats);
}

Ok(Self {
flow_ctx,
join_set: Mutex::new(Some(join_set)),
stats_per_task,
operation_in_process_stats,
recv_state: tokio::sync::Mutex::new(UpdateReceiveState {
status_rx,
last_num_source_updates: vec![0; plan.import_ops.len()],
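
`operation_in_process_stats` is a public field on `FlowLiveUpdater` and is shared via `Arc` clones with every `SourceUpdateTask`, so one aggregate gauge covers all sources. A caller could poll it while an update runs; a sketch, assuming a running `updater: FlowLiveUpdater` and the hypothetical `in_process` accessor from the earlier snippet:

```rust
// Hypothetical monitoring task; `updater` and `in_process` are assumptions.
let stats = updater.operation_in_process_stats.clone();
tokio::spawn(async move {
    loop {
        println!(
            "rows in process: {}",
            stats.in_process("import/my_source")
        );
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
});
```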
40 changes: 33 additions & 7 deletions src/execution/row_indexer.rs
@@ -183,6 +183,7 @@ pub struct RowIndexer<'a> {
setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
mode: super::source_indexer::UpdateMode,
update_stats: &'a stats::UpdateStats,
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
pool: &'a PgPool,

source_id: i32,
@@ -201,6 +202,7 @@ impl<'a> RowIndexer<'a> {
mode: super::source_indexer::UpdateMode,
process_time: chrono::DateTime<chrono::Utc>,
update_stats: &'a stats::UpdateStats,
operation_in_process_stats: Option<&'a stats::OperationInProcessStats>,
pool: &'a PgPool,
) -> Result<Self> {
Ok(Self {
@@ -212,6 +214,7 @@
setup_execution_ctx,
mode,
update_stats,
operation_in_process_stats,
pool,
})
}
@@ -311,9 +314,13 @@ impl<'a> RowIndexer<'a> {
},
);

let output =
evaluate_source_entry(self.src_eval_ctx, source_value, &evaluation_memory)
.await?;
let output = evaluate_source_entry(
self.src_eval_ctx,
source_value,
&evaluation_memory,
self.operation_in_process_stats,
)
.await?;
let mut stored_info = evaluation_memory.into_stored()?;
if tracking_setup_state.has_fast_fingerprint_column {
(Some(output), stored_info, content_version_fp)
@@ -368,9 +375,28 @@ impl<'a> RowIndexer<'a> {
})
.collect();
(!mutations_w_ctx.is_empty()).then(|| {
export_op_group
.target_factory
.apply_mutation(mutations_w_ctx)
// Track export operation start
if let Some(ref op_stats) = self.operation_in_process_stats {
let export_key = format!("export/{}", export_op_group.target_kind);
op_stats.start_processing(&export_key, 1);
}

let export_key = format!("export/{}", export_op_group.target_kind);
let operation_in_process_stats = self.operation_in_process_stats;

async move {
let result = export_op_group
.target_factory
.apply_mutation(mutations_w_ctx)
.await;

// Track export operation completion
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.finish_processing(&export_key, 1);
}

result
}
})
});
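
Note the split in the export hunk above: `start_processing` runs synchronously while the mutation future is being built inside `.then(...)`, whereas `finish_processing` runs inside the returned `async move` block after `apply_mutation` resolves. The `export/<target_kind>` gauge therefore counts a mutation from the moment it is scheduled until it completes, and `export_key` is formatted a second time so an owned copy can be moved into the future.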

@@ -875,7 +901,7 @@ pub async fn evaluate_source_entry_with_memory(
.ok_or_else(|| anyhow::anyhow!("value not returned"))?;
let output = match source_value {
interface::SourceValue::Existence(source_value) => {
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?)
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory, None).await?)
}
interface::SourceValue::NonExistence => None,
};
30 changes: 29 additions & 1 deletion src/execution/source_indexer.rs
@@ -317,15 +317,32 @@ impl SourceIndexingContext {
row_input: ProcessSourceRowInput,
mode: UpdateMode,
update_stats: Arc<stats::UpdateStats>,
operation_in_process_stats: Option<Arc<stats::OperationInProcessStats>>,
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
ack_fn: Option<AckFn>,
pool: PgPool,
) {
// Store operation name for tracking cleanup
let operation_name = {
let plan_result = self.flow.get_execution_plan().await;
match plan_result {
Ok(plan) => format!("import/{}", plan.import_ops[self.source_idx].name),
Err(_) => "import/unknown".to_string(),
}
};

let process = async {
let plan = self.flow.get_execution_plan().await?;
let import_op = &plan.import_ops[self.source_idx];
let schema = &self.flow.data_schema;

// Track that we're starting to process this row
update_stats.processing.start(1);
if let Some(ref op_stats) = operation_in_process_stats {
let import_key = format!("import/{}", import_op.name);
op_stats.start_processing(&import_key, 1);
}

let eval_ctx = SourceRowEvaluationContext {
plan: &plan,
import_op,
@@ -340,6 +357,7 @@
mode,
process_time,
&update_stats,
operation_in_process_stats.as_ref().map(|s| s.as_ref()),
&pool,
)?;

@@ -474,7 +492,15 @@ impl SourceIndexingContext {
result
};
let process_and_ack = async {
process.await?;
let result = process.await;

// Track that we're finishing processing this row (regardless of success/failure)
update_stats.processing.end(1);
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.finish_processing(&operation_name, 1);
}

result?;
if let Some(ack_fn) = ack_fn {
ack_fn().await?;
}
@@ -603,6 +629,7 @@ impl SourceIndexingContext {
},
update_options.mode,
update_stats.clone(),
None, // operation_in_process_stats
concur_permit,
NO_ACK,
pool.clone(),
@@ -642,6 +669,7 @@ impl SourceIndexingContext {
},
update_options.mode,
update_stats.clone(),
None, // operation_in_process_stats
concur_permit,
NO_ACK,
pool.clone(),
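
In `process_source_row`, `operation_name` is computed before `process` runs so the `finish_processing` call in `process_and_ack` has a key even if fetching the execution plan fails, and the finish runs on success and failure alike. Note also that the two other `process_source_row` call sites above pass `None` for `operation_in_process_stats`, so within this diff the gauge is fed only by the live updater's change-stream path. A toy illustration of the start/finish pairing, using the hypothetical API sketched earlier:

```rust
// Names assumed from the earlier sketch; each start is paired with exactly
// one finish, so an idle operation's gauge reads zero.
let stats = OperationInProcessStats::default();
stats.start_processing("import/my_source", 1);
assert_eq!(stats.in_process("import/my_source"), 1);
stats.finish_processing("import/my_source", 1);
assert_eq!(stats.in_process("import/my_source"), 0);
```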