@@ -11,7 +11,6 @@ use super::db_tracking::{self, read_source_tracking_info, TrackedTargetKey};
 use super::db_tracking_setup;
 use super::memoization::{EvaluationCache, MemoizationInfo};
 use crate::base::schema;
-use crate::base::spec::FlowInstanceSpec;
 use crate::base::value::{self, FieldValues, KeyValue};
 use crate::builder::plan::*;
 use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry};
@@ -420,15 +419,14 @@ async fn commit_source_tracking_info(
 
 pub async fn evaluate_source_entry_with_cache(
     plan: &ExecutionPlan,
-    source_op_idx: usize,
+    source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     pool: &PgPool,
 ) -> Result<Option<value::ScopeValue>> {
-    let source_id = plan.source_ops[source_op_idx].source_id;
     let source_key_json = serde_json::to_value(key)?;
     let existing_tracking_info = read_source_tracking_info(
-        source_id,
+        source_op.source_id,
         &source_key_json,
         &plan.tracking_table_setup,
         pool,
@@ -441,28 +439,27 @@ pub async fn evaluate_source_entry_with_cache(
     let evaluation_cache =
         EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache));
     let data_builder =
-        evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?;
+        evaluate_source_entry(plan, source_op, schema, key, Some(&evaluation_cache)).await?;
     Ok(data_builder.map(|builder| builder.into()))
 }
 
 pub async fn update_source_entry(
     plan: &ExecutionPlan,
-    source_op_idx: usize,
+    source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     only_for_deletion: bool,
     pool: &PgPool,
     stats: &UpdateStats,
 ) -> Result<()> {
-    let source_id = plan.source_ops[source_op_idx].source_id;
     let source_key_json = serde_json::to_value(key)?;
     let process_timestamp = chrono::Utc::now();
 
     // Phase 1: Evaluate with memoization info.
 
     // TODO: Skip if the source is not newer and the processing logic is not changed.
     let existing_tracking_info = read_source_tracking_info(
-        source_id,
+        source_op.source_id,
         &source_key_json,
         &plan.tracking_table_setup,
         pool,
@@ -475,7 +472,7 @@ pub async fn update_source_entry(
     let evaluation_cache =
         EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache));
     let value_builder = if !only_for_deletion {
-        evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?
+        evaluate_source_entry(plan, source_op, schema, key, Some(&evaluation_cache)).await?
     } else {
         None
     };
@@ -512,7 +509,7 @@ pub async fn update_source_entry(
 
     // Phase 2 (precommit): Update with the memoization info and stage target keys.
     let precommit_output = precommit_source_tracking_info(
-        source_id,
+        source_op.source_id,
         &source_key_json,
         source_ordinal,
         precommit_data,
@@ -546,7 +543,7 @@ pub async fn update_source_entry(
 
     // Phase 4: Update the tracking record.
     commit_source_tracking_info(
-        source_id,
+        source_op.source_id,
         &source_key_json,
         source_ordinal,
         &plan.logic_fingerprint,
@@ -562,23 +559,14 @@ pub async fn update_source_entry(
 
 async fn update_source_entry_with_err_handling(
     plan: &ExecutionPlan,
-    source_op_idx: usize,
+    source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     only_for_deletion: bool,
     pool: &PgPool,
     stats: &UpdateStats,
 ) {
-    let r = update_source_entry(
-        plan,
-        source_op_idx,
-        schema,
-        key,
-        only_for_deletion,
-        pool,
-        stats,
-    )
-    .await;
+    let r = update_source_entry(plan, source_op, schema, key, only_for_deletion, pool, stats).await;
     if let Err(e) = r {
         stats.num_errors.fetch_add(1, Relaxed);
         error!("{:?}", e.context("Error in indexing a source row"));
@@ -588,11 +576,10 @@ async fn update_source_entry_with_err_handling(
 async fn update_source(
     source_name: &str,
     plan: &ExecutionPlan,
-    source_op_idx: usize,
+    source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     pool: &PgPool,
 ) -> Result<SourceUpdateInfo> {
-    let source_op = &plan.source_ops[source_op_idx];
     let (keys, existing_keys_json) = try_join(
         source_op.executor.list_keys(),
         db_tracking::list_source_tracking_keys(
@@ -605,7 +592,7 @@ async fn update_source(
 
     let stats = UpdateStats::default();
     let upsert_futs = join_all(keys.iter().map(|key| {
-        update_source_entry_with_err_handling(plan, source_op_idx, schema, key, false, pool, &stats)
+        update_source_entry_with_err_handling(plan, source_op, schema, key, false, pool, &stats)
     }));
     let deleted_keys = existing_keys_json
         .into_iter()
@@ -619,7 +606,7 @@ async fn update_source(
         .filter_ok(|existing_key| !keys.contains(existing_key))
         .collect::<Result<Vec<_>>>()?;
     let delete_futs = join_all(deleted_keys.iter().map(|key| {
-        update_source_entry_with_err_handling(plan, source_op_idx, schema, key, true, pool, &stats)
+        update_source_entry_with_err_handling(plan, source_op, schema, key, true, pool, &stats)
     }));
     join(upsert_futs, delete_futs).await;
 
@@ -630,17 +617,15 @@ async fn update_source(
 }
 
 pub async fn update(
-    spec: &FlowInstanceSpec,
     plan: &ExecutionPlan,
     schema: &schema::DataSchema,
     pool: &PgPool,
 ) -> Result<IndexUpdateInfo> {
     let source_update_stats = try_join_all(
-        spec.source_ops
+        plan.source_ops
             .iter()
-            .enumerate()
-            .map(|(source_op_idx, source_op)| async move {
-                update_source(source_op.name.as_str(), plan, source_op_idx, schema, pool).await
+            .map(|source_op| async move {
+                update_source(source_op.name.as_str(), plan, source_op, schema, pool).await
             })
             .collect::<Vec<_>>(),
     )
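
The change above threads a borrowed `&AnalyzedSourceOp` through the helpers instead of a `source_op_idx: usize`, so each function reads `source_op.source_id` (or `source_op.executor`) directly rather than indexing into `plan.source_ops`, and `update` can iterate the plan's own `source_ops` without needing the `FlowInstanceSpec`. The following is a minimal, self-contained sketch of that call pattern only; the type definitions and function bodies are simplified stand-ins for illustration, not the real ones from this codebase, and it assumes the `futures` and `tokio` crates.

// Sketch: pass borrowed source ops into per-source async tasks instead of indices.
use futures::future::join_all;

// Stand-in for the real AnalyzedSourceOp (only the fields the sketch needs).
struct AnalyzedSourceOp {
    name: String,
    source_id: i32,
}

// Stand-in for the real ExecutionPlan.
struct ExecutionPlan {
    source_ops: Vec<AnalyzedSourceOp>,
}

// Each per-source task borrows the op directly; no index lookup is needed.
async fn update_source(source_op: &AnalyzedSourceOp) -> String {
    format!("updated source `{}` (id {})", source_op.name, source_op.source_id)
}

async fn update(plan: &ExecutionPlan) -> Vec<String> {
    join_all(
        plan.source_ops
            .iter()
            .map(|source_op| async move { update_source(source_op).await }),
    )
    .await
}

#[tokio::main]
async fn main() {
    let plan = ExecutionPlan {
        source_ops: vec![
            AnalyzedSourceOp { name: "docs".into(), source_id: 0 },
            AnalyzedSourceOp { name: "wiki".into(), source_id: 1 },
        ],
    };
    for line in update(&plan).await {
        println!("{line}");
    }
}

Passing the reference keeps everything attached to a source op (name, source_id, executor) available in one place and removes the possibility of an out-of-range index, while the references stay valid because they never outlive the borrowed plan.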