src/execution/live_updater.rs (10 changes: 4 additions & 6 deletions)
@@ -145,7 +145,6 @@ impl SourceUpdateTask {
futs.push(
{
let change_stream_stats = change_stream_stats.clone();
- let pool = self.pool.clone();
let status_tx = self.status_tx.clone();
let operation_in_process_stats = self.operation_in_process_stats.clone();
async move {
@@ -230,7 +229,6 @@ impl SourceUpdateTask {
Some(move || async move {
SharedAckFn::ack(&shared_ack_fn).await
}),
- pool.clone(),
),
);
}
@@ -267,7 +265,7 @@ impl SourceUpdateTask {
async move {
let refresh_interval = import_op.refresh_options.refresh_interval;

- task.update_with_pass_with_error_logging(
+ task.update_one_pass_with_error_logging(
source_indexing_context,
if refresh_interval.is_some() {
"initial interval update"
@@ -285,7 +283,7 @@
loop {
interval.tick().await;

- task.update_with_pass_with_error_logging(
+ task.update_one_pass_with_error_logging(
source_indexing_context,
"interval update",
super::source_indexer::UpdateOptions {
@@ -375,7 +373,7 @@ impl SourceUpdateTask {

// Run the actual update
let update_result = source_indexing_context
- .update(&self.pool, &update_stats, update_options)
+ .update(&update_stats, update_options)
.await
.with_context(|| {
format!(
@@ -409,7 +407,7 @@ impl SourceUpdateTask {
Ok(())
}

- async fn update_with_pass_with_error_logging(
+ async fn update_one_pass_with_error_logging(
&self,
source_indexing_context: &Arc<SourceIndexingContext>,
update_title: &str,
src/execution/source_indexer.rs (106 changes: 55 additions & 51 deletions)
@@ -1,7 +1,5 @@
use crate::{
- execution::row_indexer::ContentHashBasedCollapsingBaseline,
- prelude::*,
- service::error::{SharedError, SharedResult, SharedResultExt},
+ execution::row_indexer::ContentHashBasedCollapsingBaseline, prelude::*, utils::batching,
};

use futures::future::Ready;
@@ -57,13 +55,14 @@ struct SourceIndexingState {
}

pub struct SourceIndexingContext {
+ pool: PgPool,
flow: Arc<builder::AnalyzedFlow>,
source_idx: usize,
- pending_update: Mutex<Option<Shared<BoxFuture<'static, SharedResult<()>>>>>,
- update_sem: Semaphore,
state: Mutex<SourceIndexingState>,
setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>,
needs_to_track_rows_to_retry: bool,
+
+ update_once_batcher: batching::Batcher<UpdateOnceRunner>,
}

pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
@@ -253,7 +252,7 @@ impl SourceIndexingContext {
source_idx: usize,
setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>,
pool: &PgPool,
- ) -> Result<Self> {
+ ) -> Result<Arc<Self>> {
let plan = flow.get_execution_plan().await?;
let import_op = &plan.import_ops[source_idx];
let mut list_state = db_tracking::ListTrackedSourceKeyMetadataState::new();
@@ -294,7 +293,8 @@ impl SourceIndexingContext {
);
}
}
- Ok(Self {
+ Ok(Arc::new(Self {
+ pool: pool.clone(),
flow,
source_idx,
needs_to_track_rows_to_retry: rows_to_retry.is_some(),
@@ -303,10 +303,9 @@
scan_generation,
rows_to_retry,
}),
- pending_update: Mutex::new(None),
- update_sem: Semaphore::new(1),
setup_execution_ctx,
- })
+ update_once_batcher: batching::Batcher::new(UpdateOnceRunner),
+ }))
}

pub async fn process_source_row<
@@ -320,7 +319,6 @@
operation_in_process_stats: Option<Arc<stats::OperationInProcessStats>>,
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
ack_fn: Option<AckFn>,
- pool: PgPool,
) {
use ContentHashBasedCollapsingBaseline::ProcessedSourceFingerprint;

@@ -359,7 +357,7 @@
operation_in_process_stats_cloned
.as_ref()
.map(|s| s.as_ref()),
- &pool,
+ &self.pool,
)?;

let source_data = row_input.data;
@@ -490,7 +488,7 @@
&source_key_json,
row_indexer::RowIndexer::process_ordinal_from_time(process_time),
&self.setup_execution_ctx.setup_state.tracking_table,
- &pool,
+ &self.pool,
)
.await?;
}
@@ -525,48 +523,19 @@

pub async fn update(
self: &Arc<Self>,
- pool: &PgPool,
update_stats: &Arc<stats::UpdateStats>,
update_options: UpdateOptions,
) -> Result<()> {
- let pending_update_fut = {
- let mut pending_update = self.pending_update.lock().unwrap();
- if let Some(pending_update_fut) = &*pending_update {
- pending_update_fut.clone()
- } else {
- let slf = self.clone();
- let pool = pool.clone();
- let update_stats = update_stats.clone();
- let task = tokio::spawn(async move {
- {
- let _permit = slf.update_sem.acquire().await?;
- {
- let mut pending_update = slf.pending_update.lock().unwrap();
- *pending_update = None;
- }
- slf.update_once(&pool, &update_stats, &update_options)
- .await?;
- }
- anyhow::Ok(())
- });
- let pending_update_fut = async move {
- task.await
- .map_err(SharedError::from)?
- .map_err(SharedError::new)
- }
- .boxed()
- .shared();
- *pending_update = Some(pending_update_fut.clone());
- pending_update_fut
- }
+ let input = UpdateOnceInput {
+ context: self.clone(),
+ stats: update_stats.clone(),
+ options: update_options,
};
- pending_update_fut.await.anyhow_result()?;
- Ok(())
+ self.update_once_batcher.run(input).await
}

async fn update_once(
self: &Arc<Self>,
- pool: &PgPool,
update_stats: &Arc<stats::UpdateStats>,
update_options: &UpdateOptions,
) -> Result<()> {
@@ -581,15 +550,14 @@
&& import_op.executor.provides_ordinal()),
};
let rows_stream = import_op.executor.list(&read_options).await?;
- self.update_with_stream(import_op, rows_stream, pool, update_stats, update_options)
+ self.update_with_stream(import_op, rows_stream, update_stats, update_options)
.await
}

async fn update_with_stream(
self: &Arc<Self>,
import_op: &plan::AnalyzedImportOp,
mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRow>>>,
- pool: &PgPool,
update_stats: &Arc<stats::UpdateStats>,
update_options: &UpdateOptions,
) -> Result<()> {
@@ -635,7 +603,6 @@
None, // operation_in_process_stats
concur_permit,
NO_ACK,
- pool.clone(),
));
}
}
@@ -675,7 +642,6 @@
None, // operation_in_process_stats
concur_permit,
NO_ACK,
- pool.clone(),
));
}
while let Some(result) = join_set.join_next().await {
@@ -689,3 +655,41 @@
Ok(())
}
}
+
+ struct UpdateOnceInput {
+ context: Arc<SourceIndexingContext>,
+ stats: Arc<stats::UpdateStats>,
+ options: UpdateOptions,
+ }
+
+ struct UpdateOnceRunner;
+
+ #[async_trait]
+ impl batching::Runner for UpdateOnceRunner {
+ type Input = UpdateOnceInput;
+ type Output = ();
+
+ async fn run(&self, inputs: Vec<UpdateOnceInput>) -> Result<impl ExactSizeIterator<Item = ()>> {
+ let num_inputs = inputs.len();
+ let update_options = UpdateOptions {
+ expect_little_diff: inputs.iter().all(|input| input.options.expect_little_diff),
+ mode: if inputs
+ .iter()
+ .any(|input| input.options.mode == UpdateMode::ReexportTargets)
+ {
+ UpdateMode::ReexportTargets
+ } else {
+ UpdateMode::Normal
+ },
+ };
+ let input = inputs
+ .into_iter()
+ .next()
+ .ok_or_else(|| anyhow::anyhow!("no input"))?;
+ input
+ .context
+ .update_once(&input.stats, &update_options)
+ .await?;
+ Ok(std::iter::repeat(()).take(num_inputs))
+ }
+ }
src/lib_context.rs (4 changes: 2 additions & 2 deletions)
@@ -84,15 +84,15 @@ impl FlowExecutionContext {
) -> Result<&Arc<SourceIndexingContext>> {
self.source_indexing_contexts[source_idx]
.get_or_try_init(|| async move {
- anyhow::Ok(Arc::new(
+ anyhow::Ok(
SourceIndexingContext::load(
flow.clone(),
source_idx,
self.setup_execution_context.clone(),
pool,
)
.await?,
- ))
+ )
})
.await
}
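Note on `utils::batching` (not part of this diff): the `Batcher`/`Runner` pair consumed above is never defined here. What follows is a hypothetical, minimal sketch of such a coalescing batcher, with every name and signature inferred from the call sites (`batching::Batcher::new(UpdateOnceRunner)`, `self.update_once_batcher.run(input).await`) rather than taken from the real module; for simplicity it uses native async-fn-in-trait (Rust >= 1.75) returning `Vec<Output>` instead of the `#[async_trait]` + `ExactSizeIterator` shape used by the PR. The pattern it illustrates: callers that request a run while one is in flight are queued and merged into a single follow-up pass, which is what lets `UpdateOnceRunner::run` fold all queued `UpdateOptions` into one `update_once` call.

// Hypothetical sketch only; see the note above.
use std::future::Future;
use std::sync::Mutex;

use anyhow::{anyhow, Result};
use tokio::sync::{oneshot, Semaphore};

pub trait Runner: Send + Sync + 'static {
    type Input: Send + 'static;
    type Output: Send + 'static;

    // Must yield exactly one output per input, in the same order.
    fn run(
        &self,
        inputs: Vec<Self::Input>,
    ) -> impl Future<Output = Result<Vec<Self::Output>>> + Send;
}

pub struct Batcher<R: Runner> {
    runner: R,
    // Requests that arrived while a batch was in flight.
    pending: Mutex<Vec<(R::Input, oneshot::Sender<Result<R::Output>>)>>,
    // Admits one batch at a time; later callers queue up behind it.
    sem: Semaphore,
}

impl<R: Runner> Batcher<R> {
    pub fn new(runner: R) -> Self {
        Self {
            runner,
            pending: Mutex::new(Vec::new()),
            sem: Semaphore::new(1),
        }
    }

    pub async fn run(&self, input: R::Input) -> Result<R::Output> {
        let (tx, mut rx) = oneshot::channel();
        self.pending.lock().unwrap().push((input, tx));
        let _permit = self.sem.acquire().await?;
        // A batch that ran while we were queued may have already handled
        // our input; if so, hand back that result directly.
        if let Ok(result) = rx.try_recv() {
            return result;
        }
        // Otherwise drain everything queued so far (ourselves included)
        // and run it as one batch.
        let batch: Vec<_> = std::mem::take(&mut *self.pending.lock().unwrap());
        let (inputs, txs): (Vec<_>, Vec<_>) = batch.into_iter().unzip();
        match self.runner.run(inputs).await {
            Ok(outputs) => {
                for (tx, output) in txs.into_iter().zip(outputs) {
                    let _ = tx.send(Ok(output));
                }
            }
            Err(e) => {
                // anyhow::Error is not Clone, so each waiter gets its own copy.
                for tx in txs {
                    let _ = tx.send(Err(anyhow!("batched run failed: {e:#}")));
                }
            }
        }
        // Our own sender was part of the drained batch, so this resolves
        // with the result we just produced.
        rx.await.map_err(|_| anyhow!("batch dropped without a result"))?
    }
}

Under a contract like this, the deleted `pending_update`/`update_sem` machinery becomes unnecessary: the batcher alone guarantees at most one pass in flight, with all concurrent requests coalesced into the next pass.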