@@ -6,11 +6,12 @@ use std::collections::{HashMap, HashSet};
66
77use super :: db_tracking:: { self , read_source_tracking_info_for_processing, TrackedTargetKey } ;
88use super :: db_tracking_setup;
9- use super :: evaluator:: { evaluate_source_entry, EvaluateSourceEntryOutput } ;
9+ use super :: evaluator:: {
10+ evaluate_source_entry, EvaluateSourceEntryOutput , SourceRowEvaluationContext ,
11+ } ;
1012use super :: memoization:: { EvaluationMemory , EvaluationMemoryOptions , StoredMemoizationInfo } ;
1113use super :: stats;
1214
13- use crate :: base:: schema;
1415use crate :: base:: value:: { self , FieldValues , KeyValue } ;
1516use crate :: builder:: plan:: * ;
1617use crate :: ops:: interface:: {
@@ -439,19 +440,16 @@ async fn commit_source_tracking_info(
439440}
440441
441442pub async fn evaluate_source_entry_with_memory (
442- plan : & ExecutionPlan ,
443- import_op : & AnalyzedImportOp ,
444- schema : & schema:: FlowSchema ,
445- key : & value:: KeyValue ,
443+ src_eval_ctx : & SourceRowEvaluationContext < ' _ > ,
446444 options : EvaluationMemoryOptions ,
447445 pool : & PgPool ,
448446) -> Result < Option < EvaluateSourceEntryOutput > > {
449447 let stored_info = if options. enable_cache || !options. evaluation_only {
450- let source_key_json = serde_json:: to_value ( key) ?;
448+ let source_key_json = serde_json:: to_value ( src_eval_ctx . key ) ?;
451449 let existing_tracking_info = read_source_tracking_info_for_processing (
452- import_op. source_id ,
450+ src_eval_ctx . import_op . source_id ,
453451 & source_key_json,
454- & plan. tracking_table_setup ,
452+ & src_eval_ctx . plan . tracking_table_setup ,
455453 pool,
456454 )
457455 . await ?;
@@ -462,10 +460,11 @@ pub async fn evaluate_source_entry_with_memory(
462460 None
463461 } ;
464462 let memory = EvaluationMemory :: new ( chrono:: Utc :: now ( ) , stored_info, options) ;
465- let source_value = match import_op
463+ let source_value = match src_eval_ctx
464+ . import_op
466465 . executor
467466 . get_value (
468- key,
467+ src_eval_ctx . key ,
469468 & SourceExecutorGetOptions {
470469 include_value : true ,
471470 include_ordinal : false ,
@@ -477,28 +476,25 @@ pub async fn evaluate_source_entry_with_memory(
477476 Some ( d) => d,
478477 None => return Ok ( None ) ,
479478 } ;
480- let output = evaluate_source_entry ( plan , import_op , schema , key , source_value, & memory) . await ?;
479+ let output = evaluate_source_entry ( src_eval_ctx , source_value, & memory) . await ?;
481480 Ok ( Some ( output) )
482481}
483482
484483pub async fn update_source_row (
485- plan : & ExecutionPlan ,
486- import_op : & AnalyzedImportOp ,
487- schema : & schema:: FlowSchema ,
488- key : & value:: KeyValue ,
484+ src_eval_ctx : & SourceRowEvaluationContext < ' _ > ,
489485 source_value : Option < FieldValues > ,
490486 source_version : & SourceVersion ,
491487 pool : & PgPool ,
492488 update_stats : & stats:: UpdateStats ,
493489) -> Result < SkippedOr < ( ) > > {
494- let source_key_json = serde_json:: to_value ( key) ?;
495- let process_timestamp = chrono:: Utc :: now ( ) ;
490+ let source_key_json = serde_json:: to_value ( src_eval_ctx . key ) ?;
491+ let process_time = chrono:: Utc :: now ( ) ;
496492
497493 // Phase 1: Evaluate with memoization info.
498494 let existing_tracking_info = read_source_tracking_info_for_processing (
499- import_op. source_id ,
495+ src_eval_ctx . import_op . source_id ,
500496 & source_key_json,
501- & plan. tracking_table_setup ,
497+ & src_eval_ctx . plan . tracking_table_setup ,
502498 pool,
503499 )
504500 . await ?;
@@ -507,7 +503,7 @@ pub async fn update_source_row(
507503 let existing_version = SourceVersion :: from_stored (
508504 info. processed_source_ordinal ,
509505 & info. process_logic_fingerprint ,
510- plan. logic_fingerprint ,
506+ src_eval_ctx . plan . logic_fingerprint ,
511507 ) ;
512508 if existing_version. should_skip ( source_version, Some ( update_stats) ) {
513509 return Ok ( SkippedOr :: Skipped ( existing_version) ) ;
@@ -522,40 +518,33 @@ pub async fn update_source_row(
522518 let ( output, stored_mem_info) = match source_value {
523519 Some ( source_value) => {
524520 let evaluation_memory = EvaluationMemory :: new (
525- process_timestamp ,
521+ process_time ,
526522 memoization_info,
527523 EvaluationMemoryOptions {
528524 enable_cache : true ,
529525 evaluation_only : false ,
530526 } ,
531527 ) ;
532- let output = evaluate_source_entry (
533- plan,
534- import_op,
535- schema,
536- key,
537- source_value,
538- & evaluation_memory,
539- )
540- . await ?;
528+ let output =
529+ evaluate_source_entry ( src_eval_ctx, source_value, & evaluation_memory) . await ?;
541530 ( Some ( output) , evaluation_memory. into_stored ( ) ?)
542531 }
543532 None => Default :: default ( ) ,
544533 } ;
545534
546535 // Phase 2 (precommit): Update with the memoization info and stage target keys.
547536 let precommit_output = precommit_source_tracking_info (
548- import_op. source_id ,
537+ src_eval_ctx . import_op . source_id ,
549538 & source_key_json,
550539 source_version,
551- plan. logic_fingerprint ,
540+ src_eval_ctx . plan . logic_fingerprint ,
552541 output. as_ref ( ) . map ( |scope_value| PrecommitData {
553542 evaluate_output : scope_value,
554543 memoization_info : & stored_mem_info,
555544 } ) ,
556- & process_timestamp ,
557- & plan. tracking_table_setup ,
558- & plan. export_ops ,
545+ & process_time ,
546+ & src_eval_ctx . plan . tracking_table_setup ,
547+ & src_eval_ctx . plan . export_ops ,
559548 update_stats,
560549 pool,
561550 )
@@ -567,40 +556,44 @@ pub async fn update_source_row(
567556
568557 // Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones.
569558 let mut target_mutations = precommit_output. target_mutations ;
570- let apply_futs = plan. export_op_groups . iter ( ) . filter_map ( |export_op_group| {
571- let mutations_w_ctx: Vec < _ > = export_op_group
572- . op_idx
573- . iter ( )
574- . filter_map ( |export_op_idx| {
575- let export_op = & plan. export_ops [ * export_op_idx] ;
576- target_mutations
577- . remove ( & export_op. target_id )
578- . filter ( |m| !m. is_empty ( ) )
579- . map ( |mutation| interface:: ExportTargetMutationWithContext {
580- mutation,
581- export_context : export_op. export_context . as_ref ( ) ,
582- } )
559+ let apply_futs = src_eval_ctx
560+ . plan
561+ . export_op_groups
562+ . iter ( )
563+ . filter_map ( |export_op_group| {
564+ let mutations_w_ctx: Vec < _ > = export_op_group
565+ . op_idx
566+ . iter ( )
567+ . filter_map ( |export_op_idx| {
568+ let export_op = & src_eval_ctx. plan . export_ops [ * export_op_idx] ;
569+ target_mutations
570+ . remove ( & export_op. target_id )
571+ . filter ( |m| !m. is_empty ( ) )
572+ . map ( |mutation| interface:: ExportTargetMutationWithContext {
573+ mutation,
574+ export_context : export_op. export_context . as_ref ( ) ,
575+ } )
576+ } )
577+ . collect ( ) ;
578+ ( !mutations_w_ctx. is_empty ( ) ) . then ( || {
579+ export_op_group
580+ . target_factory
581+ . apply_mutation ( mutations_w_ctx)
583582 } )
584- . collect ( ) ;
585- ( !mutations_w_ctx. is_empty ( ) ) . then ( || {
586- export_op_group
587- . target_factory
588- . apply_mutation ( mutations_w_ctx)
589- } )
590- } ) ;
583+ } ) ;
591584
592585 // TODO: Handle errors.
593586 try_join_all ( apply_futs) . await ?;
594587
595588 // Phase 4: Update the tracking record.
596589 commit_source_tracking_info (
597- import_op. source_id ,
590+ src_eval_ctx . import_op . source_id ,
598591 & source_key_json,
599592 source_version,
600- & plan. logic_fingerprint . 0 ,
593+ & src_eval_ctx . plan . logic_fingerprint . 0 ,
601594 precommit_output. metadata ,
602- & process_timestamp ,
603- & plan. tracking_table_setup ,
595+ & process_time ,
596+ & src_eval_ctx . plan . tracking_table_setup ,
604597 pool,
605598 )
606599 . await ?;
0 commit comments