diff --git a/src/builder/exec_ctx.rs b/src/builder/exec_ctx.rs index 09f7c95e7..47af1ae6b 100644 --- a/src/builder/exec_ctx.rs +++ b/src/builder/exec_ctx.rs @@ -288,6 +288,9 @@ pub fn build_flow_setup_execution_context( db_tracking_setup::default_source_state_table_name(&flow_inst.name) }) }), + has_fast_fingerprint_column: metadata + .features + .contains(setup::flow_features::FAST_FINGERPRINT), }, targets: target_states, metadata, diff --git a/src/execution/db_tracking.rs b/src/execution/db_tracking.rs index 5e11f0182..89d2343e2 100644 --- a/src/execution/db_tracking.rs +++ b/src/execution/db_tracking.rs @@ -82,6 +82,7 @@ pub struct SourceTrackingInfoForProcessing { pub memoization_info: Option<sqlx::types::Json<Option<StoredMemoizationInfo>>>, pub processed_source_ordinal: Option<i64>, + pub processed_source_fp: Option<Vec<u8>>, pub process_logic_fingerprint: Option<Vec<u8>>, pub max_process_ordinal: Option<i64>, pub process_ordinal: Option<i64>, @@ -94,7 +95,12 @@ pub async fn read_source_tracking_info_for_processing( pool: &PgPool, ) -> Result<Option<SourceTrackingInfoForProcessing>> { let query_str = format!( "SELECT memoization_info, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2", if db_setup.has_fast_fingerprint_column { "processed_source_fp" } else { "NULL::bytea AS processed_source_fp" }, db_setup.table_name ); let tracking_info = sqlx::query_as(&query_str) @@ -198,6 +204,7 @@ pub async fn commit_source_tracking_info( source_key_json: &serde_json::Value, staging_target_keys: TrackedTargetKeyForSource, processed_source_ordinal: Option<i64>, + processed_source_fp: Option<Vec<u8>>, logic_fingerprint: &[u8], process_ordinal: i64, process_time_micros: i64, @@ -211,16 +218,31 @@ pub async fn commit_source_tracking_info( "INSERT INTO {} ( \ source_id, source_key, \ max_process_ordinal, staging_target_keys, \ -
processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys) \ - VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8)", - db_setup.table_name + processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys{}) \ + VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8{})", + db_setup.table_name, + if db_setup.has_fast_fingerprint_column { + ", processed_source_fp" + } else { + "" + }, + if db_setup.has_fast_fingerprint_column { + ", $9" + } else { + "" + }, ), WriteAction::Update => format!( - "UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8 WHERE source_id = $1 AND source_key = $2", - db_setup.table_name + "UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8{} WHERE source_id = $1 AND source_key = $2", + db_setup.table_name, + if db_setup.has_fast_fingerprint_column { + ", processed_source_fp = $9" + } else { + "" + }, ), }; - sqlx::query(&query_str) + let mut query = sqlx::query(&query_str) .bind(source_id) // $1 .bind(source_key_json) // $2 .bind(sqlx::types::Json(staging_target_keys)) // $3 @@ -228,9 +250,13 @@ pub async fn commit_source_tracking_info( .bind(logic_fingerprint) // $5 .bind(process_ordinal) // $6 .bind(process_time_micros) // $7 - .bind(sqlx::types::Json(target_keys)) // $8 - .execute(db_executor) - .await?; + .bind(sqlx::types::Json(target_keys)); // $8 + + if db_setup.has_fast_fingerprint_column { + query = query.bind(processed_source_fp); // $9 + } + query.execute(db_executor).await?; + Ok(()) } diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index aacd48c4e..b9a64514c 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -22,12 +22,16 @@ pub const 
CURRENT_TRACKING_TABLE_VERSION: i32 = 1; async fn upgrade_tracking_table( pool: &PgPool, - table_name: &str, + desired_state: &TrackingTableSetupState, existing_version_id: i32, - target_version_id: i32, ) -> Result<()> { - if existing_version_id < 1 && target_version_id >= 1 { - let query = format!( + if existing_version_id < 1 && desired_state.version_id >= 1 { + let table_name = &desired_state.table_name; + let opt_fast_fingerprint_column = desired_state + .has_fast_fingerprint_column + .then(|| "processed_source_fp BYTEA,") + .unwrap_or(""); + let query = format!( "CREATE TABLE IF NOT EXISTS {table_name} ( source_id INTEGER NOT NULL, source_key JSONB NOT NULL, @@ -39,6 +43,7 @@ async fn upgrade_tracking_table( -- Update after applying the changes to the target storage. processed_source_ordinal BIGINT, + {opt_fast_fingerprint_column} process_logic_fingerprint BYTEA, process_ordinal BIGINT, process_time_micros BIGINT, @@ -73,6 +78,8 @@ pub struct TrackingTableSetupState { pub version_id: i32, #[serde(default)] pub source_state_table_name: Option<String>, + #[serde(default)] + pub has_fast_fingerprint_column: bool, } #[derive(Debug)] @@ -248,13 +255,8 @@ impl TrackingTableSetupChange { } if self.min_existing_version_id != Some(desired.version_id) { - upgrade_tracking_table( - pool, - &desired.table_name, - self.min_existing_version_id.unwrap_or(0), - desired.version_id, - ) - .await?; + upgrade_tracking_table(pool, desired, self.min_existing_version_id.unwrap_or(0)) + .await?; } } else { for lagacy_name in self.legacy_tracking_table_names.iter() { diff --git a/src/execution/memoization.rs b/src/execution/memoization.rs index 4b8f77205..c90def8b6 100644 --- a/src/execution/memoization.rs +++ b/src/execution/memoization.rs @@ -26,8 +26,9 @@ pub struct StoredMemoizationInfo { #[serde(default, skip_serializing_if = "HashMap::is_empty")] pub uuids: HashMap>, + /// TO BE DEPRECATED. Use the new `processed_source_fp` column instead.
#[serde(default, skip_serializing_if = "Option::is_none")] - pub content_hash: Option<Fingerprint>, + pub content_hash: Option<String>, } pub type CacheEntryCell = Arc>>; diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index 6ee6ed2c1..510f86eb7 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -1,5 +1,7 @@ use crate::prelude::*; +use base64::Engine; +use base64::prelude::BASE64_STANDARD; use futures::future::try_join_all; use sqlx::PgPool; use std::collections::{HashMap, HashSet}; @@ -419,6 +421,7 @@ async fn commit_source_tracking_info( source_id: i32, source_key_json: &serde_json::Value, source_version: &SourceVersion, + source_fp: Option<Vec<u8>>, logic_fingerprint: &[u8], precommit_metadata: PrecommitMetadata, process_timestamp: &chrono::DateTime<chrono::Utc>, @@ -482,6 +485,7 @@ async fn commit_source_tracking_info( source_key_json, cleaned_staging_target_keys, source_version.ordinal.into(), + source_fp, logic_fingerprint, precommit_metadata.process_ordinal, process_timestamp.timestamp_micros(), @@ -508,7 +512,7 @@ async fn try_content_hash_optimization( src_eval_ctx: &SourceRowEvaluationContext<'_>, source_key_json: &serde_json::Value, source_version: &SourceVersion, - current_hash: &crate::utils::fingerprint::Fingerprint, + current_hash: &[u8], tracking_info: &db_tracking::SourceTrackingInfoForProcessing, existing_version: &Option<SourceVersion>, db_setup: &db_tracking_setup::TrackingTableSetupState, @@ -523,21 +527,31 @@ async fn try_content_hash_optimization( return Ok(None); } - if tracking_info - .max_process_ordinal - .zip(tracking_info.process_ordinal) - .is_none_or(|(max_ord, proc_ord)| max_ord != proc_ord) - { - return Ok(None); - } + let existing_hash: Option<Cow<'_, Vec<u8>>> = if db_setup.has_fast_fingerprint_column { + tracking_info + .processed_source_fp + .as_ref() + .map(|fp| Cow::Borrowed(fp)) + } else { + if tracking_info + .max_process_ordinal + .zip(tracking_info.process_ordinal) + .is_none_or(|(max_ord, proc_ord)| max_ord != proc_ord) + { + return
Ok(None); + } - let existing_hash = tracking_info - .memoization_info - .as_ref() - .and_then(|info| info.0.as_ref()) - .and_then(|stored_info| stored_info.content_hash.as_ref()); + tracking_info + .memoization_info + .as_ref() + .and_then(|info| info.0.as_ref()) + .and_then(|stored_info| stored_info.content_hash.as_ref()) + .map(|content_hash| BASE64_STANDARD.decode(content_hash)) + .transpose()? + .map(Cow::Owned) + }; - if existing_hash != Some(current_hash) { + if existing_hash.as_ref().map(|fp| fp.as_slice()) != Some(current_hash) { return Ok(None); } @@ -641,6 +655,8 @@ pub async fn update_source_row( pool: &PgPool, update_stats: &stats::UpdateStats, ) -> Result> { + let tracking_setup_state = &setup_execution_ctx.setup_state.tracking_table; + let source_key_json = serde_json::to_value(src_eval_ctx.key)?; let process_time = chrono::Utc::now(); let source_id = setup_execution_ctx.import_ops[src_eval_ctx.import_op_idx].source_id; @@ -689,10 +705,10 @@ pub async fn update_source_row( src_eval_ctx, &source_key_json, source_version, - current_hash, + current_hash.as_slice(), existing_tracking_info, &existing_version, - &setup_execution_ctx.setup_state.tracking_table, + tracking_setup_state, update_stats, pool, ) @@ -702,7 +718,7 @@ pub async fn update_source_row( } } - let (output, stored_mem_info) = { + let (output, stored_mem_info, source_fp) = { let extracted_memoization_info = existing_tracking_info .and_then(|info| info.memoization_info) .and_then(|info| info.0); @@ -721,11 +737,15 @@ pub async fn update_source_row( let output = evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?; let mut stored_info = evaluation_memory.into_stored()?; - stored_info.content_hash = current_content_hash; - - (Some(output), stored_info) + let content_hash = current_content_hash.map(|fp| fp.0.to_vec()); + if tracking_setup_state.has_fast_fingerprint_column { + (Some(output), stored_info, content_hash) + } else { + stored_info.content_hash = 
content_hash.map(|fp| BASE64_STANDARD.encode(fp)); + (Some(output), stored_info, None) + } } - interface::SourceValue::NonExistence => (None, Default::default()), + interface::SourceValue::NonExistence => (None, Default::default(), None), } }; @@ -788,10 +808,11 @@ pub async fn update_source_row( source_id, &source_key_json, source_version, + source_fp, &src_eval_ctx.plan.logic_fingerprint.0, precommit_output.metadata, &process_time, - &setup_execution_ctx.setup_state.tracking_table, + tracking_setup_state, pool, ) .await?; diff --git a/src/setup/flow_features.rs b/src/setup/flow_features.rs index 52851527e..b14350717 100644 --- a/src/setup/flow_features.rs +++ b/src/setup/flow_features.rs @@ -1,7 +1,8 @@ use crate::prelude::*; pub const SOURCE_STATE_TABLE: &str = "source_state_table"; +pub const FAST_FINGERPRINT: &str = "fast_fingerprint"; pub fn default_features() -> BTreeSet<String> { - BTreeSet::new() + BTreeSet::from_iter([FAST_FINGERPRINT.to_string()]) } diff --git a/src/utils/fingerprint.rs b/src/utils/fingerprint.rs index 19d844dfc..97a1d6e39 100644 --- a/src/utils/fingerprint.rs +++ b/src/utils/fingerprint.rs @@ -51,6 +51,10 @@ impl Fingerprint { Err(e) => bail!("Fingerprint bytes length is unexpected: {}", e.len()), } } + + pub fn as_slice(&self) -> &[u8] { + &self.0 + } } impl Serialize for Fingerprint {