diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs
index b967ede39..513689463 100644
--- a/src/execution/row_indexer.rs
+++ b/src/execution/row_indexer.rs
@@ -7,7 +7,6 @@ use sqlx::PgPool;
 use std::collections::{HashMap, HashSet};
 
 use super::db_tracking::{self, TrackedTargetKeyInfo, read_source_tracking_info_for_processing};
-use super::db_tracking_setup;
 use super::evaluator::{
     EvaluateSourceEntryOutput, SourceRowEvaluationContext, evaluate_source_entry,
 };
@@ -175,338 +174,6 @@ struct PrecommitOutput {
     target_mutations: HashMap<i32, interface::ExportTargetMutation>,
 }
 
-#[allow(clippy::too_many_arguments)]
-async fn precommit_source_tracking_info(
-    source_id: i32,
-    source_key_json: &serde_json::Value,
-    source_version: &SourceVersion,
-    logic_fp: Fingerprint,
-    data: Option<PrecommitData<'_>>,
-    process_timestamp: &chrono::DateTime<chrono::Utc>,
-    db_setup: &db_tracking_setup::TrackingTableSetupState,
-    export_ops: &[AnalyzedExportOp],
-    export_ops_exec_ctx: &[exec_ctx::ExportOpExecutionContext],
-    update_stats: &stats::UpdateStats,
-    pool: &PgPool,
-) -> Result<SkippedOr<PrecommitOutput>> {
-    let mut txn = pool.begin().await?;
-
-    let tracking_info = db_tracking::read_source_tracking_info_for_precommit(
-        source_id,
-        source_key_json,
-        db_setup,
-        &mut *txn,
-    )
-    .await?;
-    if let Some(tracking_info) = &tracking_info {
-        let existing_source_version =
-            SourceVersion::from_stored_precommit_info(&tracking_info, logic_fp);
-        if existing_source_version.should_skip(source_version, Some(update_stats)) {
-            return Ok(SkippedOr::Skipped(
-                existing_source_version,
-                tracking_info.processed_source_fp.clone(),
-            ));
-        }
-    }
-    let tracking_info_exists = tracking_info.is_some();
-    let process_ordinal = (tracking_info
-        .as_ref()
-        .map(|info| info.max_process_ordinal)
-        .unwrap_or(0)
-        + 1)
-        .max(process_timestamp.timestamp_millis());
-    let existing_process_ordinal = tracking_info.as_ref().and_then(|info| info.process_ordinal);
-
-    let mut tracking_info_for_targets = HashMap::<i32, TargetTrackingInfo>::new();
-    for (export_op, export_op_exec_ctx) in
-        std::iter::zip(export_ops.iter(), export_ops_exec_ctx.iter())
-    {
-        tracking_info_for_targets
-            .entry(export_op_exec_ctx.target_id)
-            .or_default()
-            .export_op = Some(export_op);
-    }
-
-    // Collect `tracking_info_for_targets` from existing tracking info.
-    if let Some(info) = tracking_info {
-        let sqlx::types::Json(staging_target_keys) = info.staging_target_keys;
-        for (target_id, keys_info) in staging_target_keys.into_iter() {
-            let target_info = tracking_info_for_targets.entry(target_id).or_default();
-            for key_info in keys_info.into_iter() {
-                target_info
-                    .existing_staging_keys_info
-                    .entry(TargetKeyPair {
-                        key: key_info.key,
-                        additional_key: key_info.additional_key,
-                    })
-                    .or_default()
-                    .push((key_info.process_ordinal, key_info.fingerprint));
-            }
-        }
-
-        if let Some(sqlx::types::Json(target_keys)) = info.target_keys {
-            for (target_id, keys_info) in target_keys.into_iter() {
-                let target_info = tracking_info_for_targets.entry(target_id).or_default();
-                for key_info in keys_info.into_iter() {
-                    target_info
-                        .existing_keys_info
-                        .entry(TargetKeyPair {
-                            key: key_info.key,
-                            additional_key: key_info.additional_key,
-                        })
-                        .or_default()
-                        .push((key_info.process_ordinal, key_info.fingerprint));
-                }
-            }
-        }
-    }
-
-    let mut new_target_keys_info = db_tracking::TrackedTargetKeyForSource::default();
-    if let Some(data) = &data {
-        for (export_op, export_op_exec_ctx) in
-            std::iter::zip(export_ops.iter(), export_ops_exec_ctx.iter())
-        {
-            let target_info = tracking_info_for_targets
-                .entry(export_op_exec_ctx.target_id)
-                .or_default();
-            let mut keys_info = Vec::new();
-            let collected_values =
-                &data.evaluate_output.collected_values[export_op.input.collector_idx as usize];
-            for value in collected_values.iter() {
-                let primary_key = extract_primary_key(&export_op.primary_key_def, value)?;
-                let primary_key_json = serde_json::to_value(&primary_key)?;
-
-                let mut field_values = FieldValues {
-                    fields: Vec::with_capacity(export_op.value_fields.len()),
-                };
-                for field in export_op.value_fields.iter() {
-                    field_values
-                        .fields
-                        .push(value.fields[*field as usize].clone());
-                }
-                let additional_key = export_op.export_target_factory.extract_additional_key(
-                    &primary_key,
-                    &field_values,
-                    export_op.export_context.as_ref(),
-                )?;
-                let target_key_pair = TargetKeyPair {
-                    key: primary_key_json,
-                    additional_key,
-                };
-                let existing_target_keys = target_info.existing_keys_info.remove(&target_key_pair);
-                let existing_staging_target_keys = target_info
-                    .existing_staging_keys_info
-                    .remove(&target_key_pair);
-
-                let curr_fp = if !export_op.value_stable {
-                    Some(
-                        Fingerprinter::default()
-                            .with(&field_values)?
-                            .into_fingerprint(),
-                    )
-                } else {
-                    None
-                };
-                if existing_target_keys
-                    .as_ref()
-                    .map(|keys| !keys.is_empty() && keys.iter().all(|(_, fp)| fp == &curr_fp))
-                    .unwrap_or(false)
-                    && existing_staging_target_keys
-                        .map(|keys| keys.iter().all(|(_, fp)| fp == &curr_fp))
-                        .unwrap_or(true)
-                {
-                    // Already exists, with exactly the same value fingerprint.
-                    // Nothing need to be changed, except carrying over the existing target keys info.
-                    let (existing_ordinal, existing_fp) = existing_target_keys
-                        .ok_or_else(invariance_violation)?
-                        .into_iter()
-                        .next()
-                        .ok_or_else(invariance_violation)?;
-                    keys_info.push(TrackedTargetKeyInfo {
-                        key: target_key_pair.key,
-                        additional_key: target_key_pair.additional_key,
-                        process_ordinal: existing_ordinal,
-                        fingerprint: existing_fp,
-                    });
-                } else {
-                    // Entry with new value. Needs to be upserted.
-                    let tracked_target_key = TrackedTargetKeyInfo {
-                        key: target_key_pair.key.clone(),
-                        additional_key: target_key_pair.additional_key.clone(),
-                        process_ordinal,
-                        fingerprint: curr_fp,
-                    };
-                    target_info.mutation.upserts.push(ExportTargetUpsertEntry {
-                        key: primary_key,
-                        additional_key: target_key_pair.additional_key,
-                        value: field_values,
-                    });
-                    target_info
-                        .new_staging_keys_info
-                        .push(tracked_target_key.clone());
-                    keys_info.push(tracked_target_key);
-                }
-            }
-            new_target_keys_info.push((export_op_exec_ctx.target_id, keys_info));
-        }
-    }
-
-    let mut new_staging_target_keys = db_tracking::TrackedTargetKeyForSource::default();
-    let mut target_mutations = HashMap::with_capacity(export_ops.len());
-    for (target_id, target_tracking_info) in tracking_info_for_targets.into_iter() {
-        let legacy_keys: HashSet<TargetKeyPair> = target_tracking_info
-            .existing_keys_info
-            .into_keys()
-            .chain(target_tracking_info.existing_staging_keys_info.into_keys())
-            .collect();
-
-        let mut new_staging_keys_info = target_tracking_info.new_staging_keys_info;
-        // Add tracking info for deletions.
-        new_staging_keys_info.extend(legacy_keys.iter().map(|key| TrackedTargetKeyInfo {
-            key: key.key.clone(),
-            additional_key: key.additional_key.clone(),
-            process_ordinal,
-            fingerprint: None,
-        }));
-        new_staging_target_keys.push((target_id, new_staging_keys_info));
-
-        if let Some(export_op) = target_tracking_info.export_op {
-            let mut mutation = target_tracking_info.mutation;
-            mutation.deletes.reserve(legacy_keys.len());
-            for legacy_key in legacy_keys.into_iter() {
-                let key = value::Value::<value::ScopeValue>::from_json(
-                    legacy_key.key,
-                    &export_op.primary_key_type,
-                )?
-                .as_key()?;
-                mutation.deletes.push(interface::ExportTargetDeleteEntry {
-                    key,
-                    additional_key: legacy_key.additional_key,
-                });
-            }
-            target_mutations.insert(target_id, mutation);
-        }
-    }
-
-    db_tracking::precommit_source_tracking_info(
-        source_id,
-        source_key_json,
-        process_ordinal,
-        new_staging_target_keys,
-        data.as_ref().map(|data| data.memoization_info),
-        db_setup,
-        &mut *txn,
-        if tracking_info_exists {
-            WriteAction::Update
-        } else {
-            WriteAction::Insert
-        },
-    )
-    .await?;
-
-    txn.commit().await?;
-
-    Ok(SkippedOr::Normal(PrecommitOutput {
-        metadata: PrecommitMetadata {
-            source_entry_exists: data.is_some(),
-            process_ordinal,
-            existing_process_ordinal,
-            new_target_keys: new_target_keys_info,
-        },
-        target_mutations,
-    }))
-}
-
-#[allow(clippy::too_many_arguments)]
-async fn commit_source_tracking_info(
-    source_id: i32,
-    source_key_json: &serde_json::Value,
-    source_version: &SourceVersion,
-    source_fp: Option<Vec<u8>>,
-    logic_fingerprint: &[u8],
-    precommit_metadata: PrecommitMetadata,
-    process_timestamp: &chrono::DateTime<chrono::Utc>,
-    db_setup: &db_tracking_setup::TrackingTableSetupState,
-    pool: &PgPool,
-) -> Result<()> {
-    let mut txn = pool.begin().await?;
-
-    let tracking_info = db_tracking::read_source_tracking_info_for_commit(
-        source_id,
-        source_key_json,
-        db_setup,
-        &mut *txn,
-    )
-    .await?;
-    let tracking_info_exists = tracking_info.is_some();
-    if tracking_info.as_ref().and_then(|info| info.process_ordinal)
-        >= Some(precommit_metadata.process_ordinal)
-    {
-        return Ok(());
-    }
-
-    let cleaned_staging_target_keys = tracking_info
-        .map(|info| {
-            let sqlx::types::Json(staging_target_keys) = info.staging_target_keys;
-            staging_target_keys
-                .into_iter()
-                .filter_map(|(target_id, target_keys)| {
-                    let cleaned_target_keys: Vec<_> = target_keys
-                        .into_iter()
-                        .filter(|key_info| {
-                            Some(key_info.process_ordinal)
-                                > precommit_metadata.existing_process_ordinal
-                                && key_info.process_ordinal != precommit_metadata.process_ordinal
-                        })
-                        .collect();
-                    if !cleaned_target_keys.is_empty() {
-                        Some((target_id, cleaned_target_keys))
-                    } else {
-                        None
-                    }
-                })
-                .collect::<Vec<_>>()
-        })
-        .unwrap_or_default();
-    if !precommit_metadata.source_entry_exists && cleaned_staging_target_keys.is_empty() {
-        // TODO: When we support distributed execution in the future, we'll need to leave a tombstone for a while
-        // to prevent an earlier update causing the record reappear because of out-of-order processing.
-        if tracking_info_exists {
-            db_tracking::delete_source_tracking_info(
-                source_id,
-                source_key_json,
-                db_setup,
-                &mut *txn,
-            )
-            .await?;
-        }
-    } else {
-        db_tracking::commit_source_tracking_info(
-            source_id,
-            source_key_json,
-            cleaned_staging_target_keys,
-            source_version.ordinal.into(),
-            source_fp,
-            logic_fingerprint,
-            precommit_metadata.process_ordinal,
-            process_timestamp.timestamp_micros(),
-            precommit_metadata.new_target_keys,
-            db_setup,
-            &mut *txn,
-            if tracking_info_exists {
-                WriteAction::Update
-            } else {
-                WriteAction::Insert
-            },
-        )
-        .await?;
-    }
-
-    txn.commit().await?;
-
-    Ok(())
-}
-
 pub struct RowIndexer<'a> {
     src_eval_ctx: &'a SourceRowEvaluationContext<'a>,
     setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
@@ -648,23 +315,15 @@ impl<'a> RowIndexer<'a> {
         };
 
         // Phase 2 (precommit): Update with the memoization info and stage target keys.
-        let precommit_output = precommit_source_tracking_info(
-            self.source_id,
-            &self.source_key_json,
-            source_version,
-            self.src_eval_ctx.plan.logic_fingerprint,
-            output.as_ref().map(|scope_value| PrecommitData {
-                evaluate_output: scope_value,
-                memoization_info: &stored_mem_info,
-            }),
-            &self.process_time,
-            &self.setup_execution_ctx.setup_state.tracking_table,
-            &self.src_eval_ctx.plan.export_ops,
-            &self.setup_execution_ctx.export_ops,
-            self.update_stats,
-            self.pool,
-        )
-        .await?;
+        let precommit_output = self
+            .precommit_source_tracking_info(
+                source_version,
+                output.as_ref().map(|scope_value| PrecommitData {
+                    evaluate_output: scope_value,
+                    memoization_info: &stored_mem_info,
+                }),
+            )
+            .await?;
         let precommit_output = match precommit_output {
             SkippedOr::Normal(output) => output,
             SkippedOr::Skipped(v, fp) => return Ok(SkippedOr::Skipped(v, fp)),
@@ -705,18 +364,8 @@ impl<'a> RowIndexer<'a> {
         try_join_all(apply_futs).await?;
 
         // Phase 4: Update the tracking record.
-        commit_source_tracking_info(
-            self.source_id,
-            &self.source_key_json,
-            source_version,
-            source_fp,
-            &self.src_eval_ctx.plan.logic_fingerprint.0,
-            precommit_output.metadata,
-            &self.process_time,
-            tracking_setup_state,
-            self.pool,
-        )
-        .await?;
+        self.commit_source_tracking_info(source_version, source_fp, precommit_output.metadata)
+            .await?;
 
         if let Some(existing_version) = existing_version {
             if output.is_some() {
@@ -838,6 +487,329 @@ impl<'a> RowIndexer<'a> {
         self.update_stats.num_no_change.inc(1);
         Ok(Some(SkippedOr::Normal(())))
     }
+
+    async fn precommit_source_tracking_info(
+        &self,
+        source_version: &SourceVersion,
+        data: Option<PrecommitData<'_>>,
+    ) -> Result<SkippedOr<PrecommitOutput>> {
+        let db_setup = &self.setup_execution_ctx.setup_state.tracking_table;
+        let export_ops = &self.src_eval_ctx.plan.export_ops;
+        let export_ops_exec_ctx = &self.setup_execution_ctx.export_ops;
+        let logic_fp = self.src_eval_ctx.plan.logic_fingerprint;
+
+        let mut txn = self.pool.begin().await?;
+
+        let tracking_info = db_tracking::read_source_tracking_info_for_precommit(
+            self.source_id,
+            &self.source_key_json,
+            db_setup,
+            &mut *txn,
+        )
+        .await?;
+        if let Some(tracking_info) = &tracking_info {
+            let existing_source_version =
+                SourceVersion::from_stored_precommit_info(&tracking_info, logic_fp);
+            if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
+                return Ok(SkippedOr::Skipped(
+                    existing_source_version,
+                    tracking_info.processed_source_fp.clone(),
+                ));
+            }
+        }
+        let tracking_info_exists = tracking_info.is_some();
+        let process_ordinal = (tracking_info
+            .as_ref()
+            .map(|info| info.max_process_ordinal)
+            .unwrap_or(0)
+            + 1)
+            .max(self.process_time.timestamp_millis());
+        let existing_process_ordinal = tracking_info.as_ref().and_then(|info| info.process_ordinal);
+
+        let mut tracking_info_for_targets = HashMap::<i32, TargetTrackingInfo>::new();
+        for (export_op, export_op_exec_ctx) in
+            std::iter::zip(export_ops.iter(), export_ops_exec_ctx.iter())
+        {
+            tracking_info_for_targets
+                .entry(export_op_exec_ctx.target_id)
+                .or_default()
+                .export_op = Some(export_op);
+        }
+
+        // Collect `tracking_info_for_targets` from existing tracking info.
+        if let Some(info) = tracking_info {
+            let sqlx::types::Json(staging_target_keys) = info.staging_target_keys;
+            for (target_id, keys_info) in staging_target_keys.into_iter() {
+                let target_info = tracking_info_for_targets.entry(target_id).or_default();
+                for key_info in keys_info.into_iter() {
+                    target_info
+                        .existing_staging_keys_info
+                        .entry(TargetKeyPair {
+                            key: key_info.key,
+                            additional_key: key_info.additional_key,
+                        })
+                        .or_default()
+                        .push((key_info.process_ordinal, key_info.fingerprint));
+                }
+            }
+
+            if let Some(sqlx::types::Json(target_keys)) = info.target_keys {
+                for (target_id, keys_info) in target_keys.into_iter() {
+                    let target_info = tracking_info_for_targets.entry(target_id).or_default();
+                    for key_info in keys_info.into_iter() {
+                        target_info
+                            .existing_keys_info
+                            .entry(TargetKeyPair {
+                                key: key_info.key,
+                                additional_key: key_info.additional_key,
+                            })
+                            .or_default()
+                            .push((key_info.process_ordinal, key_info.fingerprint));
+                    }
+                }
+            }
+        }
+
+        let mut new_target_keys_info = db_tracking::TrackedTargetKeyForSource::default();
+        if let Some(data) = &data {
+            for (export_op, export_op_exec_ctx) in
+                std::iter::zip(export_ops.iter(), export_ops_exec_ctx.iter())
+            {
+                let target_info = tracking_info_for_targets
+                    .entry(export_op_exec_ctx.target_id)
+                    .or_default();
+                let mut keys_info = Vec::new();
+                let collected_values =
+                    &data.evaluate_output.collected_values[export_op.input.collector_idx as usize];
+                for value in collected_values.iter() {
+                    let primary_key = extract_primary_key(&export_op.primary_key_def, value)?;
+                    let primary_key_json = serde_json::to_value(&primary_key)?;
+
+                    let mut field_values = FieldValues {
+                        fields: Vec::with_capacity(export_op.value_fields.len()),
+                    };
+                    for field in export_op.value_fields.iter() {
+                        field_values
+                            .fields
+                            .push(value.fields[*field as usize].clone());
+                    }
+                    let additional_key = export_op.export_target_factory.extract_additional_key(
+                        &primary_key,
+                        &field_values,
+                        export_op.export_context.as_ref(),
+                    )?;
+                    let target_key_pair = TargetKeyPair {
+                        key: primary_key_json,
+                        additional_key,
+                    };
+                    let existing_target_keys =
+                        target_info.existing_keys_info.remove(&target_key_pair);
+                    let existing_staging_target_keys = target_info
+                        .existing_staging_keys_info
+                        .remove(&target_key_pair);
+
+                    let curr_fp = if !export_op.value_stable {
+                        Some(
+                            Fingerprinter::default()
+                                .with(&field_values)?
+                                .into_fingerprint(),
+                        )
+                    } else {
+                        None
+                    };
+                    if existing_target_keys
+                        .as_ref()
+                        .map(|keys| !keys.is_empty() && keys.iter().all(|(_, fp)| fp == &curr_fp))
+                        .unwrap_or(false)
+                        && existing_staging_target_keys
+                            .map(|keys| keys.iter().all(|(_, fp)| fp == &curr_fp))
+                            .unwrap_or(true)
+                    {
+                        // Already exists with exactly the same value fingerprint; nothing to change except carrying over the existing target keys info.
+                        let (existing_ordinal, existing_fp) = existing_target_keys
+                            .ok_or_else(invariance_violation)?
+                            .into_iter()
+                            .next()
+                            .ok_or_else(invariance_violation)?;
+                        keys_info.push(TrackedTargetKeyInfo {
+                            key: target_key_pair.key,
+                            additional_key: target_key_pair.additional_key,
+                            process_ordinal: existing_ordinal,
+                            fingerprint: existing_fp,
+                        });
+                    } else {
+                        // Entry with a new value; needs to be upserted.
+                        let tracked_target_key = TrackedTargetKeyInfo {
+                            key: target_key_pair.key.clone(),
+                            additional_key: target_key_pair.additional_key.clone(),
+                            process_ordinal,
+                            fingerprint: curr_fp,
+                        };
+                        target_info.mutation.upserts.push(ExportTargetUpsertEntry {
+                            key: primary_key,
+                            additional_key: target_key_pair.additional_key,
+                            value: field_values,
+                        });
+                        target_info
+                            .new_staging_keys_info
+                            .push(tracked_target_key.clone());
+                        keys_info.push(tracked_target_key);
+                    }
+                }
+                new_target_keys_info.push((export_op_exec_ctx.target_id, keys_info));
+            }
+        }
+
+        let mut new_staging_target_keys = db_tracking::TrackedTargetKeyForSource::default();
+        let mut target_mutations = HashMap::with_capacity(export_ops.len());
+        for (target_id, target_tracking_info) in tracking_info_for_targets.into_iter() {
+            let legacy_keys: HashSet<TargetKeyPair> = target_tracking_info
+                .existing_keys_info
+                .into_keys()
+                .chain(target_tracking_info.existing_staging_keys_info.into_keys())
+                .collect();
+
+            let mut new_staging_keys_info = target_tracking_info.new_staging_keys_info;
+            // Add tracking info for deletions.
+            new_staging_keys_info.extend(legacy_keys.iter().map(|key| TrackedTargetKeyInfo {
+                key: key.key.clone(),
+                additional_key: key.additional_key.clone(),
+                process_ordinal,
+                fingerprint: None,
+            }));
+            new_staging_target_keys.push((target_id, new_staging_keys_info));
+
+            if let Some(export_op) = target_tracking_info.export_op {
+                let mut mutation = target_tracking_info.mutation;
+                mutation.deletes.reserve(legacy_keys.len());
+                for legacy_key in legacy_keys.into_iter() {
+                    let key = value::Value::<value::ScopeValue>::from_json(
+                        legacy_key.key,
+                        &export_op.primary_key_type,
+                    )?
+                    .as_key()?;
+                    mutation.deletes.push(interface::ExportTargetDeleteEntry {
+                        key,
+                        additional_key: legacy_key.additional_key,
+                    });
+                }
+                target_mutations.insert(target_id, mutation);
+            }
+        }
+
+        db_tracking::precommit_source_tracking_info(
+            self.source_id,
+            &self.source_key_json,
+            process_ordinal,
+            new_staging_target_keys,
+            data.as_ref().map(|data| data.memoization_info),
+            db_setup,
+            &mut *txn,
+            if tracking_info_exists {
+                WriteAction::Update
+            } else {
+                WriteAction::Insert
+            },
+        )
+        .await?;
+
+        txn.commit().await?;
+
+        Ok(SkippedOr::Normal(PrecommitOutput {
+            metadata: PrecommitMetadata {
+                source_entry_exists: data.is_some(),
+                process_ordinal,
+                existing_process_ordinal,
+                new_target_keys: new_target_keys_info,
+            },
+            target_mutations,
+        }))
+    }
+
+    async fn commit_source_tracking_info(
+        &self,
+        source_version: &SourceVersion,
+        source_fp: Option<Vec<u8>>,
+        precommit_metadata: PrecommitMetadata,
+    ) -> Result<()> {
+        let db_setup = &self.setup_execution_ctx.setup_state.tracking_table;
+        let mut txn = self.pool.begin().await?;
+
+        let tracking_info = db_tracking::read_source_tracking_info_for_commit(
+            self.source_id,
+            &self.source_key_json,
+            db_setup,
+            &mut *txn,
+        )
+        .await?;
+        let tracking_info_exists = tracking_info.is_some();
+        if tracking_info.as_ref().and_then(|info| info.process_ordinal)
+            >= Some(precommit_metadata.process_ordinal)
+        {
+            return Ok(());
+        }
+
+        let cleaned_staging_target_keys = tracking_info
+            .map(|info| {
+                let sqlx::types::Json(staging_target_keys) = info.staging_target_keys;
+                staging_target_keys
+                    .into_iter()
+                    .filter_map(|(target_id, target_keys)| {
+                        let cleaned_target_keys: Vec<_> = target_keys
+                            .into_iter()
+                            .filter(|key_info| {
+                                Some(key_info.process_ordinal)
+                                    > precommit_metadata.existing_process_ordinal
+                                    && key_info.process_ordinal
+                                        != precommit_metadata.process_ordinal
+                            })
+                            .collect();
+                        if !cleaned_target_keys.is_empty() {
+                            Some((target_id, cleaned_target_keys))
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<Vec<_>>()
+            })
+            .unwrap_or_default();
+        if !precommit_metadata.source_entry_exists && cleaned_staging_target_keys.is_empty() {
+            // TODO: Once we support distributed execution, leave a tombstone for a while to prevent an earlier out-of-order update from making the record reappear.
+            if tracking_info_exists {
+                db_tracking::delete_source_tracking_info(
+                    self.source_id,
+                    &self.source_key_json,
+                    db_setup,
+                    &mut *txn,
+                )
+                .await?;
+            }
+        } else {
+            db_tracking::commit_source_tracking_info(
+                self.source_id,
+                &self.source_key_json,
+                cleaned_staging_target_keys,
+                source_version.ordinal.into(),
+                source_fp,
+                &self.src_eval_ctx.plan.logic_fingerprint.0,
+                precommit_metadata.process_ordinal,
+                self.process_time.timestamp_micros(),
+                precommit_metadata.new_target_keys,
+                db_setup,
+                &mut *txn,
+                if tracking_info_exists {
+                    WriteAction::Update
+                } else {
+                    WriteAction::Insert
+                },
+            )
+            .await?;
+        }
+
+        txn.commit().await?;
+
+        Ok(())
+    }
 }
 
 pub async fn evaluate_source_entry_with_memory(
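For reviewers unfamiliar with the protocol these two methods implement: precommit reserves a fresh `process_ordinal` (strictly above anything staged so far, and at least the wall-clock millis) and stages target keys under it; commit then refuses to apply if a newer ordinal has already been committed, which is what makes retries and out-of-order workers safe. Below is a minimal, self-contained Rust sketch of just that ordering invariant. All names here (`TrackingTable`, `TrackingRecord`, `precommit`, `commit`) are hypothetical stand-ins for illustration; the real code persists these records in Postgres via `db_tracking` inside transactions.

```rust
use std::collections::HashMap;

#[derive(Default)]
struct TrackingRecord {
    max_process_ordinal: i64,         // highest ordinal ever handed out for this row
    process_ordinal: Option<i64>,     // ordinal of the last *committed* processing
    staging_keys: Vec<(String, i64)>, // (target key, ordinal) staged for later cleanup
}

#[derive(Default)]
struct TrackingTable {
    rows: HashMap<String, TrackingRecord>,
}

impl TrackingTable {
    // "Precommit": reserve an ordinal strictly above everything staged so far,
    // and no less than the current timestamp, then stage the target keys under it.
    fn precommit(&mut self, source_key: &str, now_millis: i64, keys: &[String]) -> i64 {
        let rec = self.rows.entry(source_key.to_string()).or_default();
        let ordinal = (rec.max_process_ordinal + 1).max(now_millis);
        rec.max_process_ordinal = ordinal;
        rec.staging_keys
            .extend(keys.iter().map(|k| (k.clone(), ordinal)));
        ordinal
    }

    // "Commit": a stale attempt (lower ordinal) must never overwrite a newer commit.
    fn commit(&mut self, source_key: &str, ordinal: i64) -> bool {
        let rec = self.rows.entry(source_key.to_string()).or_default();
        if rec.process_ordinal >= Some(ordinal) {
            return false; // out-of-order commit: a no-op, like the early return above
        }
        rec.process_ordinal = Some(ordinal);
        // Drop this attempt's staging entries; other attempts' entries remain for cleanup.
        rec.staging_keys.retain(|(_, o)| *o != ordinal);
        true
    }
}

fn main() {
    let mut table = TrackingTable::default();
    let o1 = table.precommit("row-1", 1_000, &["k1".into()]);
    let o2 = table.precommit("row-1", 1_000, &["k1".into()]); // concurrent retry
    assert!(o2 > o1); // ordinals are unique even within one millisecond
    assert!(table.commit("row-1", o2)); // the newer attempt wins
    assert!(!table.commit("row-1", o1)); // the stale attempt is ignored
    println!("committed ordinal: {:?}", table.rows["row-1"].process_ordinal);
}
```

The same reasoning explains the commit-side cleanup in the diff: staging entries whose ordinal is newer than `existing_process_ordinal` but different from the committing ordinal belong to other in-flight attempts, so they are kept for later deletion rather than dropped.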