11use anyhow:: Result ;
2- use futures:: future:: { join_all, try_join, try_join_all} ;
2+ use futures:: future:: { join, join_all, try_join, try_join_all} ;
3+ use itertools:: Itertools ;
34use log:: error;
45use serde:: Serialize ;
56use sqlx:: PgPool ;
67use std:: collections:: { HashMap , HashSet } ;
8+ use std:: sync:: atomic:: { AtomicUsize , Ordering :: Relaxed } ;
79
810use super :: db_tracking:: { self , read_source_tracking_info, TrackedTargetKey } ;
911use super :: db_tracking_setup;
@@ -20,23 +22,27 @@ use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
2022
2123#[ derive( Debug , Serialize , Default ) ]
2224pub struct UpdateStats {
23- pub num_insertions : usize ,
24- pub num_deletions : usize ,
25- pub num_already_exists : usize ,
26- pub num_errors : usize ,
25+ pub num_insertions : AtomicUsize ,
26+ pub num_deletions : AtomicUsize ,
27+ pub num_already_exists : AtomicUsize ,
28+ pub num_errors : AtomicUsize ,
2729}
2830
2931impl std:: fmt:: Display for UpdateStats {
3032 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
31- let num_source_rows = self . num_insertions + self . num_deletions + self . num_already_exists ;
33+ let num_source_rows = self . num_insertions . load ( Relaxed )
34+ + self . num_deletions . load ( Relaxed )
35+ + self . num_already_exists . load ( Relaxed ) ;
3236 write ! ( f, "{num_source_rows} source rows processed" , ) ?;
33- if self . num_errors > 0 {
34- write ! ( f, " with {} ERRORS" , self . num_errors) ?;
37+ if self . num_errors . load ( Relaxed ) > 0 {
38+ write ! ( f, " with {} ERRORS" , self . num_errors. load ( Relaxed ) ) ?;
3539 }
3640 write ! (
3741 f,
3842 ": {} added, {} removed, {} already exists" ,
39- self . num_insertions, self . num_deletions, self . num_already_exists
43+ self . num_insertions. load( Relaxed ) ,
44+ self . num_deletions. load( Relaxed ) ,
45+ self . num_already_exists. load( Relaxed )
4046 ) ?;
4147 Ok ( ( ) )
4248 }
// NOTE(review): this hunk adds two parameters to `update_source_entry`:
// `only_for_deletion` (skip evaluation, treat the row as gone from the source)
// and `stats` (shared atomic counters updated per row). The middle of the
// function (new lines 493-566) is elided from this diff view.
@@ -442,7 +448,9 @@ pub async fn update_source_entry(
442448 source_op_idx : usize ,
443449 schema : & schema:: DataSchema ,
444450 key : & value:: KeyValue ,
451+ only_for_deletion : bool ,
445452 pool : & PgPool ,
453+ stats : & UpdateStats ,
446454) -> Result < ( ) > {
447455 let source_id = plan. source_ops [ source_op_idx] . source_id ;
448456 let source_key_json = serde_json:: to_value ( key) ?;
@@ -464,11 +472,22 @@ pub async fn update_source_entry(
464472 . flatten ( ) ;
465473 let evaluation_cache =
466474 EvaluationCache :: new ( process_timestamp, memoization_info. map ( |info| info. cache ) ) ;
// Only evaluate the source entry when the caller still expects it to exist;
// for deletion-only processing, `None` marks the row as absent.
467- let value_builder =
468- evaluate_source_entry ( plan, source_op_idx, schema, key, Some ( & evaluation_cache) ) . await ?
475+ let value_builder = if !only_for_deletion {
476+ evaluate_source_entry ( plan, source_op_idx, schema, key, Some ( & evaluation_cache) ) . await ?
477+ } else {
478+ None
479+ } ;
480+ let exists = value_builder. is_some ( ) ;
469481
// Classify the row into exactly one of the four counters based on the
// (already_exists, exists) transition; the "never existed" case returns
// early since there is nothing to apply.
470- // Didn't exist and still doesn't exist. No need to apply any changes.
471- if !already_exists && value_builder. is_none ( ) {
482+ if already_exists {
483+ if exists {
484+ stats. num_already_exists . fetch_add ( 1 , Relaxed ) ;
485+ } else {
486+ stats. num_deletions . fetch_add ( 1 , Relaxed ) ;
487+ }
488+ } else if exists {
489+ stats. num_insertions . fetch_add ( 1 , Relaxed ) ;
490+ } else {
472491 return Ok ( ( ) ) ;
473492 }
474493
@@ -548,6 +567,31 @@ pub async fn update_source_entry(
548567 Ok ( ( ) )
549568}
550569
570+ async fn update_source_entry_with_err_handling (
571+ plan : & ExecutionPlan ,
572+ source_op_idx : usize ,
573+ schema : & schema:: DataSchema ,
574+ key : & value:: KeyValue ,
575+ only_for_deletion : bool ,
576+ pool : & PgPool ,
577+ stats : & UpdateStats ,
578+ ) {
579+ let r = update_source_entry (
580+ plan,
581+ source_op_idx,
582+ schema,
583+ key,
584+ only_for_deletion,
585+ pool,
586+ stats,
587+ )
588+ . await ;
589+ if let Err ( e) = r {
590+ stats. num_errors . fetch_add ( 1 , Relaxed ) ;
591+ error ! ( "{:?}" , e. context( "Error in indexing a source row" ) ) ;
592+ }
593+ }
594+
// NOTE(review): this hunk replaces the old sequential diff-and-count logic
// with two concurrent batches (upserts for current keys, deletions for keys
// that disappeared) sharing one atomic `UpdateStats`. New lines 598-609
// (remaining parameters and the key-listing call) are elided from this view.
551595 async fn update_source (
552596 source_name : & str ,
553597 plan : & ExecutionPlan ,
@@ -566,45 +610,29 @@ async fn update_source(
566610 )
567611 . await ?;
568612
// Old approach: mutate local counters while merging existing keys into a
// HashSet, then sum per-row error flags — removed in favor of shared stats.
569- let num_new_keys = keys. len ( ) ;
570- let mut num_updates = 0 ;
571- let mut num_deletions = 0 ;
572-
573- let mut all_keys_set = keys. into_iter ( ) . collect :: < HashSet < _ > > ( ) ;
574- for existing_key_json in existing_keys_json. into_iter ( ) {
575- let existing_key = value:: Value :: < value:: ScopeValue > :: from_json (
576- existing_key_json. source_key ,
577- & source_op. primary_key_type ,
578- ) ?;
579- let inserted = all_keys_set. insert ( existing_key. as_key ( ) ?) ;
580- if inserted {
581- num_deletions += 1 ;
582- } else {
583- num_updates += 1 ;
584- }
585- }
586-
587- let num_errors = join_all ( all_keys_set. into_iter ( ) . map ( |key| async move {
588- let result = update_source_entry ( plan, source_op_idx, schema, & key, pool) . await ;
589- if let Err ( e) = result {
590- error ! ( "{:?}" , e. context( "Error in indexing a source row" ) ) ;
591- 1
592- } else {
593- 0
594- }
595- } ) )
596- . await
597- . iter ( )
598- . sum ( ) ;
// New approach: upsert every key currently present in the source, and in
// parallel delete previously-tracked keys that are no longer present
// (`only_for_deletion = true`). Outcome counting happens inside
// `update_source_entry` via the shared atomic `stats`.
613+ let stats = UpdateStats :: default ( ) ;
614+ let upsert_futs = join_all ( keys. iter ( ) . map ( |key| {
615+ update_source_entry_with_err_handling ( plan, source_op_idx, schema, key, false , pool, & stats)
616+ } ) ) ;
// `filter_ok` (itertools) keeps only keys absent from the current source;
// `collect::<Result<...>>` short-circuits on the first conversion error.
617+ let deleted_keys = existing_keys_json
618+ . into_iter ( )
619+ . map ( |existing_key_json| {
620+ value:: Value :: < value:: ScopeValue > :: from_json (
621+ existing_key_json. source_key ,
622+ & source_op. primary_key_type ,
623+ ) ?
624+ . as_key ( )
625+ } )
626+ . filter_ok ( |existing_key| !keys. contains ( existing_key) )
627+ . collect :: < Result < Vec < _ > > > ( ) ?;
628+ let delete_futs = join_all ( deleted_keys. iter ( ) . map ( |key| {
629+ update_source_entry_with_err_handling ( plan, source_op_idx, schema, key, true , pool, & stats)
630+ } ) ) ;
// Run both batches concurrently; errors were already absorbed per row.
631+ join ( upsert_futs, delete_futs) . await ;
599632
600633 Ok ( SourceUpdateInfo {
601634 source_name : source_name. to_string ( ) ,
602- stats : UpdateStats {
603- num_insertions : num_new_keys - num_updates,
604- num_deletions,
605- num_already_exists : num_updates,
606- num_errors,
607- } ,
635+ stats,
608636 } )
609637}
610638
0 commit comments