@@ -969,12 +969,6 @@ where
969969 fn join ( & mut self , topic_hash : & TopicHash ) {
970970 tracing:: debug!( topic=%topic_hash, "Running JOIN for topic" ) ;
971971
972- // if we are already in the mesh, return
973- if self . mesh . contains_key ( topic_hash) {
974- tracing:: debug!( topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN" ) ;
975- return ;
976- }
977-
978972 let mut added_peers = HashSet :: new ( ) ;
979973 let mesh_n = self . config . mesh_n_for_topic ( topic_hash) ;
980974 if let Some ( m) = self . metrics . as_mut ( ) {
@@ -1015,9 +1009,9 @@ where
10151009 self . fanout_last_pub . remove ( topic_hash) ;
10161010 }
10171011
1018- let fanaout_added = added_peers. len ( ) ;
1012+ let fanout_added = added_peers. len ( ) ;
10191013 if let Some ( m) = self . metrics . as_mut ( ) {
1020- m. peers_included ( topic_hash, Inclusion :: Fanout , fanaout_added )
1014+ m. peers_included ( topic_hash, Inclusion :: Fanout , fanout_added )
10211015 }
10221016
10231017 // check if we need to get more peers, which we randomly select
@@ -1045,7 +1039,7 @@ where
10451039 mesh_peers. extend ( new_peers) ;
10461040 }
10471041
1048- let random_added = added_peers. len ( ) - fanaout_added ;
1042+ let random_added = added_peers. len ( ) - fanout_added ;
10491043 if let Some ( m) = self . metrics . as_mut ( ) {
10501044 m. peers_included ( topic_hash, Inclusion :: Random , random_added)
10511045 }
@@ -1251,14 +1245,6 @@ where
12511245
12521246 let mut iwant_ids = HashSet :: new ( ) ;
12531247
1254- let want_message = |id : & MessageId | {
1255- if self . duplicate_cache . contains ( id) {
1256- return false ;
1257- }
1258-
1259- !self . gossip_promises . contains ( id)
1260- } ;
1261-
12621248 for ( topic, ids) in ihave_msgs {
12631249 // only process the message if we are subscribed
12641250 if !self . mesh . contains_key ( & topic) {
@@ -1269,7 +1255,13 @@ where
12691255 continue ;
12701256 }
12711257
1272- for id in ids. into_iter ( ) . filter ( want_message) {
1258+ for id in ids. into_iter ( ) . filter ( |id| {
1259+ if self . duplicate_cache . contains ( id) {
1260+ return false ;
1261+ }
1262+
1263+ !self . gossip_promises . contains ( id)
1264+ } ) {
12731265 // have not seen this message and are not currently requesting it
12741266 if iwant_ids. insert ( id) {
12751267 // Register the IWANT metric
@@ -2158,7 +2150,7 @@ where
21582150 topic=%topic_hash,
21592151 "HEARTBEAT: Mesh low. Topic contains: {} needs: {}" ,
21602152 peers. len( ) ,
2161- mesh_n_low
2153+ self . config . mesh_n ( )
21622154 ) ;
21632155 // not enough peers - get mesh_n - current_length more
21642156 let desired_peers = mesh_n - peers. len ( ) ;
@@ -2185,9 +2177,9 @@ where
21852177 if peers. len ( ) > mesh_n_high {
21862178 tracing:: debug!(
21872179 topic=%topic_hash,
2188- "HEARTBEAT: Mesh high. Topic contains: {} needs : {}" ,
2180+ "HEARTBEAT: Mesh high. Topic contains: {} will reduce to : {}" ,
21892181 peers. len( ) ,
2190- mesh_n_high
2182+ self . config . mesh_n ( )
21912183 ) ;
21922184 let excess_peer_no = peers. len ( ) - mesh_n;
21932185
0 commit comments