@@ -65,7 +65,7 @@ use crate::{
6565 transform:: { DataTransform , IdentityTransform } ,
6666 types:: {
6767 ControlAction , Graft , IDontWant , IHave , IWant , Message , MessageAcceptance , MessageId ,
68- PeerConnections , PeerInfo , PeerKind , Prune , RawMessage , RpcOut , Subscription ,
68+ PeerDetails , PeerInfo , PeerKind , Prune , RawMessage , RpcOut , Subscription ,
6969 SubscriptionAction ,
7070 } ,
7171 FailedMessages , PublishError , SubscriptionError , TopicScoreParams , ValidationError ,
@@ -270,7 +270,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
270270
271271 /// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and
272272 /// the set of [`ConnectionId`]s.
273- connected_peers : HashMap < PeerId , PeerConnections > ,
273+ connected_peers : HashMap < PeerId , PeerDetails > ,
274274
275275 /// A set of all explicit peers. These are peers that remain connected and we unconditionally
276276 /// forward messages to, outside of the scoring system.
@@ -308,10 +308,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
308308 /// be removed from this list which may result in a true outbound rediscovery.
309309 px_peers : HashSet < PeerId > ,
310310
311- /// Set of connected outbound peers (we only consider true outbound peers found through
312- /// discovery and not by PX).
313- outbound_peers : HashSet < PeerId > ,
314-
315311 /// Stores optional peer score data together with thresholds, decay interval and gossip
316312 /// promises.
317313 peer_score : Option < ( PeerScore , PeerScoreThresholds , Delay ) > ,
@@ -465,7 +461,6 @@ where
465461 heartbeat : Delay :: new ( config. heartbeat_interval ( ) + config. heartbeat_initial_delay ( ) ) ,
466462 heartbeat_ticks : 0 ,
467463 px_peers : HashSet :: new ( ) ,
468- outbound_peers : HashSet :: new ( ) ,
469464 peer_score : None ,
470465 count_received_ihave : HashMap :: new ( ) ,
471466 count_sent_iwant : HashMap :: new ( ) ,
@@ -1380,6 +1375,8 @@ where
13801375 tracing:: error!( peer_id = %peer_id, "Peer non-existent when handling graft" ) ;
13811376 return ;
13821377 } ;
1378+ // Needs to be here to comply with the borrow checker.
1379+ let is_outbound = connected_peer. outbound ;
13831380
13841381 // For each topic, if a peer has grafted us, then we necessarily must be in their mesh
13851382 // and they must be subscribed to the topic. Ensure we have recorded the mapping.
@@ -1468,7 +1465,7 @@ where
14681465 // or if it is an outbound peer
14691466 let mesh_n_high = self . config . mesh_n_high_for_topic ( & topic_hash) ;
14701467
1471- if peers. len ( ) >= mesh_n_high && !self . outbound_peers . contains ( peer_id ) {
1468+ if peers. len ( ) >= mesh_n_high && !is_outbound {
14721469 to_prune_topics. insert ( topic_hash. clone ( ) ) ;
14731470 continue ;
14741471 }
@@ -2102,7 +2099,6 @@ where
21022099 for ( topic_hash, peers) in self . mesh . iter_mut ( ) {
21032100 let explicit_peers = & self . explicit_peers ;
21042101 let backoffs = & self . backoffs ;
2105- let outbound_peers = & self . outbound_peers ;
21062102
21072103 let mesh_n = self . config . mesh_n_for_topic ( topic_hash) ;
21082104 let mesh_n_low = self . config . mesh_n_low_for_topic ( topic_hash) ;
@@ -2197,13 +2193,14 @@ where
21972193 shuffled[ ..peers. len ( ) - self . config . retain_scores ( ) ] . shuffle ( & mut rng) ;
21982194
21992195 // count total number of outbound peers
2200- let mut outbound = {
2201- let outbound_peers = & self . outbound_peers ;
2202- shuffled
2203- . iter ( )
2204- . filter ( |p| outbound_peers. contains ( * p) )
2205- . count ( )
2206- } ;
2196+ let mut outbound = shuffled
2197+ . iter ( )
2198+ . filter ( |peer_id| {
2199+ self . connected_peers
2200+ . get ( peer_id)
2201+ . is_some_and ( |peer| peer. outbound )
2202+ } )
2203+ . count ( ) ;
22072204
22082205 // remove the first excess_peer_no allowed (by outbound restrictions) peers adding
22092206 // them to to_prune
@@ -2212,7 +2209,11 @@ where
22122209 if removed == excess_peer_no {
22132210 break ;
22142211 }
2215- if self . outbound_peers . contains ( & peer) {
2212+ if self
2213+ . connected_peers
2214+ . get ( & peer)
2215+ . is_some_and ( |peer| peer. outbound )
2216+ {
22162217 if outbound <= mesh_outbound_min {
22172218 // do not remove anymore outbound peers
22182219 continue ;
@@ -2236,18 +2237,28 @@ where
22362237 // do we have enough outbound peers?
22372238 if peers. len ( ) >= mesh_n_low {
22382239 // count number of outbound peers we have
2239- let outbound = { peers. iter ( ) . filter ( |p| outbound_peers. contains ( * p) ) . count ( ) } ;
2240+ let outbound = peers
2241+ . iter ( )
2242+ . filter ( |peer_id| {
2243+ self . connected_peers
2244+ . get ( peer_id)
2245+ . is_some_and ( |peer| peer. outbound )
2246+ } )
2247+ . count ( ) ;
22402248
22412249 // if we have not enough outbound peers, graft to some new outbound peers
22422250 if outbound < mesh_outbound_min {
22432251 let needed = mesh_outbound_min - outbound;
22442252 let peer_list =
2245- get_random_peers ( & self . connected_peers , topic_hash, needed, |peer| {
2246- !peers. contains ( peer)
2247- && !explicit_peers. contains ( peer)
2248- && !backoffs. is_backoff_with_slack ( topic_hash, peer)
2249- && * scores. get ( peer) . unwrap_or ( & 0.0 ) >= 0.0
2250- && outbound_peers. contains ( peer)
2253+ get_random_peers ( & self . connected_peers , topic_hash, needed, |peer_id| {
2254+ !peers. contains ( peer_id)
2255+ && !explicit_peers. contains ( peer_id)
2256+ && !backoffs. is_backoff_with_slack ( topic_hash, peer_id)
2257+ && * scores. get ( peer_id) . unwrap_or ( & 0.0 ) >= 0.0
2258+ && self
2259+ . connected_peers
2260+ . get ( peer_id)
2261+ . is_some_and ( |peer| peer. outbound )
22512262 } ) ;
22522263
22532264 for peer in & peer_list {
@@ -2904,15 +2915,6 @@ where
29042915 ..
29052916 } : ConnectionEstablished ,
29062917 ) {
2907- // Diverging from the go implementation we only want to consider a peer as outbound peer
2908- // if its first connection is outbound.
2909-
2910- if endpoint. is_dialer ( ) && other_established == 0 && !self . px_peers . contains ( & peer_id) {
2911- // The first connection is outbound and it is not a peer from peer exchange => mark
2912- // it as outbound peer
2913- self . outbound_peers . insert ( peer_id) ;
2914- }
2915-
29162918 // Add the IP to the peer scoring system
29172919 if let Some ( ( peer_score, ..) ) = & mut self . peer_score {
29182920 if let Some ( ip) = get_ip_addr ( endpoint. get_remote_address ( ) ) {
@@ -3030,7 +3032,6 @@ where
30303032
30313033 // Forget px and outbound status for this peer
30323034 self . px_peers . remove ( & peer_id) ;
3033- self . outbound_peers . remove ( & peer_id) ;
30343035
30353036 // If metrics are enabled, register the disconnection of a peer based on its protocol.
30363037 if let Some ( metrics) = self . metrics . as_mut ( ) {
@@ -3113,16 +3114,14 @@ where
31133114 // The protocol negotiation occurs once a message is sent/received. Once this happens we
31143115 // update the type of peer that this is in order to determine which kind of routing should
31153116 // occur.
3116- let connected_peer = self
3117- . connected_peers
3118- . entry ( peer_id)
3119- . or_insert ( PeerConnections {
3120- kind : PeerKind :: Floodsub ,
3121- connections : vec ! [ ] ,
3122- sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3123- topics : Default :: default ( ) ,
3124- dont_send : LinkedHashMap :: new ( ) ,
3125- } ) ;
3117+ let connected_peer = self . connected_peers . entry ( peer_id) . or_insert ( PeerDetails {
3118+ kind : PeerKind :: Floodsub ,
3119+ connections : vec ! [ ] ,
3120+ outbound : false ,
3121+ sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3122+ topics : Default :: default ( ) ,
3123+ dont_send : LinkedHashMap :: new ( ) ,
3124+ } ) ;
31263125 // Add the new connection
31273126 connected_peer. connections . push ( connection_id) ;
31283127
@@ -3140,16 +3139,16 @@ where
31403139 _: Endpoint ,
31413140 _: PortUse ,
31423141 ) -> Result < THandler < Self > , ConnectionDenied > {
3143- let connected_peer = self
3144- . connected_peers
3145- . entry ( peer_id )
3146- . or_insert ( PeerConnections {
3147- kind : PeerKind :: Floodsub ,
3148- connections : vec ! [ ] ,
3149- sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3150- topics : Default :: default ( ) ,
3151- dont_send : LinkedHashMap :: new ( ) ,
3152- } ) ;
3142+ let connected_peer = self . connected_peers . entry ( peer_id ) . or_insert ( PeerDetails {
3143+ kind : PeerKind :: Floodsub ,
3144+ connections : vec ! [ ] ,
3145+ // Diverging from the go implementation we only want to consider a peer as outbound peer
3146+ // if its first connection is outbound.
3147+ outbound : ! self . px_peers . contains ( & peer_id ) ,
3148+ sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3149+ topics : Default :: default ( ) ,
3150+ dont_send : LinkedHashMap :: new ( ) ,
3151+ } ) ;
31533152 // Add the new connection
31543153 connected_peer. connections . push ( connection_id) ;
31553154
@@ -3399,7 +3398,7 @@ fn peer_added_to_mesh(
33993398 new_topics : Vec < & TopicHash > ,
34003399 mesh : & HashMap < TopicHash , BTreeSet < PeerId > > ,
34013400 events : & mut VecDeque < ToSwarm < Event , HandlerIn > > ,
3402- connections : & HashMap < PeerId , PeerConnections > ,
3401+ connections : & HashMap < PeerId , PeerDetails > ,
34033402) {
34043403 // Ensure there is an active connection
34053404 let connection_id = match connections. get ( & peer_id) {
@@ -3441,7 +3440,7 @@ fn peer_removed_from_mesh(
34413440 old_topic : & TopicHash ,
34423441 mesh : & HashMap < TopicHash , BTreeSet < PeerId > > ,
34433442 events : & mut VecDeque < ToSwarm < Event , HandlerIn > > ,
3444- connections : & HashMap < PeerId , PeerConnections > ,
3443+ connections : & HashMap < PeerId , PeerDetails > ,
34453444) {
34463445 // Ensure there is an active connection
34473446 let connection_id = match connections. get ( & peer_id) {
@@ -3479,7 +3478,7 @@ fn peer_removed_from_mesh(
34793478/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
34803479/// that gets as input the number of filtered peers.
34813480fn get_random_peers_dynamic (
3482- connected_peers : & HashMap < PeerId , PeerConnections > ,
3481+ connected_peers : & HashMap < PeerId , PeerDetails > ,
34833482 topic_hash : & TopicHash ,
34843483 // maps the number of total peers to the number of selected peers
34853484 n_map : impl Fn ( usize ) -> usize ,
@@ -3512,7 +3511,7 @@ fn get_random_peers_dynamic(
35123511/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
35133512/// filtered by the function `f`.
35143513fn get_random_peers (
3515- connected_peers : & HashMap < PeerId , PeerConnections > ,
3514+ connected_peers : & HashMap < PeerId , PeerDetails > ,
35163515 topic_hash : & TopicHash ,
35173516 n : usize ,
35183517 f : impl FnMut ( & PeerId ) -> bool ,
0 commit comments