diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index bea3e737a..92f23da92 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -20,9 +20,14 @@ use super::{ }; use crate::ops::interface; -struct SourceRowIndexingState { + +#[derive(Default)] +struct SourceRowVersionState { source_version: SourceVersion, content_version_fp: Option>, +} +struct SourceRowIndexingState { + version_state: SourceRowVersionState, processing_sem: Arc, touched_generation: usize, } @@ -30,8 +35,10 @@ struct SourceRowIndexingState { impl Default for SourceRowIndexingState { fn default() -> Self { Self { - source_version: SourceVersion::default(), - content_version_fp: None, + version_state: SourceRowVersionState { + source_version: SourceVersion::default(), + content_version_fp: None, + }, processing_sem: Arc::new(Semaphore::new(1)), touched_generation: 0, } @@ -62,13 +69,19 @@ struct LocalSourceRowStateOperator<'a> { processing_sem: Option>, processing_sem_permit: Option, last_source_version: Option, + + // `None` means no advance yet. + // `Some(None)` means the state before advance is `None`. + // `Some(Some(version_state))` means the state before advance is `Some(version_state)`. + prev_version_state: Option>, + + to_remove_entry_on_success: bool, } enum RowStateAdvanceOutcome { Skipped, Advanced { - prev_source_version: Option, - prev_content_version_fp: Option>, + prev_version_state: Option, }, Noop, } @@ -86,6 +99,8 @@ impl<'a> LocalSourceRowStateOperator<'a> { processing_sem: None, processing_sem_permit: None, last_source_version: None, + prev_version_state: None, + to_remove_entry_on_success: false, } } async fn advance( @@ -108,6 +123,7 @@ impl<'a> LocalSourceRowStateOperator<'a> { if !force_reload && entry .get() + .version_state .source_version .should_skip(&source_version, Some(self.update_stats.as_ref())) { @@ -126,14 +142,20 @@ impl<'a> LocalSourceRowStateOperator<'a> { let entry_mut = entry.get_mut(); let outcome = RowStateAdvanceOutcome::Advanced { - prev_source_version: Some(std::mem::take(&mut entry_mut.source_version)), - prev_content_version_fp: entry_mut.content_version_fp.take(), + prev_version_state: Some(std::mem::take(&mut entry_mut.version_state)), }; if source_version.kind == row_indexer::SourceVersionKind::NonExistence { - entry.remove(); - } else { - entry_mut.source_version = source_version; - entry_mut.content_version_fp = content_version_fp.cloned(); + self.to_remove_entry_on_success = true; + } + let prev_version_state = std::mem::replace( + &mut entry_mut.version_state, + SourceRowVersionState { + source_version, + content_version_fp: content_version_fp.cloned(), + }, + ); + if self.prev_version_state.is_none() { + self.prev_version_state = Some(Some(prev_version_state)); } (sem, outcome) } @@ -143,18 +165,22 @@ impl<'a> LocalSourceRowStateOperator<'a> { return Ok(RowStateAdvanceOutcome::Skipped); } let new_entry = SourceRowIndexingState { - source_version, - content_version_fp: content_version_fp.cloned(), + version_state: SourceRowVersionState { + source_version, + content_version_fp: content_version_fp.cloned(), + }, touched_generation, ..Default::default() }; let sem = new_entry.processing_sem.clone(); entry.insert(new_entry); + if self.prev_version_state.is_none() { + self.prev_version_state = Some(None); + } ( Some(sem), RowStateAdvanceOutcome::Advanced { - prev_source_version: None, - prev_content_version_fp: None, + prev_version_state: None, }, ) } @@ -166,6 +192,26 @@ impl<'a> LocalSourceRowStateOperator<'a> { } Ok(outcome) } + + fn commit(self) { + if self.to_remove_entry_on_success { + self.indexing_state.lock().unwrap().rows.remove(self.key); + } + } + + fn rollback(self) { + let Some(prev_version_state) = self.prev_version_state else { + return; + }; + let mut indexing_state = self.indexing_state.lock().unwrap(); + if let Some(prev_version_state) = prev_version_state { + if let Some(entry) = indexing_state.rows.get_mut(self.key) { + entry.version_state = prev_version_state; + } + } else { + indexing_state.rows.remove(self.key); + } + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] @@ -214,12 +260,14 @@ impl SourceIndexingContext { rows.insert( source_pk, SourceRowIndexingState { - source_version: SourceVersion::from_stored( - key_metadata.processed_source_ordinal, - &key_metadata.process_logic_fingerprint, - plan.logic_fingerprint, - ), - content_version_fp: key_metadata.processed_source_fp, + version_state: SourceRowVersionState { + source_version: SourceVersion::from_stored( + key_metadata.processed_source_ordinal, + &key_metadata.process_logic_fingerprint, + plan.logic_fingerprint, + ), + content_version_fp: key_metadata.processed_source_fp, + }, processing_sem: Arc::new(Semaphore::new(1)), touched_generation: scan_generation, }, @@ -271,39 +319,43 @@ impl SourceIndexingContext { &pool, )?; + let source_data = row_input.data; let mut row_state_operator = LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats); - - let source_data = row_input.data; - if let Some(ordinal) = source_data.ordinal - && let Some(content_version_fp) = &source_data.content_version_fp - { - let version = SourceVersion::from_current_with_ordinal(ordinal); - match row_state_operator - .advance( - version, - Some(content_version_fp), - /*force_reload=*/ mode == UpdateMode::ReexportTargets, - ) - .await? - { - RowStateAdvanceOutcome::Skipped => { - return anyhow::Ok(()); - } - RowStateAdvanceOutcome::Advanced { - prev_source_version: Some(prev_source_version), - prev_content_version_fp: Some(prev_content_version_fp), - } => { - // Fast path optimization: may collapse the row based on source version fingerprint. - // Still need to update the tracking table as the processed ordinal advanced. - if mode == UpdateMode::Normal + let result = { + let row_state_operator = &mut row_state_operator; + let row_key = &row_input.key; + async move { + if let Some(ordinal) = source_data.ordinal + && let Some(content_version_fp) = &source_data.content_version_fp + { + let version = SourceVersion::from_current_with_ordinal(ordinal); + match row_state_operator + .advance( + version, + Some(content_version_fp), + /*force_reload=*/ mode == UpdateMode::ReexportTargets, + ) + .await? + { + RowStateAdvanceOutcome::Skipped => { + return anyhow::Ok(()); + } + RowStateAdvanceOutcome::Advanced { + prev_version_state: Some(prev_version_state), + } => { + // Fast path optimization: may collapse the row based on source version fingerprint. + // Still need to update the tracking table as the processed ordinal advanced. + if let Some(prev_content_version_fp) = + &prev_version_state.content_version_fp + && mode == UpdateMode::Normal && row_indexer .try_collapse( &version, content_version_fp.as_slice(), - &prev_source_version, + &prev_version_state.source_version, ContentHashBasedCollapsingBaseline::ProcessedSourceFingerprint( - &prev_content_version_fp, + prev_content_version_fp, ), ) .await? @@ -311,21 +363,21 @@ impl SourceIndexingContext { { return Ok(()); } + } + _ => {} + } } - _ => {} - } - } - let (ordinal, content_version_fp, value) = - match (source_data.ordinal, source_data.value) { - (Some(ordinal), Some(value)) => { - (ordinal, source_data.content_version_fp, value) - } - _ => { - let data = import_op + let (ordinal, content_version_fp, value) = + match (source_data.ordinal, source_data.value) { + (Some(ordinal), Some(value)) => { + (ordinal, source_data.content_version_fp, value) + } + _ => { + let data = import_op .executor .get_value( - &row_input.key, + row_key, row_input.key_aux_info.as_ref().ok_or_else(|| { anyhow::anyhow!( "`key_aux_info` must be provided when there's no `source_data`" @@ -338,37 +390,47 @@ impl SourceIndexingContext { }, ) .await?; - ( - data.ordinal - .ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?, - data.content_version_fp, - data.value - .ok_or_else(|| anyhow::anyhow!("value is not available"))?, + ( + data.ordinal.ok_or_else(|| { + anyhow::anyhow!("ordinal is not available") + })?, + data.content_version_fp, + data.value + .ok_or_else(|| anyhow::anyhow!("value is not available"))?, + ) + } + }; + + let source_version = SourceVersion::from_current_data(ordinal, &value); + if let RowStateAdvanceOutcome::Skipped = row_state_operator + .advance( + source_version, + content_version_fp.as_ref(), + /*force_reload=*/ mode == UpdateMode::ReexportTargets, ) + .await? + { + return Ok(()); } - }; - - let source_version = SourceVersion::from_current_data(ordinal, &value); - if let RowStateAdvanceOutcome::Skipped = row_state_operator - .advance( - source_version, - content_version_fp.as_ref(), - /*force_reload=*/ mode == UpdateMode::ReexportTargets, - ) - .await? - { - return Ok(()); - } - let result = row_indexer - .update_source_row(&source_version, value, content_version_fp.clone()) - .await?; - if let SkippedOr::Skipped(version, fp) = result { - row_state_operator - .advance(version, fp.as_ref(), /*force_reload=*/ false) - .await?; + let result = row_indexer + .update_source_row(&source_version, value, content_version_fp.clone()) + .await?; + if let SkippedOr::Skipped(version, fp) = result { + row_state_operator + .advance(version, fp.as_ref(), /*force_reload=*/ false) + .await?; + } + Ok(()) + } + } + .await; + if result.is_ok() { + row_state_operator.commit(); + } else { + row_state_operator.rollback(); } - Ok(()) + result }; let process_and_ack = async { process.await?; @@ -481,6 +543,7 @@ impl SourceIndexingContext { row_state.touched_generation = scan_generation; if update_options.mode == UpdateMode::Normal && row_state + .version_state .source_version .should_skip(&source_version, Some(update_stats.as_ref())) { @@ -518,7 +581,8 @@ impl SourceIndexingContext { let state = self.state.lock().unwrap(); for (key, row_state) in state.rows.iter() { if row_state.touched_generation < scan_generation { - deleted_key_versions.push((key.clone(), row_state.source_version.ordinal)); + deleted_key_versions + .push((key.clone(), row_state.version_state.source_version.ordinal)); } } deleted_key_versions