diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs index 89d2343e2..e137ecdeb 100644 --- a/src/execution/db_tracking.rs +++ b/src/execution/db_tracking.rs @@ -118,6 +118,7 @@ pub struct SourceTrackingInfoForPrecommit { pub staging_target_keys: sqlx::types::Json, pub processed_source_ordinal: Option, + pub processed_source_fp: Option>, pub process_logic_fingerprint: Option>, pub process_ordinal: Option, pub target_keys: Option>, @@ -130,7 +131,12 @@ pub async fn read_source_tracking_info_for_precommit( db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>, ) -> Result> { let query_str = format!( - "SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2", + "SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, {}, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2", + if db_setup.has_fast_fingerprint_column { + "processed_source_fp" + } else { + "NULL::bytea AS processed_source_fp" + }, db_setup.table_name ); let precommit_tracking_info = sqlx::query_as(&query_str) @@ -282,6 +288,7 @@ pub async fn delete_source_tracking_info( pub struct TrackedSourceKeyMetadata { pub source_key: serde_json::Value, pub processed_source_ordinal: Option, + pub processed_source_fp: Option>, pub process_logic_fingerprint: Option>, } @@ -303,7 +310,12 @@ impl ListTrackedSourceKeyMetadataState { pool: &'a PgPool, ) -> impl Stream> + 'a { self.query_str = format!( - "SELECT source_key, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1", + "SELECT source_key, processed_source_ordinal, {}, process_logic_fingerprint FROM {} WHERE source_id = $1", + if db_setup.has_fast_fingerprint_column { + "processed_source_fp" + } else { + "NULL::bytea AS processed_source_fp" + }, db_setup.table_name ); sqlx::query_as(&self.query_str).bind(source_id).fetch(pool) diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs index ee41b32fb..5bd52822a 100644 --- a/src/execution/dumper.rs +++ b/src/execution/dumper.rs @@ -193,6 +193,7 @@ impl<'a> Dumper<'a> { let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions { include_ordinal: false, + include_content_version_fp: false, }); while let Some(rows) = rows_stream.next().await { for row in rows?.into_iter() { diff --git a/src/execution/indexing_status.rs b/src/execution/indexing_status.rs index 6702cb30f..354586393 100644 --- a/src/execution/indexing_status.rs +++ b/src/execution/indexing_status.rs @@ -41,6 +41,7 @@ pub async fn get_source_row_indexing_status( &interface::SourceExecutorGetOptions { include_value: false, include_ordinal: true, + include_content_version_fp: false, }, ); let (last_processed, current) = try_join!(last_processed_fut, current_fut)?; diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index 6e434171f..13e3ff7d6 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -1,5 +1,5 @@ use crate::{ - execution::{source_indexer::ProcessSourceKeyOptions, stats::UpdateStats}, + execution::{source_indexer::ProcessSourceKeyInput, stats::UpdateStats}, prelude::*, }; @@ -200,10 +200,9 @@ impl SourceUpdateTask { SharedAckFn::ack(&shared_ack_fn).await }), pool.clone(), - ProcessSourceKeyOptions { + ProcessSourceKeyInput { key_aux_info: Some(change.key_aux_info), - source_data: change.data, - ..Default::default() + data: change.data, }, )); } diff 
--git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index 510f86eb7..b967ede39 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -42,7 +42,7 @@ pub enum SourceVersionKind { NonExistence, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] pub struct SourceVersion { pub ordinal: Ordinal, pub kind: SourceVersionKind, @@ -91,6 +91,7 @@ impl SourceVersion { ) } + /// Create a version from the current ordinal. For existing rows only. pub fn from_current_with_ordinal(ordinal: Ordinal) -> Self { Self { ordinal, @@ -98,15 +99,12 @@ impl SourceVersion { } } - pub fn from_current_data(data: &interface::SourceData) -> Self { - let kind = match &data.value { + pub fn from_current_data(ordinal: Ordinal, value: &interface::SourceValue) -> Self { + let kind = match value { interface::SourceValue::Existence(_) => SourceVersionKind::CurrentLogic, interface::SourceValue::NonExistence => SourceVersionKind::NonExistence, }; - Self { - ordinal: data.ordinal, - kind, - } + Self { ordinal, kind } } pub fn should_skip( @@ -135,7 +133,7 @@ impl SourceVersion { pub enum SkippedOr { Normal(T), - Skipped(SourceVersion), + Skipped(SourceVersion, Option>), } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -202,9 +200,12 @@ async fn precommit_source_tracking_info( .await?; if let Some(tracking_info) = &tracking_info { let existing_source_version = - SourceVersion::from_stored_precommit_info(tracking_info, logic_fp); + SourceVersion::from_stored_precommit_info(&tracking_info, logic_fp); if existing_source_version.should_skip(source_version, Some(update_stats)) { - return Ok(SkippedOr::Skipped(existing_source_version)); + return Ok(SkippedOr::Skipped( + existing_source_version, + tracking_info.processed_source_fp.clone(), + )); } } let tracking_info_exists = tracking_info.is_some(); @@ -506,98 +507,337 @@ async fn commit_source_tracking_info( Ok(()) } -#[allow(clippy::too_many_arguments)] -async fn try_content_hash_optimization( +pub struct RowIndexer<'a> { + src_eval_ctx: &'a SourceRowEvaluationContext<'a>, + setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext, + update_stats: &'a stats::UpdateStats, + pool: &'a PgPool, + source_id: i32, - src_eval_ctx: &SourceRowEvaluationContext<'_>, - source_key_json: &serde_json::Value, - source_version: &SourceVersion, - current_hash: &[u8], - tracking_info: &db_tracking::SourceTrackingInfoForProcessing, - existing_version: &Option, - db_setup: &db_tracking_setup::TrackingTableSetupState, - update_stats: &stats::UpdateStats, - pool: &PgPool, -) -> Result>> { - // Check if we can use content hash optimization - if existing_version - .as_ref() - .is_none_or(|v| v.kind != SourceVersionKind::CurrentLogic) - { - return Ok(None); + process_time: chrono::DateTime, + source_key_json: serde_json::Value, +} +pub enum ContentHashBasedCollapsingBaseline<'a> { + ProcessedSourceFingerprint(&'a Vec), + SourceTrackingInfo(&'a db_tracking::SourceTrackingInfoForProcessing), +} + +impl<'a> RowIndexer<'a> { + pub fn new( + src_eval_ctx: &'a SourceRowEvaluationContext<'_>, + setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext, + pool: &'a PgPool, + update_stats: &'a stats::UpdateStats, + ) -> Result { + Ok(Self { + source_id: setup_execution_ctx.import_ops[src_eval_ctx.import_op_idx].source_id, + process_time: chrono::Utc::now(), + source_key_json: serde_json::to_value(src_eval_ctx.key)?, + + src_eval_ctx, + setup_execution_ctx, + pool, + update_stats, + }) } - let existing_hash: Option>> = if 
db_setup.has_fast_fingerprint_column { - tracking_info - .processed_source_fp - .as_ref() - .map(|fp| Cow::Borrowed(fp)) - } else { - if tracking_info - .max_process_ordinal - .zip(tracking_info.process_ordinal) - .is_none_or(|(max_ord, proc_ord)| max_ord != proc_ord) - { - return Ok(None); + pub async fn update_source_row( + &mut self, + source_version: &SourceVersion, + source_value: interface::SourceValue, + source_version_fp: Option>, + ) -> Result> { + let tracking_setup_state = &self.setup_execution_ctx.setup_state.tracking_table; + // Phase 1: Check existing tracking info and apply optimizations + let existing_tracking_info = read_source_tracking_info_for_processing( + self.source_id, + &self.source_key_json, + &self.setup_execution_ctx.setup_state.tracking_table, + self.pool, + ) + .await?; + + let existing_version = match &existing_tracking_info { + Some(info) => { + let existing_version = SourceVersion::from_stored_processing_info( + info, + self.src_eval_ctx.plan.logic_fingerprint, + ); + + // First check ordinal-based skipping + if existing_version.should_skip(source_version, Some(self.update_stats)) { + return Ok(SkippedOr::Skipped( + existing_version, + info.processed_source_fp.clone(), + )); + } + + Some(existing_version) + } + None => None, + }; + + // Compute content hash once if needed for both optimization and evaluation + let content_version_fp = match (source_version_fp, &source_value) { + (Some(fp), _) => Some(fp), + (None, interface::SourceValue::Existence(field_values)) => Some(Vec::from( + Fingerprinter::default() + .with(field_values)? + .into_fingerprint() + .0, + )), + (None, interface::SourceValue::NonExistence) => None, + }; + + if let Some(content_version_fp) = &content_version_fp { + let baseline = if tracking_setup_state.has_fast_fingerprint_column { + existing_tracking_info + .as_ref() + .and_then(|info| info.processed_source_fp.as_ref()) + .map(ContentHashBasedCollapsingBaseline::ProcessedSourceFingerprint) + } else { + existing_tracking_info + .as_ref() + .map(ContentHashBasedCollapsingBaseline::SourceTrackingInfo) + }; + if let Some(baseline) = baseline + && let Some(existing_version) = &existing_version + && let Some(optimization_result) = self + .try_collapse( + source_version, + &content_version_fp.as_slice(), + &existing_version, + baseline, + ) + .await? + { + return Ok(optimization_result); + } } - tracking_info - .memoization_info - .as_ref() - .and_then(|info| info.0.as_ref()) - .and_then(|stored_info| stored_info.content_hash.as_ref()) - .map(|content_hash| BASE64_STANDARD.decode(content_hash)) - .transpose()? 
- .map(Cow::Owned) - }; + let (output, stored_mem_info, source_fp) = { + let extracted_memoization_info = existing_tracking_info + .and_then(|info| info.memoization_info) + .and_then(|info| info.0); + + match source_value { + interface::SourceValue::Existence(source_value) => { + let evaluation_memory = EvaluationMemory::new( + self.process_time, + extracted_memoization_info, + EvaluationMemoryOptions { + enable_cache: true, + evaluation_only: false, + }, + ); + + let output = + evaluate_source_entry(self.src_eval_ctx, source_value, &evaluation_memory) + .await?; + let mut stored_info = evaluation_memory.into_stored()?; + if tracking_setup_state.has_fast_fingerprint_column { + (Some(output), stored_info, content_version_fp) + } else { + stored_info.content_hash = + content_version_fp.map(|fp| BASE64_STANDARD.encode(fp)); + (Some(output), stored_info, None) + } + } + interface::SourceValue::NonExistence => (None, Default::default(), None), + } + }; - if existing_hash.as_ref().map(|fp| fp.as_slice()) != Some(current_hash) { - return Ok(None); - } + // Phase 2 (precommit): Update with the memoization info and stage target keys. + let precommit_output = precommit_source_tracking_info( + self.source_id, + &self.source_key_json, + source_version, + self.src_eval_ctx.plan.logic_fingerprint, + output.as_ref().map(|scope_value| PrecommitData { + evaluate_output: scope_value, + memoization_info: &stored_mem_info, + }), + &self.process_time, + &self.setup_execution_ctx.setup_state.tracking_table, + &self.src_eval_ctx.plan.export_ops, + &self.setup_execution_ctx.export_ops, + self.update_stats, + self.pool, + ) + .await?; + let precommit_output = match precommit_output { + SkippedOr::Normal(output) => output, + SkippedOr::Skipped(v, fp) => return Ok(SkippedOr::Skipped(v, fp)), + }; - // Content hash matches - try optimization - let mut txn = pool.begin().await?; + // Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones. + let mut target_mutations = precommit_output.target_mutations; + let apply_futs = + self.src_eval_ctx + .plan + .export_op_groups + .iter() + .filter_map(|export_op_group| { + let mutations_w_ctx: Vec<_> = export_op_group + .op_idx + .iter() + .filter_map(|export_op_idx| { + let export_op = &self.src_eval_ctx.plan.export_ops[*export_op_idx]; + target_mutations + .remove( + &self.setup_execution_ctx.export_ops[*export_op_idx].target_id, + ) + .filter(|m| !m.is_empty()) + .map(|mutation| interface::ExportTargetMutationWithContext { + mutation, + export_context: export_op.export_context.as_ref(), + }) + }) + .collect(); + (!mutations_w_ctx.is_empty()).then(|| { + export_op_group + .target_factory + .apply_mutation(mutations_w_ctx) + }) + }); - let current_tracking_info = db_tracking::read_source_tracking_info_for_precommit( - source_id, - source_key_json, - db_setup, - &mut *txn, - ) - .await?; + // TODO: Handle errors. + try_join_all(apply_futs).await?; - let Some(current_tracking_info) = current_tracking_info else { - return Ok(None); - }; + // Phase 4: Update the tracking record. 
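+        // Note: this commit runs only after every target mutation in phase 3 has
+        // succeeded, so the processed ordinal/fingerprint never get ahead of the
+        // target storage; a crash between the two phases remains detectable via
+        // the target keys staged during precommit (phase 2).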
+ commit_source_tracking_info( + self.source_id, + &self.source_key_json, + source_version, + source_fp, + &self.src_eval_ctx.plan.logic_fingerprint.0, + precommit_output.metadata, + &self.process_time, + tracking_setup_state, + self.pool, + ) + .await?; - // Check 1: Same check as precommit - verify no newer version exists - let current_source_version = SourceVersion::from_stored_precommit_info( - ¤t_tracking_info, - src_eval_ctx.plan.logic_fingerprint, - ); - if current_source_version.should_skip(source_version, Some(update_stats)) { - return Ok(Some(SkippedOr::Skipped(current_source_version))); - } + if let Some(existing_version) = existing_version { + if output.is_some() { + if !source_version.ordinal.is_available() + || source_version.ordinal != existing_version.ordinal + { + self.update_stats.num_updates.inc(1); + } else { + self.update_stats.num_reprocesses.inc(1); + } + } else { + self.update_stats.num_deletions.inc(1); + } + } else if output.is_some() { + self.update_stats.num_insertions.inc(1); + } - // Check 2: Verify process_ordinal hasn't changed (no concurrent processing) - let original_process_ordinal = tracking_info.process_ordinal; - if current_tracking_info.process_ordinal != original_process_ordinal { - return Ok(None); + Ok(SkippedOr::Normal(())) } - // Safe to apply optimization - just update tracking table - db_tracking::update_source_tracking_ordinal( - source_id, - source_key_json, - source_version.ordinal.0, - db_setup, - &mut *txn, - ) - .await?; + pub async fn try_collapse( + &mut self, + source_version: &SourceVersion, + content_version_fp: &[u8], + existing_version: &SourceVersion, + baseline: ContentHashBasedCollapsingBaseline<'_>, + ) -> Result>> { + let tracking_table_setup = &self.setup_execution_ctx.setup_state.tracking_table; + + // Check if we can use content hash optimization + if existing_version.kind != SourceVersionKind::CurrentLogic { + return Ok(None); + } - txn.commit().await?; - update_stats.num_no_change.inc(1); - Ok(Some(SkippedOr::Normal(()))) + let existing_hash: Option>> = match baseline { + ContentHashBasedCollapsingBaseline::ProcessedSourceFingerprint(fp) => { + Some(Cow::Borrowed(fp)) + } + ContentHashBasedCollapsingBaseline::SourceTrackingInfo(tracking_info) => { + if tracking_info + .max_process_ordinal + .zip(tracking_info.process_ordinal) + .is_none_or(|(max_ord, proc_ord)| max_ord != proc_ord) + { + return Ok(None); + } + + tracking_info + .memoization_info + .as_ref() + .and_then(|info| info.0.as_ref()) + .and_then(|stored_info| stored_info.content_hash.as_ref()) + .map(|content_hash| BASE64_STANDARD.decode(content_hash)) + .transpose()? 
+ .map(Cow::Owned) + } + }; + if existing_hash.as_ref().map(|fp| fp.as_slice()) != Some(content_version_fp) { + return Ok(None); + } + + // Content hash matches - try optimization + let mut txn = self.pool.begin().await?; + + let existing_tracking_info = db_tracking::read_source_tracking_info_for_precommit( + self.source_id, + &self.source_key_json, + tracking_table_setup, + &mut *txn, + ) + .await?; + + let Some(existing_tracking_info) = existing_tracking_info else { + return Ok(None); + }; + + // Check 1: Same check as precommit - verify no newer version exists + let existing_source_version = SourceVersion::from_stored_precommit_info( + &existing_tracking_info, + self.src_eval_ctx.plan.logic_fingerprint, + ); + if existing_source_version.should_skip(source_version, Some(self.update_stats)) { + return Ok(Some(SkippedOr::Skipped( + existing_source_version, + existing_tracking_info.processed_source_fp.clone(), + ))); + } + + // Check 2: Verify the situation hasn't changed (no concurrent processing) + match baseline { + ContentHashBasedCollapsingBaseline::ProcessedSourceFingerprint(fp) => { + if existing_tracking_info + .processed_source_fp + .as_ref() + .map(|fp| fp.as_slice()) + != Some(fp) + { + return Ok(None); + } + } + ContentHashBasedCollapsingBaseline::SourceTrackingInfo(info) => { + if existing_tracking_info.process_ordinal != info.process_ordinal { + return Ok(None); + } + } + } + + // Safe to apply optimization - just update tracking table + db_tracking::update_source_tracking_ordinal( + self.source_id, + &self.source_key_json, + source_version.ordinal.0, + tracking_table_setup, + &mut *txn, + ) + .await?; + + txn.commit().await?; + self.update_stats.num_no_change.inc(1); + Ok(Some(SkippedOr::Normal(()))) + } } pub async fn evaluate_source_entry_with_memory( @@ -633,6 +873,7 @@ pub async fn evaluate_source_entry_with_memory( &SourceExecutorGetOptions { include_value: true, include_ordinal: false, + include_content_version_fp: false, }, ) .await? @@ -647,195 +888,6 @@ pub async fn evaluate_source_entry_with_memory( Ok(output) } -pub async fn update_source_row( - src_eval_ctx: &SourceRowEvaluationContext<'_>, - setup_execution_ctx: &exec_ctx::FlowSetupExecutionContext, - source_value: interface::SourceValue, - source_version: &SourceVersion, - pool: &PgPool, - update_stats: &stats::UpdateStats, -) -> Result> { - let tracking_setup_state = &setup_execution_ctx.setup_state.tracking_table; - - let source_key_json = serde_json::to_value(src_eval_ctx.key)?; - let process_time = chrono::Utc::now(); - let source_id = setup_execution_ctx.import_ops[src_eval_ctx.import_op_idx].source_id; - - // Phase 1: Check existing tracking info and apply optimizations - let existing_tracking_info = read_source_tracking_info_for_processing( - source_id, - &source_key_json, - &setup_execution_ctx.setup_state.tracking_table, - pool, - ) - .await?; - - let existing_version = match &existing_tracking_info { - Some(info) => { - let existing_version = SourceVersion::from_stored_processing_info( - info, - src_eval_ctx.plan.logic_fingerprint, - ); - - // First check ordinal-based skipping - if existing_version.should_skip(source_version, Some(update_stats)) { - return Ok(SkippedOr::Skipped(existing_version)); - } - - Some(existing_version) - } - None => None, - }; - - // Compute content hash once if needed for both optimization and evaluation - let current_content_hash = match &source_value { - interface::SourceValue::Existence(source_value) => Some( - Fingerprinter::default() - .with(source_value)? 
- .into_fingerprint(), - ), - interface::SourceValue::NonExistence => None, - }; - - if let (Some(current_hash), Some(existing_tracking_info)) = - (¤t_content_hash, &existing_tracking_info) - { - if let Some(optimization_result) = try_content_hash_optimization( - source_id, - src_eval_ctx, - &source_key_json, - source_version, - current_hash.as_slice(), - existing_tracking_info, - &existing_version, - tracking_setup_state, - update_stats, - pool, - ) - .await? - { - return Ok(optimization_result); - } - } - - let (output, stored_mem_info, source_fp) = { - let extracted_memoization_info = existing_tracking_info - .and_then(|info| info.memoization_info) - .and_then(|info| info.0); - - match source_value { - interface::SourceValue::Existence(source_value) => { - let evaluation_memory = EvaluationMemory::new( - process_time, - extracted_memoization_info, - EvaluationMemoryOptions { - enable_cache: true, - evaluation_only: false, - }, - ); - - let output = - evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?; - let mut stored_info = evaluation_memory.into_stored()?; - let content_hash = current_content_hash.map(|fp| fp.0.to_vec()); - if tracking_setup_state.has_fast_fingerprint_column { - (Some(output), stored_info, content_hash) - } else { - stored_info.content_hash = content_hash.map(|fp| BASE64_STANDARD.encode(fp)); - (Some(output), stored_info, None) - } - } - interface::SourceValue::NonExistence => (None, Default::default(), None), - } - }; - - // Phase 2 (precommit): Update with the memoization info and stage target keys. - let precommit_output = precommit_source_tracking_info( - source_id, - &source_key_json, - source_version, - src_eval_ctx.plan.logic_fingerprint, - output.as_ref().map(|scope_value| PrecommitData { - evaluate_output: scope_value, - memoization_info: &stored_mem_info, - }), - &process_time, - &setup_execution_ctx.setup_state.tracking_table, - &src_eval_ctx.plan.export_ops, - &setup_execution_ctx.export_ops, - update_stats, - pool, - ) - .await?; - let precommit_output = match precommit_output { - SkippedOr::Normal(output) => output, - SkippedOr::Skipped(source_version) => return Ok(SkippedOr::Skipped(source_version)), - }; - - // Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones. - let mut target_mutations = precommit_output.target_mutations; - let apply_futs = src_eval_ctx - .plan - .export_op_groups - .iter() - .filter_map(|export_op_group| { - let mutations_w_ctx: Vec<_> = export_op_group - .op_idx - .iter() - .filter_map(|export_op_idx| { - let export_op = &src_eval_ctx.plan.export_ops[*export_op_idx]; - target_mutations - .remove(&setup_execution_ctx.export_ops[*export_op_idx].target_id) - .filter(|m| !m.is_empty()) - .map(|mutation| interface::ExportTargetMutationWithContext { - mutation, - export_context: export_op.export_context.as_ref(), - }) - }) - .collect(); - (!mutations_w_ctx.is_empty()).then(|| { - export_op_group - .target_factory - .apply_mutation(mutations_w_ctx) - }) - }); - - // TODO: Handle errors. - try_join_all(apply_futs).await?; - - // Phase 4: Update the tracking record. 
- commit_source_tracking_info( - source_id, - &source_key_json, - source_version, - source_fp, - &src_eval_ctx.plan.logic_fingerprint.0, - precommit_output.metadata, - &process_time, - tracking_setup_state, - pool, - ) - .await?; - - if let Some(existing_version) = existing_version { - if output.is_some() { - if !source_version.ordinal.is_available() - || source_version.ordinal != existing_version.ordinal - { - update_stats.num_updates.inc(1); - } else { - update_stats.num_reprocesses.inc(1); - } - } else { - update_stats.num_deletions.inc(1); - } - } else if output.is_some() { - update_stats.num_insertions.inc(1); - } - - Ok(SkippedOr::Normal(())) -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index bff79f341..c5955cd8c 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -1,4 +1,5 @@ use crate::{ + execution::row_indexer::ContentHashBasedCollapsingBaseline, prelude::*, service::error::{SharedError, SharedResult, SharedResultExt}, }; @@ -6,7 +7,10 @@ use crate::{ use futures::future::Ready; use sqlx::PgPool; use std::collections::{HashMap, hash_map}; -use tokio::{sync::Semaphore, task::JoinSet}; +use tokio::{ + sync::{OwnedSemaphorePermit, Semaphore}, + task::JoinSet, +}; use super::{ db_tracking, @@ -18,6 +22,7 @@ use super::{ use crate::ops::interface; struct SourceRowIndexingState { source_version: SourceVersion, + content_version_fp: Option>, processing_sem: Arc, touched_generation: usize, } @@ -26,6 +31,7 @@ impl Default for SourceRowIndexingState { fn default() -> Self { Self { source_version: SourceVersion::default(), + content_version_fp: None, processing_sem: Arc::new(Semaphore::new(1)), touched_generation: 0, } @@ -48,11 +54,122 @@ pub struct SourceIndexingContext { pub const NO_ACK: Option Ready>> = None; -#[derive(Default)] -pub struct ProcessSourceKeyOptions { - /// `key_aux_info` is not available for deletions. It must not be provided if `source_data` is `None`. 
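Aside on the new `LocalSourceRowStateOperator` introduced below: it generalizes the pattern that was previously inlined in `process_source_key`, where each row key owns a single-permit semaphore so concurrent updates to one key are serialized while distinct keys proceed in parallel. A minimal self-contained sketch of just that gating idea (the `RowGate` name and API are invented for illustration, not part of the patch):

```rust
use std::{collections::HashMap, future::Future, sync::{Arc, Mutex}};
use tokio::sync::Semaphore;

/// Illustrative per-key gate, mirroring `processing_sem` in `SourceRowIndexingState`.
#[derive(Default)]
struct RowGate {
    sems: Mutex<HashMap<String, Arc<Semaphore>>>,
}

impl RowGate {
    async fn run<Fut: Future>(&self, key: &str, work: Fut) -> Fut::Output {
        // Clone the Arc out so the map lock is not held across an `.await`.
        let sem = self
            .sems
            .lock()
            .unwrap()
            .entry(key.to_owned())
            .or_insert_with(|| Arc::new(Semaphore::new(1)))
            .clone();
        let _permit = sem.acquire_owned().await.expect("semaphore is never closed");
        work.await // updates to the same key run one at a time
    }
}
```

The real operator additionally snapshots the previous `SourceVersion` and `content_version_fp` under the same lock, which is what lets the caller choose between skipping, collapsing, and reprocessing.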
+struct LocalSourceRowStateOperator<'a> { + key: &'a value::KeyValue, + indexing_state: &'a Mutex, + update_stats: &'a Arc, + + processing_sem: Option>, + processing_sem_permit: Option, + last_source_version: Option, +} + +enum RowStateAdvanceOutcome { + Skipped, + Advanced { + prev_source_version: Option, + prev_content_version_fp: Option>, + }, + Noop, +} + +impl<'a> LocalSourceRowStateOperator<'a> { + fn new( + key: &'a value::KeyValue, + indexing_state: &'a Mutex, + update_stats: &'a Arc, + ) -> Self { + Self { + key, + indexing_state, + update_stats, + processing_sem: None, + processing_sem_permit: None, + last_source_version: None, + } + } + async fn advance( + &mut self, + source_version: SourceVersion, + content_version_fp: Option<&Vec>, + ) -> Result { + let (sem, outcome) = { + let mut state = self.indexing_state.lock().unwrap(); + let touched_generation = state.scan_generation; + + if self.last_source_version == Some(source_version) { + return Ok(RowStateAdvanceOutcome::Noop); + } + self.last_source_version = Some(source_version); + + match state.rows.entry(self.key.clone()) { + hash_map::Entry::Occupied(mut entry) => { + if entry + .get() + .source_version + .should_skip(&source_version, Some(self.update_stats.as_ref())) + { + return Ok(RowStateAdvanceOutcome::Skipped); + } + let entry_sem = &entry.get().processing_sem; + let sem = if self + .processing_sem + .as_ref() + .is_none_or(|sem| !Arc::ptr_eq(sem, &entry_sem)) + { + Some(entry_sem.clone()) + } else { + None + }; + + let entry_mut = entry.get_mut(); + let outcome = RowStateAdvanceOutcome::Advanced { + prev_source_version: Some(std::mem::take(&mut entry_mut.source_version)), + prev_content_version_fp: entry_mut.content_version_fp.take(), + }; + if source_version.kind == row_indexer::SourceVersionKind::NonExistence { + entry.remove(); + } else { + entry_mut.source_version = source_version; + entry_mut.content_version_fp = content_version_fp.cloned(); + } + (sem, outcome) + } + hash_map::Entry::Vacant(entry) => { + if source_version.kind == row_indexer::SourceVersionKind::NonExistence { + self.update_stats.num_no_change.inc(1); + return Ok(RowStateAdvanceOutcome::Skipped); + } + let new_entry = SourceRowIndexingState { + source_version, + content_version_fp: content_version_fp.cloned(), + touched_generation, + ..Default::default() + }; + let sem = new_entry.processing_sem.clone(); + entry.insert(new_entry); + ( + Some(sem), + RowStateAdvanceOutcome::Advanced { + prev_source_version: None, + prev_content_version_fp: None, + }, + ) + } + } + }; + if let Some(sem) = sem { + self.processing_sem_permit = Some(sem.clone().acquire_owned().await?); + self.processing_sem = Some(sem); + } + Ok(outcome) + } +} + +pub struct ProcessSourceKeyInput { + /// `key_aux_info` is not available for deletions. It must be provided if `data.value` is `None`. 
pub key_aux_info: Option, - pub source_data: Option, + pub data: interface::PartialSourceRowData, } impl SourceIndexingContext { @@ -88,6 +205,7 @@ impl SourceIndexingContext { &key_metadata.process_logic_fingerprint, plan.logic_fingerprint, ), + content_version_fp: key_metadata.processed_source_fp, processing_sem: Arc::new(Semaphore::new(1)), touched_generation: scan_generation, }, @@ -117,130 +235,114 @@ impl SourceIndexingContext { _concur_permit: concur_control::CombinedConcurrencyControllerPermit, ack_fn: Option, pool: PgPool, - options: ProcessSourceKeyOptions, + inputs: ProcessSourceKeyInput, ) { let process = async { let plan = self.flow.get_execution_plan().await?; let import_op = &plan.import_ops[self.source_idx]; let schema = &self.flow.data_schema; - let source_data = match options.source_data { - Some(source_data) => source_data, - None => import_op - .executor - .get_value( - &key, - options.key_aux_info.as_ref().ok_or_else(|| { - anyhow::anyhow!( - "`key_aux_info` must be provided when there's no `source_data`" - ) - })?, - &interface::SourceExecutorGetOptions { - include_value: true, - include_ordinal: true, - }, - ) - .await? - .try_into()?, - }; - let source_version = SourceVersion::from_current_data(&source_data); - let processing_sem = { - let mut state = self.state.lock().unwrap(); - let touched_generation = state.scan_generation; - match state.rows.entry(key.clone()) { - hash_map::Entry::Occupied(mut entry) => { - if entry - .get() - .source_version - .should_skip(&source_version, Some(update_stats.as_ref())) - { - return anyhow::Ok(()); - } - let sem = entry.get().processing_sem.clone(); - if source_version.kind == row_indexer::SourceVersionKind::NonExistence { - entry.remove(); - } else { - entry.get_mut().source_version = source_version.clone(); - } - sem - } - hash_map::Entry::Vacant(entry) => { - if source_version.kind == row_indexer::SourceVersionKind::NonExistence { - update_stats.num_no_change.inc(1); - return anyhow::Ok(()); - } - let new_entry = SourceRowIndexingState { - source_version: source_version.clone(), - touched_generation, - ..Default::default() - }; - let sem = new_entry.processing_sem.clone(); - entry.insert(new_entry); - sem - } - } + let eval_ctx = SourceRowEvaluationContext { + plan: &plan, + import_op, + schema, + key: &key, + import_op_idx: self.source_idx, }; - - let _processing_permit = processing_sem.acquire().await?; - let result = row_indexer::update_source_row( - &SourceRowEvaluationContext { - plan: &plan, - import_op, - schema, - key: &key, - import_op_idx: self.source_idx, - }, + let mut row_indexer = row_indexer::RowIndexer::new( + &eval_ctx, &self.setup_execution_ctx, - source_data.value, - &source_version, &pool, &update_stats, - ) - .await?; - let target_source_version = if let SkippedOr::Skipped(existing_source_version) = result + )?; + + let mut row_state_operator = + LocalSourceRowStateOperator::new(&key, &self.state, &update_stats); + + let source_data = inputs.data; + if let Some(ordinal) = source_data.ordinal + && let Some(content_version_fp) = &source_data.content_version_fp { - Some(existing_source_version) - } else if source_version.kind == row_indexer::SourceVersionKind::NonExistence { - Some(source_version) - } else { - None - }; - if let Some(target_source_version) = target_source_version { - let mut state = self.state.lock().unwrap(); - let scan_generation = state.scan_generation; - let entry = state.rows.entry(key.clone()); - match entry { - hash_map::Entry::Occupied(mut entry) => { - if !entry - .get() - 
.source_version - .should_skip(&target_source_version, None) - { - if target_source_version.kind - == row_indexer::SourceVersionKind::NonExistence - { - entry.remove(); - } else { - let mut_entry = entry.get_mut(); - mut_entry.source_version = target_source_version; - mut_entry.touched_generation = scan_generation; - } - } + let version = SourceVersion::from_current_with_ordinal(ordinal); + match row_state_operator + .advance(version, Some(content_version_fp)) + .await? + { + RowStateAdvanceOutcome::Skipped => { + return anyhow::Ok(()); } - hash_map::Entry::Vacant(entry) => { - if target_source_version.kind - != row_indexer::SourceVersionKind::NonExistence + RowStateAdvanceOutcome::Advanced { + prev_source_version: Some(prev_source_version), + prev_content_version_fp: Some(prev_content_version_fp), + } => { + // Fast path optimization: may collapse the row based on source version fingerprint. + // Still need to update the tracking table as the processed ordinal advanced. + if row_indexer + .try_collapse( + &version, + content_version_fp.as_slice(), + &prev_source_version, + ContentHashBasedCollapsingBaseline::ProcessedSourceFingerprint( + &prev_content_version_fp, + ), + ) + .await? + .is_some() { - entry.insert(SourceRowIndexingState { - source_version: target_source_version, - touched_generation: scan_generation, - ..Default::default() - }); + return Ok(()); } } + _ => {} } } - anyhow::Ok(()) + + let (ordinal, value, content_version_fp) = + match (source_data.ordinal, source_data.value) { + (Some(ordinal), Some(value)) => { + (ordinal, value, source_data.content_version_fp) + } + _ => { + let data = import_op + .executor + .get_value( + &key, + inputs.key_aux_info.as_ref().ok_or_else(|| { + anyhow::anyhow!( + "`key_aux_info` must be provided when there's no `source_data`" + ) + })?, + &interface::SourceExecutorGetOptions { + include_value: true, + include_ordinal: true, + include_content_version_fp: true, + }, + ) + .await?; + ( + data.ordinal + .ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?, + data.value + .ok_or_else(|| anyhow::anyhow!("value is not available"))?, + data.content_version_fp, + ) + } + }; + + let source_version = SourceVersion::from_current_data(ordinal, &value); + if let RowStateAdvanceOutcome::Skipped = row_state_operator + .advance(source_version, content_version_fp.as_ref()) + .await? 
+ { + return Ok(()); + } + + let result = row_indexer + .update_source_row(&source_version, value, content_version_fp.clone()) + .await?; + if let SkippedOr::Skipped(version, fp) = result { + row_state_operator.advance(version, fp.as_ref()).await?; + } + Ok(()) }; let process_and_ack = async { process.await?; @@ -311,6 +413,7 @@ impl SourceIndexingContext { .executor .list(&interface::SourceExecutorListOptions { include_ordinal: true, + include_content_version_fp: true, }); let mut join_set = JoinSet::new(); let scan_generation = { @@ -346,9 +449,13 @@ impl SourceIndexingContext { concur_permit, NO_ACK, pool.clone(), - ProcessSourceKeyOptions { + ProcessSourceKeyInput { key_aux_info: Some(row.key_aux_info), - ..Default::default() + data: interface::PartialSourceRowData { + value: None, + ordinal: Some(source_version.ordinal), + content_version_fp: row.content_version_fp, + }, }, )); } @@ -372,10 +479,6 @@ impl SourceIndexingContext { deleted_key_versions }; for (key, source_ordinal) in deleted_key_versions { - let source_data = interface::SourceData { - value: interface::SourceValue::NonExistence, - ordinal: source_ordinal, - }; let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?; join_set.spawn(self.clone().process_source_key( key, @@ -383,9 +486,13 @@ concur_permit, NO_ACK, pool.clone(), - ProcessSourceKeyOptions { - source_data: Some(source_data), - ..Default::default() + ProcessSourceKeyInput { + key_aux_info: None, + data: interface::PartialSourceRowData { + value: Some(interface::SourceValue::NonExistence), + ordinal: Some(source_ordinal), + content_version_fp: None, + }, }, )); } diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 1e865d0ac..d22d42b36 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -56,6 +56,14 @@ pub struct PartialSourceRowMetadata { pub key_aux_info: serde_json::Value, pub ordinal: Option<Ordinal>, + + /// A content version fingerprint can be anything that changes when the content of the row changes. + /// Note that it's acceptable if sometimes the fingerprint differs even though the content is the same, + /// which will lead to fewer optimization opportunities but won't break correctness. + /// + /// It's optional. The source shouldn't compute it in a generic way, e.g. by hashing the content; + /// the framework will do that itself. If there's no fast way to get it from the source, leave it as `None`. + pub content_version_fp: Option<Vec<u8>>, } #[derive(Debug)] @@ -84,11 +92,6 @@ impl SourceValue { } } -pub struct SourceData { - pub value: SourceValue, - pub ordinal: Ordinal, -} - pub struct SourceChange { pub key: KeyValue, /// Auxiliary information for the source row, to be used when reading the content. @@ -96,7 +99,7 @@ pub key_aux_info: serde_json::Value, /// If None, the engine will poll to get the latest existence state and value.
- pub data: Option, + pub data: PartialSourceRowData, } pub struct SourceChangeMessage { @@ -107,34 +110,23 @@ pub struct SourceChangeMessage { #[derive(Debug, Default)] pub struct SourceExecutorListOptions { pub include_ordinal: bool, + pub include_content_version_fp: bool, } #[derive(Debug, Default)] pub struct SourceExecutorGetOptions { pub include_ordinal: bool, pub include_value: bool, + pub include_content_version_fp: bool, } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct PartialSourceRowData { pub value: Option, pub ordinal: Option, + pub content_version_fp: Option>, } -impl TryFrom for SourceData { - type Error = anyhow::Error; - - fn try_from(data: PartialSourceRowData) -> Result { - Ok(Self { - value: data - .value - .ok_or_else(|| anyhow::anyhow!("value is missing"))?, - ordinal: data - .ordinal - .ok_or_else(|| anyhow::anyhow!("ordinal is missing"))?, - }) - } -} #[async_trait] pub trait SourceExecutor: Send + Sync { /// Get the list of keys for the source. diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs index 4b3422ae7..765a277c5 100644 --- a/src/ops/sources/amazon_s3.rs +++ b/src/ops/sources/amazon_s3.rs @@ -89,6 +89,7 @@ impl SourceExecutor for Executor { key: KeyValue::Str(key.to_string().into()), key_aux_info: serde_json::Value::Null, ordinal: obj.last_modified().map(datetime_to_ordinal), + content_version_fp: None, }); } } @@ -117,6 +118,7 @@ impl SourceExecutor for Executor { return Ok(PartialSourceRowData { value: Some(SourceValue::NonExistence), ordinal: Some(Ordinal::unavailable()), + content_version_fp: None, }); } let resp = self @@ -131,6 +133,7 @@ impl SourceExecutor for Executor { return Ok(PartialSourceRowData { value: Some(SourceValue::NonExistence), ordinal: Some(Ordinal::unavailable()), + content_version_fp: None, }); } r => r?, @@ -150,7 +153,11 @@ impl SourceExecutor for Executor { } else { None }; - Ok(PartialSourceRowData { value, ordinal }) + Ok(PartialSourceRowData { + value, + ordinal, + content_version_fp: None, + }) } async fn change_stream( @@ -251,7 +258,7 @@ impl Executor { changes.push(SourceChange { key: KeyValue::Str(decoded_key), key_aux_info: serde_json::Value::Null, - data: None, + data: PartialSourceRowData::default(), }); } } diff --git a/src/ops/sources/azure_blob.rs b/src/ops/sources/azure_blob.rs index f5320468a..b54e6e644 100644 --- a/src/ops/sources/azure_blob.rs +++ b/src/ops/sources/azure_blob.rs @@ -79,6 +79,7 @@ impl SourceExecutor for Executor { key: KeyValue::Str(key.clone().into()), key_aux_info: serde_json::Value::Null, ordinal, + content_version_fp: None, }); } } @@ -107,6 +108,7 @@ impl SourceExecutor for Executor { return Ok(PartialSourceRowData { value: Some(SourceValue::NonExistence), ordinal: Some(Ordinal::unavailable()), + content_version_fp: None, }); } @@ -124,6 +126,7 @@ impl SourceExecutor for Executor { return Ok(PartialSourceRowData { value: Some(SourceValue::NonExistence), ordinal: Some(Ordinal::unavailable()), + content_version_fp: None, }); } }; @@ -147,7 +150,11 @@ impl SourceExecutor for Executor { None }; - Ok(PartialSourceRowData { value, ordinal }) + Ok(PartialSourceRowData { + value, + ordinal, + content_version_fp: None, + }) } async fn change_stream( diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs index 5d99eae3e..1fe0f2dd3 100644 --- a/src/ops/sources/google_drive.rs +++ b/src/ops/sources/google_drive.rs @@ -137,6 +137,7 @@ impl Executor { key: KeyValue::Str(id), key_aux_info: serde_json::Value::Null, ordinal: file.modified_time.map(|t| 
t.try_into()).transpose()?, + content_version_fp: None, }) } else { None @@ -212,7 +213,7 @@ impl Executor { changes.push(SourceChange { key: KeyValue::Str(Arc::from(file_id)), key_aux_info: serde_json::Value::Null, - data: None, + data: PartialSourceRowData::default(), }); } } @@ -348,6 +349,7 @@ impl SourceExecutor for Executor { return Ok(PartialSourceRowData { value: Some(SourceValue::NonExistence), ordinal: Some(Ordinal::unavailable()), + content_version_fp: None, }); } }; @@ -404,7 +406,11 @@ impl SourceExecutor for Executor { } None => None, }; - Ok(PartialSourceRowData { value, ordinal }) + Ok(PartialSourceRowData { + value, + ordinal, + content_version_fp: None, + }) } async fn change_stream( diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs index 05f457c07..84f8ed18c 100644 --- a/src/ops/sources/local_file.rs +++ b/src/ops/sources/local_file.rs @@ -58,6 +58,7 @@ impl SourceExecutor for Executor { key: KeyValue::Str(relative_path.into()), key_aux_info: serde_json::Value::Null, ordinal, + content_version_fp: None, }]; } } @@ -80,6 +81,7 @@ impl SourceExecutor for Executor { return Ok(PartialSourceRowData { value: Some(SourceValue::NonExistence), ordinal: Some(Ordinal::unavailable()), + content_version_fp: None, }); } let path = self.root_path.join(key.str_value()?.as_ref()); @@ -106,7 +108,11 @@ impl SourceExecutor for Executor { } else { None }; - Ok(PartialSourceRowData { value, ordinal }) + Ok(PartialSourceRowData { + value, + ordinal, + content_version_fp: None, + }) } } diff --git a/src/service/flows.rs b/src/service/flows.rs index e1ebce46b..52839ab1d 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -107,6 +107,7 @@ pub async fn get_keys( let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions { include_ordinal: false, + include_content_version_fp: false, }); let mut keys = Vec::new(); while let Some(rows) = rows_stream.next().await {
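A closing note on the source executors: every one of them currently fills `content_version_fp: None`, which matches the new field's documentation. A source that already receives a cheap change token, e.g. hypothetically an S3 ETag or a blob version id, could surface that token directly, since the contract only asks that the fingerprint change whenever the content changes. A sketch of both sides of the contract (helper names are invented; the engine-side fallback in `update_source_row` uses the internal `Fingerprinter` rather than the SHA-256 stand-in shown here):

```rust
use sha2::{Digest, Sha256};

/// Hypothetical source-side helper: surface a storage change token (e.g. an
/// HTTP ETag) as the content-version fingerprint instead of leaving it `None`.
fn fp_from_etag(etag: Option<&str>) -> Option<Vec<u8>> {
    // A token that sometimes changes without a content change is fine: it only
    // forfeits the collapse optimization, never correctness.
    etag.map(|t| t.as_bytes().to_vec())
}

/// Engine-side fallback, approximated with SHA-256 over a stable serialization;
/// the real code fingerprints the evaluated field values instead.
fn fallback_fp<T: serde::Serialize>(field_values: &T) -> anyhow::Result<Vec<u8>> {
    Ok(Sha256::digest(serde_json::to_vec(field_values)?).to_vec())
}

fn content_version_fp<T: serde::Serialize>(
    source_fp: Option<Vec<u8>>,
    field_values: &T,
) -> anyhow::Result<Vec<u8>> {
    match source_fp {
        Some(fp) => Ok(fp), // a source-provided fingerprint always wins
        None => fallback_fp(field_values),
    }
}
```

This mirrors the `match (source_version_fp, &source_value)` arm in `update_source_row`: a provided fingerprint is used as-is, `NonExistence` produces no fingerprint, and anything else is hashed by the engine.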