diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs
index a435b0a7..c5748680 100644
--- a/src/execution/live_updater.rs
+++ b/src/execution/live_updater.rs
@@ -145,7 +145,6 @@ impl SourceUpdateTask {
         futs.push(
             {
                 let change_stream_stats = change_stream_stats.clone();
-                let pool = self.pool.clone();
                 let status_tx = self.status_tx.clone();
                 let operation_in_process_stats = self.operation_in_process_stats.clone();
                 async move {
@@ -230,7 +229,6 @@ impl SourceUpdateTask {
                                         Some(move || async move {
                                             SharedAckFn::ack(&shared_ack_fn).await
                                         }),
-                                        pool.clone(),
                                     ),
                                 );
                             }
@@ -267,7 +265,7 @@ impl SourceUpdateTask {
             async move {
                 let refresh_interval = import_op.refresh_options.refresh_interval;

-                task.update_with_pass_with_error_logging(
+                task.update_one_pass_with_error_logging(
                     source_indexing_context,
                     if refresh_interval.is_some() {
                         "initial interval update"
@@ -285,7 +283,7 @@ impl SourceUpdateTask {
                 loop {
                     interval.tick().await;

-                    task.update_with_pass_with_error_logging(
+                    task.update_one_pass_with_error_logging(
                         source_indexing_context,
                         "interval update",
                         super::source_indexer::UpdateOptions {
@@ -375,7 +373,7 @@ impl SourceUpdateTask {

         // Run the actual update
         let update_result = source_indexing_context
-            .update(&self.pool, &update_stats, update_options)
+            .update(&update_stats, update_options)
             .await
             .with_context(|| {
                 format!(
@@ -409,7 +407,7 @@ impl SourceUpdateTask {
         Ok(())
     }

-    async fn update_with_pass_with_error_logging(
+    async fn update_one_pass_with_error_logging(
         &self,
         source_indexing_context: &Arc<SourceIndexingContext>,
         update_title: &str,
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
index de80fcc6..df1f9720 100644
--- a/src/execution/source_indexer.rs
+++ b/src/execution/source_indexer.rs
@@ -1,7 +1,5 @@
 use crate::{
-    execution::row_indexer::ContentHashBasedCollapsingBaseline,
-    prelude::*,
-    service::error::{SharedError, SharedResult, SharedResultExt},
+    execution::row_indexer::ContentHashBasedCollapsingBaseline, prelude::*, utils::batching,
 };

 use futures::future::Ready;
@@ -57,13 +55,14 @@ struct SourceIndexingState {
 }

 pub struct SourceIndexingContext {
+    pool: PgPool,
     flow: Arc<builder::AnalyzedFlow>,
     source_idx: usize,
-    pending_update: Mutex<Option<Shared<BoxFuture<'static, SharedResult<()>>>>>,
-    update_sem: Semaphore,
     state: Mutex<SourceIndexingState>,
     setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>,
     needs_to_track_rows_to_retry: bool,
+
+    update_once_batcher: batching::Batcher<UpdateOnceRunner>,
 }

 pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
@@ -253,7 +252,7 @@ impl SourceIndexingContext {
         source_idx: usize,
         setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>,
         pool: &PgPool,
-    ) -> Result<Self> {
+    ) -> Result<Arc<Self>> {
         let plan = flow.get_execution_plan().await?;
         let import_op = &plan.import_ops[source_idx];
         let mut list_state = db_tracking::ListTrackedSourceKeyMetadataState::new();
@@ -294,7 +293,8 @@ impl SourceIndexingContext {
                 );
             }
         }
-        Ok(Self {
+        Ok(Arc::new(Self {
+            pool: pool.clone(),
             flow,
             source_idx,
             needs_to_track_rows_to_retry: rows_to_retry.is_some(),
@@ -303,10 +303,9 @@ impl SourceIndexingContext {
                 scan_generation,
                 rows_to_retry,
             }),
-            pending_update: Mutex::new(None),
-            update_sem: Semaphore::new(1),
             setup_execution_ctx,
-        })
+            update_once_batcher: batching::Batcher::new(UpdateOnceRunner),
+        }))
     }

     pub async fn process_source_row<
@@ -320,7 +319,6 @@ impl SourceIndexingContext {
         operation_in_process_stats: Option<Arc<stats::OperationInProcessStats>>,
         _concur_permit: concur_control::CombinedConcurrencyControllerPermit,
         ack_fn: Option<AckFn>,
-        pool: PgPool,
     ) {
         use ContentHashBasedCollapsingBaseline::ProcessedSourceFingerprint;

@@ -359,7 +357,7 @@ impl SourceIndexingContext {
                     operation_in_process_stats_cloned
                         .as_ref()
                         .map(|s| s.as_ref()),
-                    &pool,
+                    &self.pool,
                 )?;

                 let source_data = row_input.data;
@@ -490,7 +488,7 @@ impl SourceIndexingContext {
                     &source_key_json,
                     row_indexer::RowIndexer::process_ordinal_from_time(process_time),
                     &self.setup_execution_ctx.setup_state.tracking_table,
-                    &pool,
+                    &self.pool,
                 )
                 .await?;
             }
@@ -525,48 +523,19 @@ impl SourceIndexingContext {

     pub async fn update(
         self: &Arc<Self>,
-        pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
         update_options: UpdateOptions,
     ) -> Result<()> {
-        let pending_update_fut = {
-            let mut pending_update = self.pending_update.lock().unwrap();
-            if let Some(pending_update_fut) = &*pending_update {
-                pending_update_fut.clone()
-            } else {
-                let slf = self.clone();
-                let pool = pool.clone();
-                let update_stats = update_stats.clone();
-                let task = tokio::spawn(async move {
-                    {
-                        let _permit = slf.update_sem.acquire().await?;
-                        {
-                            let mut pending_update = slf.pending_update.lock().unwrap();
-                            *pending_update = None;
-                        }
-                        slf.update_once(&pool, &update_stats, &update_options)
-                            .await?;
-                    }
-                    anyhow::Ok(())
-                });
-                let pending_update_fut = async move {
-                    task.await
-                        .map_err(SharedError::from)?
-                        .map_err(SharedError::new)
-                }
-                .boxed()
-                .shared();
-                *pending_update = Some(pending_update_fut.clone());
-                pending_update_fut
-            }
+        let input = UpdateOnceInput {
+            context: self.clone(),
+            stats: update_stats.clone(),
+            options: update_options,
         };
-        pending_update_fut.await.anyhow_result()?;
-        Ok(())
+        self.update_once_batcher.run(input).await
     }

     async fn update_once(
         self: &Arc<Self>,
-        pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
         update_options: &UpdateOptions,
     ) -> Result<()> {
@@ -581,7 +550,7 @@ impl SourceIndexingContext {
                     && import_op.executor.provides_ordinal()),
         };
         let rows_stream = import_op.executor.list(&read_options).await?;
-        self.update_with_stream(import_op, rows_stream, pool, update_stats, update_options)
+        self.update_with_stream(import_op, rows_stream, update_stats, update_options)
             .await
     }

@@ -589,7 +558,6 @@ impl SourceIndexingContext {
         self: &Arc<Self>,
         import_op: &plan::AnalyzedImportOp,
         mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRowMetadata>>>,
-        pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
         update_options: &UpdateOptions,
     ) -> Result<()> {
@@ -635,7 +603,6 @@ impl SourceIndexingContext {
                     None, // operation_in_process_stats
                     concur_permit,
                     NO_ACK,
-                    pool.clone(),
                 ));
             }
         }
@@ -675,7 +642,6 @@ impl SourceIndexingContext {
                 None, // operation_in_process_stats
                 concur_permit,
                 NO_ACK,
-                pool.clone(),
             ));
         }
         while let Some(result) = join_set.join_next().await {
@@ -689,3 +655,41 @@ impl SourceIndexingContext {
         Ok(())
     }
 }
+
+struct UpdateOnceInput {
+    context: Arc<SourceIndexingContext>,
+    stats: Arc<stats::UpdateStats>,
+    options: UpdateOptions,
+}
+
+struct UpdateOnceRunner;
+
+#[async_trait]
+impl batching::Runner for UpdateOnceRunner {
+    type Input = UpdateOnceInput;
+    type Output = ();
+
+    async fn run(&self, inputs: Vec<Self::Input>) -> Result<impl ExactSizeIterator<Item = Self::Output>> {
+        let num_inputs = inputs.len();
+        let update_options = UpdateOptions {
+            expect_little_diff: inputs.iter().all(|input| input.options.expect_little_diff),
+            mode: if inputs
+                .iter()
+                .any(|input| input.options.mode == UpdateMode::ReexportTargets)
+            {
+                UpdateMode::ReexportTargets
+            } else {
+                UpdateMode::Normal
+            },
+        };
+        let input = inputs
+            .into_iter()
+            .next()
+            .ok_or_else(|| anyhow::anyhow!("no input"))?;
+        input
+            .context
+            .update_once(&input.stats, &update_options)
+            .await?;
+        Ok(std::iter::repeat(()).take(num_inputs))
+    }
+}
diff --git a/src/lib_context.rs b/src/lib_context.rs
index 6a076aae..3b307d94 100644
--- a/src/lib_context.rs
+++ b/src/lib_context.rs
@@ -84,7 +84,7 @@ impl FlowExecutionContext {
     ) -> Result<&Arc<SourceIndexingContext>> {
         self.source_indexing_contexts[source_idx]
             .get_or_try_init(|| async move {
-                anyhow::Ok(Arc::new(
+                anyhow::Ok(
                     SourceIndexingContext::load(
                         flow.clone(),
                         source_idx,
@@ -92,7 +92,7 @@ impl FlowExecutionContext {
                         pool,
                     )
                     .await?,
-                ))
+                )
             })
             .await
     }
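
---

Note (not part of the patch): `utils::batching` is referenced above but not included in this diff. Below is a minimal sketch of the contract the patch appears to rely on — `Batcher::new(runner)` wraps a `Runner`, and concurrent `Batcher::run(input)` calls are coalesced so that a single `Runner::run` pass serves the whole batch while every caller still gets its own result. All names and signatures here are assumptions for illustration (in particular, the trait is simplified to return `Vec<Self::Output>` instead of an iterator); the real module may differ.

use std::sync::Mutex;

use anyhow::Result;
use async_trait::async_trait;
use tokio::sync::{oneshot, Semaphore};

#[async_trait]
pub trait Runner: Send + Sync + 'static {
    type Input: Send + 'static;
    type Output: Send + 'static;

    /// Processes one batch; must yield exactly one output per input.
    async fn run(&self, inputs: Vec<Self::Input>) -> Result<Vec<Self::Output>>;
}

pub struct Batcher<R: Runner> {
    runner: R,
    /// Requests queued for the next pass, each with a reply channel.
    pending: Mutex<Vec<(R::Input, oneshot::Sender<Result<R::Output>>)>>,
    /// Admits one pass at a time; waiters' inputs pile up in `pending`.
    pass_sem: Semaphore,
}

impl<R: Runner> Batcher<R> {
    pub fn new(runner: R) -> Self {
        Self {
            runner,
            pending: Mutex::new(Vec::new()),
            pass_sem: Semaphore::new(1),
        }
    }

    pub async fn run(&self, input: R::Input) -> Result<R::Output> {
        let (tx, rx) = oneshot::channel();
        self.pending.lock().unwrap().push((input, tx));
        {
            // Whoever acquires the permit drains everything queued so far and
            // runs it as one batch. Our own request may already have been
            // drained by an earlier pass, in which case the queue is empty.
            let _permit = self.pass_sem.acquire().await?;
            let batch = std::mem::take(&mut *self.pending.lock().unwrap());
            if !batch.is_empty() {
                let (inputs, txs): (Vec<_>, Vec<_>) = batch.into_iter().unzip();
                match self.runner.run(inputs).await {
                    Ok(outputs) => {
                        for (tx, output) in txs.into_iter().zip(outputs) {
                            let _ = tx.send(Ok(output));
                        }
                    }
                    // anyhow::Error is not Clone, so fan the error out as text.
                    Err(e) => {
                        for tx in txs {
                            let _ = tx.send(Err(anyhow::anyhow!("{e:#}")));
                        }
                    }
                }
            }
        }
        rx.await?
    }
}

Under this contract the batcher reproduces exactly what the removed hand-rolled `pending_update`/`update_sem` logic provided: at most one `update_once` in flight per source, with every request arriving during a pass collapsed into the next one.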
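Given that, the option merging in `UpdateOnceRunner::run` is conservative in both directions: a batched pass may only assume a small diff if every caller expected one (`all`), and it must re-export targets if any caller asked for that (`any`). A hypothetical caller for illustration — `refresh_twice` is made up, the surrounding module's imports are assumed, and whether the two calls actually share one batch depends on timing:

async fn refresh_twice(
    ctx: &Arc<SourceIndexingContext>,
    stats: &Arc<stats::UpdateStats>,
) -> Result<()> {
    // If both calls land in the same batch, a single `update_once` pass
    // serves them both, running with the merged options:
    //   expect_little_diff = true && false -> false
    //   mode = ReexportTargets, since at least one caller requested it
    let (a, b) = tokio::join!(
        ctx.update(
            stats,
            UpdateOptions {
                expect_little_diff: true,
                mode: UpdateMode::Normal,
            },
        ),
        ctx.update(
            stats,
            UpdateOptions {
                expect_little_diff: false,
                mode: UpdateMode::ReexportTargets,
            },
        ),
    );
    a.and(b)
}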