11use crate :: prelude:: * ;
22
3+ use base64:: Engine ;
4+ use base64:: prelude:: BASE64_STANDARD ;
35use futures:: future:: try_join_all;
46use sqlx:: PgPool ;
57use std:: collections:: { HashMap , HashSet } ;
@@ -419,6 +421,7 @@ async fn commit_source_tracking_info(
419421 source_id : i32 ,
420422 source_key_json : & serde_json:: Value ,
421423 source_version : & SourceVersion ,
424+ source_fp : Option < Vec < u8 > > ,
422425 logic_fingerprint : & [ u8 ] ,
423426 precommit_metadata : PrecommitMetadata ,
424427 process_timestamp : & chrono:: DateTime < chrono:: Utc > ,
@@ -482,6 +485,7 @@ async fn commit_source_tracking_info(
482485 source_key_json,
483486 cleaned_staging_target_keys,
484487 source_version. ordinal . into ( ) ,
488+ source_fp,
485489 logic_fingerprint,
486490 precommit_metadata. process_ordinal ,
487491 process_timestamp. timestamp_micros ( ) ,
@@ -508,7 +512,7 @@ async fn try_content_hash_optimization(
508512 src_eval_ctx : & SourceRowEvaluationContext < ' _ > ,
509513 source_key_json : & serde_json:: Value ,
510514 source_version : & SourceVersion ,
511- current_hash : & crate :: utils :: fingerprint :: Fingerprint ,
515+ current_hash : & [ u8 ] ,
512516 tracking_info : & db_tracking:: SourceTrackingInfoForProcessing ,
513517 existing_version : & Option < SourceVersion > ,
514518 db_setup : & db_tracking_setup:: TrackingTableSetupState ,
@@ -523,21 +527,31 @@ async fn try_content_hash_optimization(
523527 return Ok ( None ) ;
524528 }
525529
526- if tracking_info
527- . max_process_ordinal
528- . zip ( tracking_info. process_ordinal )
529- . is_none_or ( |( max_ord, proc_ord) | max_ord != proc_ord)
530- {
531- return Ok ( None ) ;
532- }
530+ let existing_hash: Option < Cow < ' _ , Vec < u8 > > > = if db_setup. has_fast_fingerprint_column {
531+ tracking_info
532+ . processed_source_fp
533+ . as_ref ( )
534+ . map ( |fp| Cow :: Borrowed ( fp) )
535+ } else {
536+ if tracking_info
537+ . max_process_ordinal
538+ . zip ( tracking_info. process_ordinal )
539+ . is_none_or ( |( max_ord, proc_ord) | max_ord != proc_ord)
540+ {
541+ return Ok ( None ) ;
542+ }
533543
534- let existing_hash = tracking_info
535- . memoization_info
536- . as_ref ( )
537- . and_then ( |info| info. 0 . as_ref ( ) )
538- . and_then ( |stored_info| stored_info. content_hash . as_ref ( ) ) ;
544+ tracking_info
545+ . memoization_info
546+ . as_ref ( )
547+ . and_then ( |info| info. 0 . as_ref ( ) )
548+ . and_then ( |stored_info| stored_info. content_hash . as_ref ( ) )
549+ . map ( |content_hash| BASE64_STANDARD . decode ( content_hash) )
550+ . transpose ( ) ?
551+ . map ( Cow :: Owned )
552+ } ;
539553
540- if existing_hash != Some ( current_hash) {
554+ if existing_hash. as_ref ( ) . map ( |fp| fp . as_slice ( ) ) != Some ( current_hash) {
541555 return Ok ( None ) ;
542556 }
543557
@@ -641,6 +655,8 @@ pub async fn update_source_row(
641655 pool : & PgPool ,
642656 update_stats : & stats:: UpdateStats ,
643657) -> Result < SkippedOr < ( ) > > {
658+ let tracking_setup_state = & setup_execution_ctx. setup_state . tracking_table ;
659+
644660 let source_key_json = serde_json:: to_value ( src_eval_ctx. key ) ?;
645661 let process_time = chrono:: Utc :: now ( ) ;
646662 let source_id = setup_execution_ctx. import_ops [ src_eval_ctx. import_op_idx ] . source_id ;
@@ -689,10 +705,10 @@ pub async fn update_source_row(
689705 src_eval_ctx,
690706 & source_key_json,
691707 source_version,
692- current_hash,
708+ current_hash. as_slice ( ) ,
693709 existing_tracking_info,
694710 & existing_version,
695- & setup_execution_ctx . setup_state . tracking_table ,
711+ tracking_setup_state ,
696712 update_stats,
697713 pool,
698714 )
@@ -702,7 +718,7 @@ pub async fn update_source_row(
702718 }
703719 }
704720
705- let ( output, stored_mem_info) = {
721+ let ( output, stored_mem_info, source_fp ) = {
706722 let extracted_memoization_info = existing_tracking_info
707723 . and_then ( |info| info. memoization_info )
708724 . and_then ( |info| info. 0 ) ;
@@ -721,11 +737,15 @@ pub async fn update_source_row(
721737 let output =
722738 evaluate_source_entry ( src_eval_ctx, source_value, & evaluation_memory) . await ?;
723739 let mut stored_info = evaluation_memory. into_stored ( ) ?;
724- stored_info. content_hash = current_content_hash;
725-
726- ( Some ( output) , stored_info)
740+ let content_hash = current_content_hash. map ( |fp| fp. 0 . to_vec ( ) ) ;
741+ if tracking_setup_state. has_fast_fingerprint_column {
742+ ( Some ( output) , stored_info, content_hash)
743+ } else {
744+ stored_info. content_hash = content_hash. map ( |fp| BASE64_STANDARD . encode ( fp) ) ;
745+ ( Some ( output) , stored_info, None )
746+ }
727747 }
728- interface:: SourceValue :: NonExistence => ( None , Default :: default ( ) ) ,
748+ interface:: SourceValue :: NonExistence => ( None , Default :: default ( ) , None ) ,
729749 }
730750 } ;
731751
@@ -788,10 +808,11 @@ pub async fn update_source_row(
788808 source_id,
789809 & source_key_json,
790810 source_version,
811+ source_fp,
791812 & src_eval_ctx. plan . logic_fingerprint . 0 ,
792813 precommit_output. metadata ,
793814 & process_time,
794- & setup_execution_ctx . setup_state . tracking_table ,
815+ tracking_setup_state ,
795816 pool,
796817 )
797818 . await ?;
0 commit comments