@@ -413,14 +413,41 @@ async fn commit_source_tracking_info(
413413 Ok ( WithApplyStatus :: Normal ( ( ) ) )
414414}
415415
416+ /// Built an evaluation cache on the existing data.
417+ pub async fn evaluation_cache_on_existing_data (
418+ plan : & ExecutionPlan ,
419+ source_op_idx : usize ,
420+ key : & value:: KeyValue ,
421+ pool : & PgPool ,
422+ ) -> Result < EvaluationCache > {
423+ let source_id = plan. source_ops [ source_op_idx] . source_id ;
424+ let source_key_json = serde_json:: to_value ( key) ?;
425+ let existing_tracking_info = read_source_tracking_info (
426+ source_id,
427+ & source_key_json,
428+ & plan. tracking_table_setup ,
429+ pool,
430+ )
431+ . await ?;
432+ let process_timestamp = chrono:: Utc :: now ( ) ;
433+ let memoization_info = existing_tracking_info
434+ . map ( |info| info. memoization_info . map ( |info| info. 0 ) )
435+ . flatten ( )
436+ . flatten ( ) ;
437+ Ok ( EvaluationCache :: new (
438+ process_timestamp,
439+ memoization_info. map ( |info| info. cache ) ,
440+ ) )
441+ }
442+
416443pub async fn update_source_entry < ' a > (
417444 plan : & ExecutionPlan ,
418445 source_op_idx : usize ,
419446 schema : & schema:: DataSchema ,
420447 key : & value:: KeyValue ,
421448 pool : & PgPool ,
422449) -> Result < ( ) > {
423- let source_id = plan. source_ops [ source_op_idx as usize ] . source_id ;
450+ let source_id = plan. source_ops [ source_op_idx] . source_id ;
424451 let source_key_json = serde_json:: to_value ( key) ?;
425452 let process_timestamp = chrono:: Utc :: now ( ) ;
426453
@@ -532,7 +559,7 @@ async fn update_source(
532559 schema : & schema:: DataSchema ,
533560 pool : & PgPool ,
534561) -> Result < SourceUpdateInfo > {
535- let source_op = & plan. source_ops [ source_op_idx as usize ] ;
562+ let source_op = & plan. source_ops [ source_op_idx] ;
536563 let ( keys, existing_keys_json) = try_join (
537564 source_op. executor . list_keys ( ) ,
538565 db_tracking:: list_source_tracking_keys (
0 commit comments