@@ -820,6 +820,10 @@ pub async fn process_multiple_changes(
820820 warn ! ( "process_multiple_changes: removing duplicates took too long - {elapsed:?}" ) ;
821821 }
822822
823+ // Deduplicate
824+ let phase1_t = start. elapsed ( ) ;
825+ let t2 = Instant :: now ( ) ;
826+
823827 let mut conn = agent. pool ( ) . write_normal ( ) . await ?;
824828
825829 let changesets = block_in_place ( || {
@@ -1094,6 +1098,10 @@ pub async fn process_multiple_changes(
10941098 Ok :: < _ , ChangeError > ( changesets)
10951099 } ) ?;
10961100
1101+ // block_in place db + bookie stuff
1102+ let phase2_t = t2. elapsed ( ) ;
1103+ let t3 = Instant :: now ( ) ;
1104+
10971105 let mut change_chunk_size = 0 ;
10981106
10991107 for ( _actor_id, changeset, db_version, _src) in changesets {
@@ -1106,6 +1114,20 @@ pub async fn process_multiple_changes(
11061114 . record ( start. elapsed ( ) ) ;
11071115 histogram ! ( "corro.agent.changes.processing.chunk_size" ) . record ( change_chunk_size as f64 ) ;
11081116
1117+ // Broadcast changes
1118+ let phase3_t = t3. elapsed ( ) ;
1119+
1120+ let total = phase1_t + phase2_t + phase3_t;
1121+ info ! (
1122+ "preprocess: {} ({} of total), db_stuff: {} ({} of total), match_changes: {} ({} of total)" ,
1123+ phase1_t. as_micros( ) ,
1124+ phase1_t. as_secs_f64( ) / total. as_secs_f64( ) ,
1125+ phase2_t. as_micros( ) ,
1126+ phase2_t. as_secs_f64( ) / total. as_secs_f64( ) ,
1127+ phase3_t. as_micros( ) ,
1128+ phase3_t. as_secs_f64( ) / total. as_secs_f64( )
1129+ ) ;
1130+
11091131 Ok ( ( ) )
11101132}
11111133
0 commit comments