@@ -478,22 +478,27 @@ pub async fn update_source_entry(
478478 )
479479 . await ?;
480480 let already_exists = existing_tracking_info. is_some ( ) ;
481- let ( existing_source_ordinal, memoization_info) = match existing_tracking_info {
482- Some ( info) => (
483- info. processed_source_ordinal . map ( Ordinal ) ,
484- info. memoization_info . map ( |info| info. 0 ) . flatten ( ) ,
485- ) ,
486- None => ( None , None ) ,
487- } ;
481+ let ( existing_source_ordinal, existing_logic_fingerprint, memoization_info) =
482+ match existing_tracking_info {
483+ Some ( info) => (
484+ info. processed_source_ordinal . map ( Ordinal ) ,
485+ info. process_logic_fingerprint ,
486+ info. memoization_info . map ( |info| info. 0 ) . flatten ( ) ,
487+ ) ,
488+ None => Default :: default ( ) ,
489+ } ;
488490 let ( source_ordinal, output, stored_mem_info) = if !only_for_deletion {
489491 let source_data = source_op. executor . get_value ( key) . await ?;
490492 let source_ordinal = source_data. as_ref ( ) . and_then ( |d| d. ordinal ) ;
491493 match ( source_ordinal, existing_source_ordinal) {
492494 // TODO: Collapse if the source is not newer and the processing logic is not changed.
493- ( Some ( source_ordinal) , Some ( existing_source_ordinal) )
494- if source_ordinal <= existing_source_ordinal =>
495- {
496- return Ok ( ( ) ) ;
495+ ( Some ( source_ordinal) , Some ( existing_source_ordinal) ) => {
496+ if source_ordinal < existing_source_ordinal
497+ || ( source_ordinal == existing_source_ordinal
498+ && existing_logic_fingerprint == source_op. )
499+ {
500+ return Ok ( ( ) ) ;
501+ }
497502 }
498503 _ => { }
499504 }
0 commit comments