Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct FlowLiveUpdater {
flow_ctx: Arc<FlowContext>,
join_set: Mutex<Option<JoinSet<Result<()>>>>,
stats_per_task: Vec<Arc<stats::UpdateStats>>,
/// Global tracking of in-process rows per operation
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
recv_state: tokio::sync::Mutex<UpdateReceiveState>,
num_remaining_tasks_rx: watch::Receiver<usize>,

Expand Down Expand Up @@ -83,6 +85,7 @@ struct SourceUpdateTask {
plan: Arc<plan::ExecutionPlan>,
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
source_update_stats: Arc<stats::UpdateStats>,
operation_in_process_stats: Arc<stats::OperationInProcessStats>,
pool: PgPool,
options: FlowLiveUpdaterOptions,

Expand Down Expand Up @@ -137,6 +140,7 @@ impl SourceUpdateTask {
let change_stream_stats = change_stream_stats.clone();
let pool = self.pool.clone();
let status_tx = self.status_tx.clone();
let operation_in_process_stats = self.operation_in_process_stats.clone();
async move {
let mut change_stream = change_stream;
let retry_options = retryable::RetryOptions {
Expand Down Expand Up @@ -203,6 +207,7 @@ impl SourceUpdateTask {
},
super::source_indexer::UpdateMode::Normal,
update_stats.clone(),
Some(operation_in_process_stats.clone()),
concur_permit,
Some(move || async move {
SharedAckFn::ack(&shared_ack_fn).await
Expand Down Expand Up @@ -328,6 +333,7 @@ impl FlowLiveUpdater {

let mut join_set = JoinSet::new();
let mut stats_per_task = Vec::new();
let operation_in_process_stats = Arc::new(stats::OperationInProcessStats::default());

for source_idx in 0..plan.import_ops.len() {
let source_update_stats = Arc::new(stats::UpdateStats::default());
Expand All @@ -337,6 +343,7 @@ impl FlowLiveUpdater {
plan: plan.clone(),
execution_ctx: execution_ctx.clone(),
source_update_stats: source_update_stats.clone(),
operation_in_process_stats: operation_in_process_stats.clone(),
pool: pool.clone(),
options: options.clone(),
status_tx: status_tx.clone(),
Expand All @@ -345,10 +352,12 @@ impl FlowLiveUpdater {
join_set.spawn(source_update_task.run());
stats_per_task.push(source_update_stats);
}

Ok(Self {
flow_ctx,
join_set: Mutex::new(Some(join_set)),
stats_per_task,
operation_in_process_stats,
recv_state: tokio::sync::Mutex::new(UpdateReceiveState {
status_rx,
last_num_source_updates: vec![0; plan.import_ops.len()],
Expand Down Expand Up @@ -406,6 +415,23 @@ impl FlowLiveUpdater {
}
}

/// Total count of rows currently being processed, summed across every operation.
pub fn get_total_in_process_count(&self) -> i64 {
    let stats = self.operation_in_process_stats.as_ref();
    stats.get_total_in_process_count()
}

/// In-process row count for the single operation named `operation_name`.
pub fn get_operation_in_process_count(&self, operation_name: &str) -> i64 {
    let stats = self.operation_in_process_stats.as_ref();
    stats.get_operation_in_process_count(operation_name)
}

/// Snapshot mapping each operation name to its current in-process row count.
pub fn get_all_operations_in_process(&self) -> std::collections::HashMap<String, i64> {
    let stats = self.operation_in_process_stats.as_ref();
    stats.get_all_operations_in_process()
}

pub async fn next_status_updates(&self) -> Result<FlowLiveUpdaterUpdates> {
let mut recv_state = self.recv_state.lock().await;
let recv_state = &mut *recv_state;
Expand Down
28 changes: 27 additions & 1 deletion src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,31 @@ impl SourceIndexingContext {
row_input: ProcessSourceRowInput,
mode: UpdateMode,
update_stats: Arc<stats::UpdateStats>,
operation_in_process_stats: Option<Arc<stats::OperationInProcessStats>>,
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
ack_fn: Option<AckFn>,
pool: PgPool,
) {
// Store operation name for tracking cleanup
let operation_name = {
let plan_result = self.flow.get_execution_plan().await;
match plan_result {
Ok(plan) => plan.import_ops[self.source_idx].name.clone(),
Err(_) => "unknown".to_string(),
}
};

let process = async {
let plan = self.flow.get_execution_plan().await?;
let import_op = &plan.import_ops[self.source_idx];
let schema = &self.flow.data_schema;

// Track that we're starting to process this row
update_stats.start_processing(1);
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.start_processing(&import_op.name, 1);
}

let eval_ctx = SourceRowEvaluationContext {
plan: &plan,
import_op,
Expand Down Expand Up @@ -474,7 +490,15 @@ impl SourceIndexingContext {
result
};
let process_and_ack = async {
process.await?;
let result = process.await;

// Track that we're finishing processing this row (regardless of success/failure)
update_stats.finish_processing(1);
if let Some(ref op_stats) = operation_in_process_stats {
op_stats.finish_processing(&operation_name, 1);
}

result?;
if let Some(ack_fn) = ack_fn {
ack_fn().await?;
}
Expand Down Expand Up @@ -603,6 +627,7 @@ impl SourceIndexingContext {
},
update_options.mode,
update_stats.clone(),
None, // operation_in_process_stats
concur_permit,
NO_ACK,
pool.clone(),
Expand Down Expand Up @@ -642,6 +667,7 @@ impl SourceIndexingContext {
},
update_options.mode,
update_stats.clone(),
None, // operation_in_process_stats
concur_permit,
NO_ACK,
pool.clone(),
Expand Down
Loading