@@ -1047,21 +1047,25 @@ where
10471047 }
10481048}
10491049
1050- macro_rules! drop_handled_events_and_abort { ( $self: expr, $res : expr , $offset : expr, $event_queue: expr) => {
1050+ macro_rules! drop_handled_events_and_abort { ( $self: expr, $res_iter : expr, $event_queue: expr) => {
10511051 // We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
10521052 // successfully handled events from the given queue, reset the events processing flag, and
10531053 // return, to have the events eventually replayed upon next invocation.
10541054 {
10551055 let mut queue_lock = $event_queue. lock( ) . unwrap( ) ;
10561056
1057- // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
1058- let mut res_iter = $res. iter( ) . skip( $offset) ;
1059-
10601057 // Keep all events which previously error'd *or* any that have been added since we dropped
10611058 // the Mutex before.
1062- queue_lock. retain( |_| res_iter. next( ) . map_or( true , |r| r. is_err( ) ) ) ;
1059+ let mut any_error = false ;
1060+ queue_lock. retain( |_| {
1061+ $res_iter. next( ) . map_or( true , |r| {
1062+ let is_err = r. is_err( ) ;
1063+ any_error |= is_err;
1064+ is_err
1065+ } )
1066+ } ) ;
10631067
1064- if $res . iter ( ) . any ( |r| r . is_err ( ) ) {
1068+ if any_error {
10651069 // We failed handling some events. Return to have them eventually replayed.
10661070 $self. pending_events_processor. store( false , Ordering :: Release ) ;
10671071 $self. event_notifier. notify( ) ;
@@ -1426,7 +1430,8 @@ where
14261430 }
14271431 // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
14281432 let res = MultiResultFuturePoller :: new ( futures) . await ;
1429- drop_handled_events_and_abort ! ( self , res, intercepted_msgs_offset, self . pending_intercepted_msgs_events) ;
1433+ let mut res_iter = res. iter ( ) . skip ( intercepted_msgs_offset) ;
1434+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_intercepted_msgs_events) ;
14301435 }
14311436
14321437 {
@@ -1449,7 +1454,8 @@ where
14491454 futures. push ( future) ;
14501455 }
14511456 let res = MultiResultFuturePoller :: new ( futures) . await ;
1452- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_peer_connected_events) ;
1457+ let mut res_iter = res. iter ( ) ;
1458+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_peer_connected_events) ;
14531459 }
14541460 }
14551461 self . pending_events_processor . store ( false , Ordering :: Release ) ;
@@ -1508,7 +1514,7 @@ where
15081514 {
15091515 let pending_intercepted_msgs_events = self . pending_intercepted_msgs_events . lock ( ) . unwrap ( ) ;
15101516 intercepted_msgs = pending_intercepted_msgs_events. clone ( ) ;
1511- let mut pending_peer_connected_events = self . pending_peer_connected_events . lock ( ) . unwrap ( ) ;
1517+ let pending_peer_connected_events = self . pending_peer_connected_events . lock ( ) . unwrap ( ) ;
15121518 peer_connecteds = pending_peer_connected_events. clone ( ) ;
15131519 #[ cfg( debug_assertions) ] {
15141520 for ev in pending_intercepted_msgs_events. iter ( ) {
@@ -1518,14 +1524,47 @@ where
15181524 if let Event :: OnionMessagePeerConnected { .. } = ev { } else { panic ! ( ) ; }
15191525 }
15201526 }
1521- pending_peer_connected_events. shrink_to ( 10 ) ; // Limit total heap usage
15221527 }
15231528
1524- let res = intercepted_msgs. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) . collect :: < Vec < _ > > ( ) ;
1525- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_intercepted_msgs_events) ;
1529+ let mut handling_intercepted_msgs_failed = false ;
1530+ let mut num_handled_intercepted_events = 0 ;
1531+ for ev in intercepted_msgs {
1532+ match handler. handle_event ( ev) {
1533+ Ok ( ( ) ) => num_handled_intercepted_events += 1 ,
1534+ Err ( ReplayEvent ( ) ) => {
1535+ handling_intercepted_msgs_failed = true ;
1536+ break ;
1537+ }
1538+ }
1539+ }
1540+
1541+ {
1542+ let mut pending_intercepted_msgs_events = self . pending_intercepted_msgs_events . lock ( ) . unwrap ( ) ;
1543+ pending_intercepted_msgs_events. drain ( ..num_handled_intercepted_events) ;
1544+ }
15261545
1527- let res = peer_connecteds. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) . collect :: < Vec < _ > > ( ) ;
1528- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_peer_connected_events) ;
1546+ if handling_intercepted_msgs_failed {
1547+ self . pending_events_processor . store ( false , Ordering :: Release ) ;
1548+ self . event_notifier . notify ( ) ;
1549+ return ;
1550+ }
1551+
1552+ let mut num_handled_peer_connecteds = 0 ;
1553+ for ev in peer_connecteds {
1554+ match handler. handle_event ( ev) {
1555+ Ok ( ( ) ) => num_handled_peer_connecteds += 1 ,
1556+ Err ( ReplayEvent ( ) ) => {
1557+ self . event_notifier . notify ( ) ;
1558+ break ;
1559+ }
1560+ }
1561+ }
1562+
1563+ {
1564+ let mut pending_peer_connected_events = self . pending_peer_connected_events . lock ( ) . unwrap ( ) ;
1565+ pending_peer_connected_events. drain ( ..num_handled_peer_connecteds) ;
1566+ pending_peer_connected_events. shrink_to ( 10 ) ; // Limit total heap usage
1567+ }
15291568
15301569 self . pending_events_processor . store ( false , Ordering :: Release ) ;
15311570 }
0 commit comments