File tree Expand file tree Collapse file tree 2 files changed +9
-5
lines changed
crates/corro-agent/src/agent Expand file tree Collapse file tree 2 files changed +9
-5
lines changed Original file line number Diff line number Diff line change @@ -642,7 +642,7 @@ impl HandleChangesState {
642642 max_queue_len : usize ,
643643 ) -> Self {
644644 Self {
645- queue : VecDeque :: new ( ) ,
645+ queue : VecDeque :: with_capacity ( max_queue_len ) ,
646646 buf_cost : 0 ,
647647 current_batch_size : min_batch_size,
648648 processing_task : None ,
@@ -687,7 +687,7 @@ impl HandleChangesState {
687687 return None ;
688688 }
689689
690- let mut batch = Vec :: new ( ) ;
690+ let mut batch = Vec :: with_capacity ( self . current_batch_size ) ;
691691 let mut batch_cost = 0 ;
692692
693693 while let Some ( ( change, src, queued_at) ) = self . queue . pop_front ( ) {
Original file line number Diff line number Diff line change @@ -1104,11 +1104,15 @@ pub async fn process_multiple_changes(
11041104
11051105 let mut change_chunk_size = 0 ;
11061106
1107- for ( _actor_id, changeset, db_version , _src) in changesets {
1107+ for ( _actor_id, changeset, _db_version , _src) in & changesets {
11081108 change_chunk_size += changeset. len ( ) ;
1109- match_changes ( agent. subs_manager ( ) , & changeset, db_version) ;
1110- match_changes ( agent. updates_manager ( ) , & changeset, db_version) ;
11111109 }
1110+ tokio:: spawn ( async move {
1111+ for ( _actor_id, changeset, db_version, _src) in changesets {
1112+ match_changes ( agent. subs_manager ( ) , & changeset, db_version) ;
1113+ match_changes ( agent. updates_manager ( ) , & changeset, db_version) ;
1114+ }
1115+ } ) ;
11121116
11131117 histogram ! ( "corro.agent.changes.processing.time.seconds" , "source" => "remote" )
11141118 . record ( start. elapsed ( ) ) ;
You can’t perform that action at this time.
0 commit comments