1- use anyhow:: Result ;
1+ use crate :: prelude:: * ;
2+
23use futures:: future:: { join, join_all, try_join, try_join_all} ;
34use itertools:: Itertools ;
45use log:: error;
@@ -13,7 +14,7 @@ use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoiz
1314use crate :: base:: schema;
1415use crate :: base:: value:: { self , FieldValues , KeyValue } ;
1516use crate :: builder:: plan:: * ;
16- use crate :: ops:: interface:: { ExportTargetMutation , ExportTargetUpsertEntry } ;
17+ use crate :: ops:: interface:: { ExportTargetMutation , ExportTargetUpsertEntry , Ordinal } ;
1718use crate :: utils:: db:: WriteAction ;
1819use crate :: utils:: fingerprint:: { Fingerprint , Fingerprinter } ;
1920
@@ -93,9 +94,9 @@ pub fn extract_primary_key(
9394 Ok ( key)
9495}
9596
96- enum WithApplyStatus < T = ( ) > {
97+ pub enum UnchangedOr < T > {
9798 Normal ( T ) ,
98- Collapsed ,
99+ Unchanged ,
99100}
100101
101102#[ derive( Default ) ]
@@ -140,7 +141,7 @@ async fn precommit_source_tracking_info(
140141 db_setup : & db_tracking_setup:: TrackingTableSetupState ,
141142 export_ops : & [ AnalyzedExportOp ] ,
142143 pool : & PgPool ,
143- ) -> Result < WithApplyStatus < PrecommitOutput > > {
144+ ) -> Result < UnchangedOr < PrecommitOutput > > {
144145 let mut txn = pool. begin ( ) . await ?;
145146
146147 let tracking_info = db_tracking:: read_source_tracking_info_for_precommit (
@@ -157,7 +158,7 @@ async fn precommit_source_tracking_info(
157158 . and_then ( |info| info. processed_source_ordinal )
158159 > source_ordinal
159160 {
160- return Ok ( WithApplyStatus :: Collapsed ) ;
161+ return Ok ( UnchangedOr :: Unchanged ) ;
161162 }
162163 let process_ordinal = ( tracking_info
163164 . as_ref ( )
@@ -323,7 +324,7 @@ async fn precommit_source_tracking_info(
323324
324325 txn. commit ( ) . await ?;
325326
326- Ok ( WithApplyStatus :: Normal ( PrecommitOutput {
327+ Ok ( UnchangedOr :: Normal ( PrecommitOutput {
327328 metadata : PrecommitMetadata {
328329 source_entry_exists : data. is_some ( ) ,
329330 process_ordinal,
@@ -343,7 +344,7 @@ async fn commit_source_tracking_info(
343344 process_timestamp : & chrono:: DateTime < chrono:: Utc > ,
344345 db_setup : & db_tracking_setup:: TrackingTableSetupState ,
345346 pool : & PgPool ,
346- ) -> Result < WithApplyStatus < ( ) > > {
347+ ) -> Result < UnchangedOr < ( ) > > {
347348 let mut txn = pool. begin ( ) . await ?;
348349
349350 let tracking_info = db_tracking:: read_source_tracking_info_for_commit (
@@ -357,7 +358,7 @@ async fn commit_source_tracking_info(
357358 if tracking_info. as_ref ( ) . and_then ( |info| info. process_ordinal )
358359 >= Some ( precommit_metadata. process_ordinal )
359360 {
360- return Ok ( WithApplyStatus :: Collapsed ) ;
361+ return Ok ( UnchangedOr :: Unchanged ) ;
361362 }
362363
363364 let cleaned_staging_target_keys = tracking_info
@@ -417,7 +418,7 @@ async fn commit_source_tracking_info(
417418
418419 txn. commit ( ) . await ?;
419420
420- Ok ( WithApplyStatus :: Normal ( ( ) ) )
421+ Ok ( UnchangedOr :: Normal ( ( ) ) )
421422}
422423
423424pub async fn evaluate_source_entry_with_memory (
@@ -444,8 +445,16 @@ pub async fn evaluate_source_entry_with_memory(
444445 None
445446 } ;
446447 let memory = EvaluationMemory :: new ( chrono:: Utc :: now ( ) , stored_info, options) ;
447- let data_builder = evaluate_source_entry ( plan, source_op, schema, key, & memory) . await ?;
448- Ok ( data_builder)
448+ let source_data = match source_op. executor . get_value ( key) . await ? {
449+ Some ( d) => d,
450+ None => return Ok ( None ) ,
451+ } ;
452+ let source_value = match source_data. value . await ? {
453+ Some ( value) => value,
454+ None => return Ok ( None ) ,
455+ } ;
456+ let output = evaluate_source_entry ( plan, source_op, schema, key, source_value, & memory) . await ?;
457+ Ok ( Some ( output) )
449458}
450459
451460pub async fn update_source_entry (
@@ -461,8 +470,6 @@ pub async fn update_source_entry(
461470 let process_timestamp = chrono:: Utc :: now ( ) ;
462471
463472 // Phase 1: Evaluate with memoization info.
464-
465- // TODO: Skip if the source is not newer and the processing logic is not changed.
466473 let existing_tracking_info = read_source_tracking_info (
467474 source_op. source_id ,
468475 & source_key_json,
@@ -471,66 +478,89 @@ pub async fn update_source_entry(
471478 )
472479 . await ?;
473480 let already_exists = existing_tracking_info. is_some ( ) ;
474- let memoization_info = existing_tracking_info
475- . and_then ( |info| info. memoization_info . map ( |info| info. 0 ) )
476- . flatten ( ) ;
477- let evaluation_memory = EvaluationMemory :: new (
478- process_timestamp,
479- memoization_info,
480- EvaluationMemoryOptions {
481- enable_cache : true ,
482- evaluation_only : false ,
483- } ,
484- ) ;
485- let value_builder = if !only_for_deletion {
486- evaluate_source_entry ( plan, source_op, schema, key, & evaluation_memory) . await ?
481+ let ( existing_source_ordinal, memoization_info) = match existing_tracking_info {
482+ Some ( info) => (
483+ info. processed_source_ordinal . map ( Ordinal ) ,
484+ info. memoization_info . map ( |info| info. 0 ) . flatten ( ) ,
485+ ) ,
486+ None => ( None , None ) ,
487+ } ;
488+ let ( source_ordinal, output, stored_mem_info) = if !only_for_deletion {
489+ let source_data = source_op. executor . get_value ( key) . await ?;
490+ let source_ordinal = source_data. as_ref ( ) . and_then ( |d| d. ordinal ) ;
491+ match ( source_ordinal, existing_source_ordinal) {
492+ // TODO: Collapse if the source is not newer and the processing logic is not changed.
493+ ( Some ( source_ordinal) , Some ( existing_source_ordinal) )
494+ if source_ordinal <= existing_source_ordinal =>
495+ {
496+ return Ok ( ( ) ) ;
497+ }
498+ _ => { }
499+ }
500+ let source_value = match source_data {
501+ Some ( d) => d. value . await ?,
502+ None => None ,
503+ } ;
504+ match source_value {
505+ Some ( source_value) => {
506+ let evaluation_memory = EvaluationMemory :: new (
507+ process_timestamp,
508+ memoization_info,
509+ EvaluationMemoryOptions {
510+ enable_cache : true ,
511+ evaluation_only : false ,
512+ } ,
513+ ) ;
514+ let output = evaluate_source_entry (
515+ plan,
516+ source_op,
517+ schema,
518+ key,
519+ source_value,
520+ & evaluation_memory,
521+ )
522+ . await ?;
523+ (
524+ source_ordinal,
525+ Some ( output) ,
526+ evaluation_memory. into_stored ( ) ?,
527+ )
528+ }
529+ None => Default :: default ( ) ,
530+ }
487531 } else {
488- None
532+ Default :: default ( )
489533 } ;
490- let exists = value_builder. is_some ( ) ;
491-
492534 if already_exists {
493- if exists {
535+ if output . is_some ( ) {
494536 stats. num_already_exists . fetch_add ( 1 , Relaxed ) ;
495537 } else {
496538 stats. num_deletions . fetch_add ( 1 , Relaxed ) ;
497539 }
498- } else if exists {
540+ } else if output . is_some ( ) {
499541 stats. num_insertions . fetch_add ( 1 , Relaxed ) ;
500542 } else {
501543 return Ok ( ( ) ) ;
502544 }
503545
504- let memoization_info = evaluation_memory. into_stored ( ) ?;
505- let ( source_ordinal, precommit_data) = match & value_builder {
506- Some ( scope_value) => {
507- (
508- // TODO: Generate the actual source ordinal.
509- Some ( 1 ) ,
510- Some ( PrecommitData {
511- scope_value,
512- memoization_info : & memoization_info,
513- } ) ,
514- )
515- }
516- None => ( None , None ) ,
517- } ;
518-
519546 // Phase 2 (precommit): Update with the memoization info and stage target keys.
520547 let precommit_output = precommit_source_tracking_info (
521548 source_op. source_id ,
522549 & source_key_json,
523- source_ordinal,
524- precommit_data,
550+ source_ordinal. map ( |o| o. into ( ) ) ,
551+ output. as_ref ( ) . map ( |scope_value| PrecommitData {
552+ scope_value,
553+ memoization_info : & stored_mem_info,
554+ } ) ,
525555 & process_timestamp,
526556 & plan. tracking_table_setup ,
527557 & plan. export_ops ,
528558 pool,
529559 )
530560 . await ?;
531561 let precommit_output = match precommit_output {
532- WithApplyStatus :: Normal ( output) => output,
533- WithApplyStatus :: Collapsed => return Ok ( ( ) ) ,
562+ UnchangedOr :: Normal ( output) => output,
563+ UnchangedOr :: Unchanged => return Ok ( ( ) ) ,
534564 } ;
535565
536566 // Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones.
@@ -554,7 +584,7 @@ pub async fn update_source_entry(
554584 commit_source_tracking_info (
555585 source_op. source_id ,
556586 & source_key_json,
557- source_ordinal,
587+ source_ordinal. map ( |o| o . into ( ) ) ,
558588 & plan. logic_fingerprint ,
559589 precommit_output. metadata ,
560590 & process_timestamp,
0 commit comments