Commit 4d6ea70

fix: use the batch runner to combine source indexer calls (#1230)
1 parent 05e9116 commit 4d6ea70

3 files changed: +61, -59 lines

src/execution/live_updater.rs

Lines changed: 4 additions & 6 deletions
@@ -145,7 +145,6 @@ impl SourceUpdateTask {
         futs.push(
             {
                 let change_stream_stats = change_stream_stats.clone();
-                let pool = self.pool.clone();
                 let status_tx = self.status_tx.clone();
                 let operation_in_process_stats = self.operation_in_process_stats.clone();
                 async move {
@@ -230,7 +229,6 @@ impl SourceUpdateTask {
                         Some(move || async move {
                             SharedAckFn::ack(&shared_ack_fn).await
                         }),
-                        pool.clone(),
                     ),
                 );
             }
@@ -267,7 +265,7 @@ impl SourceUpdateTask {
             async move {
                 let refresh_interval = import_op.refresh_options.refresh_interval;
 
-                task.update_with_pass_with_error_logging(
+                task.update_one_pass_with_error_logging(
                     source_indexing_context,
                     if refresh_interval.is_some() {
                         "initial interval update"
@@ -285,7 +283,7 @@ impl SourceUpdateTask {
                 loop {
                     interval.tick().await;
 
-                    task.update_with_pass_with_error_logging(
+                    task.update_one_pass_with_error_logging(
                         source_indexing_context,
                         "interval update",
                         super::source_indexer::UpdateOptions {
@@ -375,7 +373,7 @@ impl SourceUpdateTask {
 
         // Run the actual update
         let update_result = source_indexing_context
-            .update(&self.pool, &update_stats, update_options)
+            .update(&update_stats, update_options)
             .await
             .with_context(|| {
                 format!(
@@ -409,7 +407,7 @@ impl SourceUpdateTask {
         Ok(())
     }
 
-    async fn update_with_pass_with_error_logging(
+    async fn update_one_pass_with_error_logging(
         &self,
         source_indexing_context: &Arc<SourceIndexingContext>,
         update_title: &str,

src/execution/source_indexer.rs

Lines changed: 55 additions & 51 deletions
@@ -1,7 +1,5 @@
 use crate::{
-    execution::row_indexer::ContentHashBasedCollapsingBaseline,
-    prelude::*,
-    service::error::{SharedError, SharedResult, SharedResultExt},
+    execution::row_indexer::ContentHashBasedCollapsingBaseline, prelude::*, utils::batching,
 };
 
 use futures::future::Ready;
@@ -57,13 +55,14 @@ struct SourceIndexingState {
 }
 
 pub struct SourceIndexingContext {
+    pool: PgPool,
     flow: Arc<builder::AnalyzedFlow>,
     source_idx: usize,
-    pending_update: Mutex<Option<Shared<BoxFuture<'static, SharedResult<()>>>>>,
-    update_sem: Semaphore,
     state: Mutex<SourceIndexingState>,
     setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>,
     needs_to_track_rows_to_retry: bool,
+
+    update_once_batcher: batching::Batcher<UpdateOnceRunner>,
 }
 
 pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
@@ -253,7 +252,7 @@ impl SourceIndexingContext {
         source_idx: usize,
         setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>,
         pool: &PgPool,
-    ) -> Result<Self> {
+    ) -> Result<Arc<Self>> {
         let plan = flow.get_execution_plan().await?;
         let import_op = &plan.import_ops[source_idx];
         let mut list_state = db_tracking::ListTrackedSourceKeyMetadataState::new();
@@ -294,7 +293,8 @@ impl SourceIndexingContext {
                 );
             }
         }
-        Ok(Self {
+        Ok(Arc::new(Self {
+            pool: pool.clone(),
             flow,
             source_idx,
             needs_to_track_rows_to_retry: rows_to_retry.is_some(),
@@ -303,10 +303,9 @@ impl SourceIndexingContext {
                 scan_generation,
                 rows_to_retry,
             }),
-            pending_update: Mutex::new(None),
-            update_sem: Semaphore::new(1),
             setup_execution_ctx,
-        })
+            update_once_batcher: batching::Batcher::new(UpdateOnceRunner),
+        }))
     }
 
     pub async fn process_source_row<
@@ -320,7 +319,6 @@ impl SourceIndexingContext {
         operation_in_process_stats: Option<Arc<stats::OperationInProcessStats>>,
         _concur_permit: concur_control::CombinedConcurrencyControllerPermit,
         ack_fn: Option<AckFn>,
-        pool: PgPool,
     ) {
         use ContentHashBasedCollapsingBaseline::ProcessedSourceFingerprint;
 
@@ -359,7 +357,7 @@ impl SourceIndexingContext {
                 operation_in_process_stats_cloned
                     .as_ref()
                     .map(|s| s.as_ref()),
-                &pool,
+                &self.pool,
             )?;
 
             let source_data = row_input.data;
@@ -490,7 +488,7 @@ impl SourceIndexingContext {
                     &source_key_json,
                     row_indexer::RowIndexer::process_ordinal_from_time(process_time),
                     &self.setup_execution_ctx.setup_state.tracking_table,
-                    &pool,
+                    &self.pool,
                 )
                 .await?;
             }
@@ -525,48 +523,19 @@ impl SourceIndexingContext {
 
     pub async fn update(
         self: &Arc<Self>,
-        pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
         update_options: UpdateOptions,
     ) -> Result<()> {
-        let pending_update_fut = {
-            let mut pending_update = self.pending_update.lock().unwrap();
-            if let Some(pending_update_fut) = &*pending_update {
-                pending_update_fut.clone()
-            } else {
-                let slf = self.clone();
-                let pool = pool.clone();
-                let update_stats = update_stats.clone();
-                let task = tokio::spawn(async move {
-                    {
-                        let _permit = slf.update_sem.acquire().await?;
-                        {
-                            let mut pending_update = slf.pending_update.lock().unwrap();
-                            *pending_update = None;
-                        }
-                        slf.update_once(&pool, &update_stats, &update_options)
-                            .await?;
-                    }
-                    anyhow::Ok(())
-                });
-                let pending_update_fut = async move {
-                    task.await
-                        .map_err(SharedError::from)?
-                        .map_err(SharedError::new)
-                }
-                .boxed()
-                .shared();
-                *pending_update = Some(pending_update_fut.clone());
-                pending_update_fut
-            }
+        let input = UpdateOnceInput {
+            context: self.clone(),
+            stats: update_stats.clone(),
+            options: update_options,
         };
-        pending_update_fut.await.anyhow_result()?;
-        Ok(())
+        self.update_once_batcher.run(input).await
     }
 
     async fn update_once(
         self: &Arc<Self>,
-        pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
         update_options: &UpdateOptions,
     ) -> Result<()> {
@@ -581,15 +550,14 @@ impl SourceIndexingContext {
                     && import_op.executor.provides_ordinal()),
         };
         let rows_stream = import_op.executor.list(&read_options).await?;
-        self.update_with_stream(import_op, rows_stream, pool, update_stats, update_options)
+        self.update_with_stream(import_op, rows_stream, update_stats, update_options)
             .await
     }
 
     async fn update_with_stream(
         self: &Arc<Self>,
         import_op: &plan::AnalyzedImportOp,
         mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRow>>>,
-        pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
         update_options: &UpdateOptions,
     ) -> Result<()> {
@@ -635,7 +603,6 @@ impl SourceIndexingContext {
                     None, // operation_in_process_stats
                     concur_permit,
                     NO_ACK,
-                    pool.clone(),
                 ));
             }
         }
@@ -675,7 +642,6 @@ impl SourceIndexingContext {
                 None, // operation_in_process_stats
                 concur_permit,
                 NO_ACK,
-                pool.clone(),
             ));
         }
         while let Some(result) = join_set.join_next().await {
@@ -689,3 +655,41 @@ impl SourceIndexingContext {
         Ok(())
     }
 }
+
+struct UpdateOnceInput {
+    context: Arc<SourceIndexingContext>,
+    stats: Arc<stats::UpdateStats>,
+    options: UpdateOptions,
+}
+
+struct UpdateOnceRunner;
+
+#[async_trait]
+impl batching::Runner for UpdateOnceRunner {
+    type Input = UpdateOnceInput;
+    type Output = ();
+
+    async fn run(&self, inputs: Vec<UpdateOnceInput>) -> Result<impl ExactSizeIterator<Item = ()>> {
+        let num_inputs = inputs.len();
+        let update_options = UpdateOptions {
+            expect_little_diff: inputs.iter().all(|input| input.options.expect_little_diff),
+            mode: if inputs
+                .iter()
+                .any(|input| input.options.mode == UpdateMode::ReexportTargets)
+            {
+                UpdateMode::ReexportTargets
+            } else {
+                UpdateMode::Normal
+            },
+        };
+        let input = inputs
+            .into_iter()
+            .next()
+            .ok_or_else(|| anyhow::anyhow!("no input"))?;
+        input
+            .context
+            .update_once(&input.stats, &update_options)
+            .await?;
+        Ok(std::iter::repeat(()).take(num_inputs))
+    }
+}
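
The `batching::Batcher` / `batching::Runner` pair used above comes from `crate::utils::batching`, which is not part of this diff. As a rough, hypothetical sketch of the coalescing behavior the new code relies on (one `update_once` pass runs at a time, and calls that arrive while a pass is in flight are merged into the next pass), a batcher along these lines would work; the real API in the repository may differ, e.g. the actual `Runner::run` returns an `ExactSizeIterator` rather than a `Vec`:

    use anyhow::Result;
    use tokio::sync::{oneshot, Mutex};

    /// Hypothetical sketch of a batching runner: executes a whole batch of inputs at once.
    pub trait Runner: Send + Sync + 'static {
        type Input: Send + 'static;
        type Output: Send + 'static;
        /// Runs one batch; returns one output per input, in order.
        fn run(
            &self,
            inputs: Vec<Self::Input>,
        ) -> impl std::future::Future<Output = Result<Vec<Self::Output>>> + Send;
    }

    pub struct Batcher<R: Runner> {
        runner: R,
        state: Mutex<State<R>>,
    }

    struct State<R: Runner> {
        pending: Vec<(R::Input, oneshot::Sender<R::Output>)>,
        running: bool,
    }

    impl<R: Runner> Batcher<R> {
        pub fn new(runner: R) -> Self {
            Self {
                runner,
                state: Mutex::new(State { pending: Vec::new(), running: false }),
            }
        }

        /// Submits one input. If no batch is executing, this caller drives execution and
        /// keeps draining inputs queued in the meantime; otherwise it waits for the batch
        /// its input ends up in.
        pub async fn run(&self, input: R::Input) -> Result<R::Output> {
            let (tx, rx) = oneshot::channel();
            let is_leader = {
                let mut state = self.state.lock().await;
                state.pending.push((input, tx));
                !std::mem::replace(&mut state.running, true)
            };
            if is_leader {
                loop {
                    let batch = {
                        let mut state = self.state.lock().await;
                        if state.pending.is_empty() {
                            state.running = false;
                            break;
                        }
                        std::mem::take(&mut state.pending)
                    };
                    let (inputs, senders): (Vec<_>, Vec<_>) = batch.into_iter().unzip();
                    match self.runner.run(inputs).await {
                        Ok(outputs) => {
                            for (sender, output) in senders.into_iter().zip(outputs) {
                                let _ = sender.send(output);
                            }
                        }
                        Err(err) => {
                            // Drop the senders so queued callers observe the failure,
                            // and leave the batcher usable for later calls.
                            drop(senders);
                            self.state.lock().await.running = false;
                            return Err(err);
                        }
                    }
                }
            }
            rx.await
                .map_err(|_| anyhow::anyhow!("batched run failed or was dropped"))
        }
    }

Under that assumption, `SourceIndexingContext::update` only hands an `UpdateOnceInput` to the batcher, and `UpdateOnceRunner::run` collapses however many queued requests it receives into a single `update_once` pass, widening the options (`expect_little_diff` only if every caller expected it, `ReexportTargets` if any caller asked for it) and answering every caller once that pass completes.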

src/lib_context.rs

Lines changed: 2 additions & 2 deletions
@@ -84,15 +84,15 @@ impl FlowExecutionContext {
     ) -> Result<&Arc<SourceIndexingContext>> {
         self.source_indexing_contexts[source_idx]
             .get_or_try_init(|| async move {
-                anyhow::Ok(Arc::new(
+                anyhow::Ok(
                     SourceIndexingContext::load(
                         flow.clone(),
                         source_idx,
                         self.setup_execution_context.clone(),
                         pool,
                     )
                     .await?,
-                ))
+                )
             })
             .await
     }
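
The `get_or_try_init` call above targets a lazily initialized per-source slot whose field definition is outside this diff. Assuming `source_indexing_contexts` holds `tokio::sync::OnceCell` values (consistent with that call), the caching pattern is roughly the sketch below: the first caller runs the initializer, concurrent callers await the same in-flight initialization, and later callers get the cached value.

    use std::sync::Arc;

    use anyhow::Result;
    use tokio::sync::OnceCell;

    // Hypothetical stand-in for the real per-source context type.
    type Context = String;

    struct ExecutionContexts {
        // One lazily initialized slot per source (an assumption for this sketch).
        slots: Vec<OnceCell<Arc<Context>>>,
    }

    impl ExecutionContexts {
        async fn get_or_build(&self, source_idx: usize) -> Result<&Arc<Context>> {
            self.slots[source_idx]
                .get_or_try_init(|| async move {
                    // The initializer runs at most once per slot; concurrent callers
                    // await the same future instead of re-running it.
                    anyhow::Ok(Arc::new(format!("context for source {source_idx}")))
                })
                .await
        }
    }

Because `SourceIndexingContext::load` now returns `Arc<Self>` directly, the caller in `lib_context.rs` can drop its own `Arc::new(...)` wrapper, as the diff above shows.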
