@@ -24,7 +24,11 @@ mod timer;
2424
2525use std:: {
2626 cmp,
27- collections:: hash_map:: { Entry , HashMap } ,
27+ collections:: {
28+ hash_map:: { Entry , HashMap } ,
29+ VecDeque ,
30+ } ,
31+ convert:: Infallible ,
2832 fmt,
2933 future:: Future ,
3034 io,
@@ -188,6 +192,9 @@ where
188192 listen_addresses : Arc < RwLock < ListenAddresses > > ,
189193
190194 local_peer_id : PeerId ,
195+
196+ /// Pending behaviour events to be emitted.
197+ pending_events : VecDeque < ToSwarm < Event , Infallible > > ,
191198}
192199
193200impl < P > Behaviour < P >
@@ -208,6 +215,7 @@ where
208215 closest_expiration : Default :: default ( ) ,
209216 listen_addresses : Default :: default ( ) ,
210217 local_peer_id,
218+ pending_events : Default :: default ( ) ,
211219 } )
212220 }
213221
@@ -304,93 +312,113 @@ where
304312 & mut self ,
305313 cx : & mut Context < ' _ > ,
306314 ) -> Poll < ToSwarm < Self :: ToSwarm , THandlerInEvent < Self > > > {
307- // Poll ifwatch.
308- while let Poll :: Ready ( Some ( event) ) = Pin :: new ( & mut self . if_watch ) . poll_next ( cx) {
309- match event {
310- Ok ( IfEvent :: Up ( inet) ) => {
311- let addr = inet. addr ( ) ;
312- if addr. is_loopback ( ) {
313- continue ;
314- }
315- if addr. is_ipv4 ( ) && self . config . enable_ipv6
316- || addr. is_ipv6 ( ) && !self . config . enable_ipv6
317- {
318- continue ;
319- }
320- if let Entry :: Vacant ( e) = self . if_tasks . entry ( addr) {
321- match InterfaceState :: < P :: Socket , P :: Timer > :: new (
322- addr,
323- self . config . clone ( ) ,
324- self . local_peer_id ,
325- self . listen_addresses . clone ( ) ,
326- self . query_response_sender . clone ( ) ,
327- ) {
328- Ok ( iface_state) => {
329- e. insert ( P :: spawn ( iface_state) ) ;
330- }
331- Err ( err) => {
332- tracing:: error!( "failed to create `InterfaceState`: {}" , err)
315+ loop {
316+ // Check for pending events and emit them.
317+ if let Some ( event) = self . pending_events . pop_front ( ) {
318+ return Poll :: Ready ( event) ;
319+ }
320+
321+ // Poll ifwatch.
322+ while let Poll :: Ready ( Some ( event) ) = Pin :: new ( & mut self . if_watch ) . poll_next ( cx) {
323+ match event {
324+ Ok ( IfEvent :: Up ( inet) ) => {
325+ let addr = inet. addr ( ) ;
326+ if addr. is_loopback ( ) {
327+ continue ;
328+ }
329+ if addr. is_ipv4 ( ) && self . config . enable_ipv6
330+ || addr. is_ipv6 ( ) && !self . config . enable_ipv6
331+ {
332+ continue ;
333+ }
334+ if let Entry :: Vacant ( e) = self . if_tasks . entry ( addr) {
335+ match InterfaceState :: < P :: Socket , P :: Timer > :: new (
336+ addr,
337+ self . config . clone ( ) ,
338+ self . local_peer_id ,
339+ self . listen_addresses . clone ( ) ,
340+ self . query_response_sender . clone ( ) ,
341+ ) {
342+ Ok ( iface_state) => {
343+ e. insert ( P :: spawn ( iface_state) ) ;
344+ }
345+ Err ( err) => {
346+ tracing:: error!( "failed to create `InterfaceState`: {}" , err)
347+ }
333348 }
334349 }
335350 }
336- }
337- Ok ( IfEvent :: Down ( inet) ) => {
338- if let Some ( handle) = self . if_tasks . remove ( & inet. addr ( ) ) {
339- tracing:: info!( instance=%inet. addr( ) , "dropping instance" ) ;
351+ Ok ( IfEvent :: Down ( inet) ) => {
352+ if let Some ( handle) = self . if_tasks . remove ( & inet. addr ( ) ) {
353+ tracing:: info!( instance=%inet. addr( ) , "dropping instance" ) ;
340354
341- handle. abort ( ) ;
355+ handle. abort ( ) ;
356+ }
342357 }
358+ Err ( err) => tracing:: error!( "if watch returned an error: {}" , err) ,
343359 }
344- Err ( err) => tracing:: error!( "if watch returned an error: {}" , err) ,
345360 }
346- }
347- // Emit discovered event.
348- let mut discovered = Vec :: new ( ) ;
349-
350- while let Poll :: Ready ( Some ( ( peer, addr, expiration) ) ) =
351- self . query_response_receiver . poll_next_unpin ( cx)
352- {
353- if let Some ( ( _, _, cur_expires) ) = self
354- . discovered_nodes
355- . iter_mut ( )
356- . find ( |( p, a, _) | * p == peer && * a == addr)
361+ // Emit discovered event.
362+ let mut discovered = Vec :: new ( ) ;
363+
364+ while let Poll :: Ready ( Some ( ( peer, addr, expiration) ) ) =
365+ self . query_response_receiver . poll_next_unpin ( cx)
357366 {
358- * cur_expires = cmp:: max ( * cur_expires, expiration) ;
359- } else {
360- tracing:: info!( %peer, address=%addr, "discovered peer on address" ) ;
361- self . discovered_nodes . push ( ( peer, addr. clone ( ) , expiration) ) ;
362- discovered. push ( ( peer, addr) ) ;
367+ if let Some ( ( _, _, cur_expires) ) = self
368+ . discovered_nodes
369+ . iter_mut ( )
370+ . find ( |( p, a, _) | * p == peer && * a == addr)
371+ {
372+ * cur_expires = cmp:: max ( * cur_expires, expiration) ;
373+ } else {
374+ tracing:: info!( %peer, address=%addr, "discovered peer on address" ) ;
375+ self . discovered_nodes . push ( ( peer, addr. clone ( ) , expiration) ) ;
376+ discovered. push ( ( peer, addr. clone ( ) ) ) ;
377+
378+ self . pending_events
379+ . push_back ( ToSwarm :: NewExternalAddrOfPeer {
380+ peer_id : peer,
381+ address : addr,
382+ } ) ;
383+ }
363384 }
364- }
365385
366- if !discovered. is_empty ( ) {
367- let event = Event :: Discovered ( discovered) ;
368- return Poll :: Ready ( ToSwarm :: GenerateEvent ( event) ) ;
369- }
370- // Emit expired event.
371- let now = Instant :: now ( ) ;
372- let mut closest_expiration = None ;
373- let mut expired = Vec :: new ( ) ;
374- self . discovered_nodes . retain ( |( peer, addr, expiration) | {
375- if * expiration <= now {
376- tracing:: info!( %peer, address=%addr, "expired peer on address" ) ;
377- expired. push ( ( * peer, addr. clone ( ) ) ) ;
378- return false ;
386+ if !discovered. is_empty ( ) {
387+ let event = Event :: Discovered ( discovered) ;
388+ // Push to the front of the queue so that the behavior event is reported before
389+ // the individual discovered addresses.
390+ self . pending_events
391+ . push_front ( ToSwarm :: GenerateEvent ( event) ) ;
392+ continue ;
393+ }
394+ // Emit expired event.
395+ let now = Instant :: now ( ) ;
396+ let mut closest_expiration = None ;
397+ let mut expired = Vec :: new ( ) ;
398+ self . discovered_nodes . retain ( |( peer, addr, expiration) | {
399+ if * expiration <= now {
400+ tracing:: info!( %peer, address=%addr, "expired peer on address" ) ;
401+ expired. push ( ( * peer, addr. clone ( ) ) ) ;
402+ return false ;
403+ }
404+ closest_expiration =
405+ Some ( closest_expiration. unwrap_or ( * expiration) . min ( * expiration) ) ;
406+ true
407+ } ) ;
408+ if !expired. is_empty ( ) {
409+ let event = Event :: Expired ( expired) ;
410+ self . pending_events . push_back ( ToSwarm :: GenerateEvent ( event) ) ;
411+ continue ;
412+ }
413+ if let Some ( closest_expiration) = closest_expiration {
414+ let mut timer = P :: Timer :: at ( closest_expiration) ;
415+ let _ = Pin :: new ( & mut timer) . poll_next ( cx) ;
416+
417+ self . closest_expiration = Some ( timer) ;
379418 }
380- closest_expiration = Some ( closest_expiration. unwrap_or ( * expiration) . min ( * expiration) ) ;
381- true
382- } ) ;
383- if !expired. is_empty ( ) {
384- let event = Event :: Expired ( expired) ;
385- return Poll :: Ready ( ToSwarm :: GenerateEvent ( event) ) ;
386- }
387- if let Some ( closest_expiration) = closest_expiration {
388- let mut timer = P :: Timer :: at ( closest_expiration) ;
389- let _ = Pin :: new ( & mut timer) . poll_next ( cx) ;
390419
391- self . closest_expiration = Some ( timer ) ;
420+ return Poll :: Pending ;
392421 }
393- Poll :: Pending
394422 }
395423}
396424
0 commit comments