@@ -592,13 +592,19 @@ where
592592 // Transform the data before building a raw_message.
593593 let transformed_data = self
594594 . data_transform
595- . outbound_transform ( & topic, data. clone ( ) ) ?;
595+ . outbound_transform ( & topic. clone ( ) , data. clone ( ) ) ?;
596+
597+ let max_transmit_size_for_topic = self
598+ . config
599+ . protocol_config ( )
600+ . max_transmit_size_for_topic ( & topic) ;
596601
597602 // check that the size doesn't exceed the max transmission size.
598- if transformed_data. len ( ) > self . config . max_transmit_size ( ) {
603+ if transformed_data. len ( ) > max_transmit_size_for_topic {
599604 return Err ( PublishError :: MessageTooLarge ) ;
600605 }
601606
607+ let mesh_n = self . config . mesh_n_for_topic ( & topic) ;
602608 let raw_message = self . build_raw_message ( topic, transformed_data) ?;
603609
604610 // calculate the message id from the un-transformed data
@@ -648,7 +654,7 @@ where
648654 Some ( mesh_peers) => {
649655 // We have a mesh set. We want to make sure to publish to at least `mesh_n`
650656 // peers (if possible).
651- let needed_extra_peers = self . config . mesh_n ( ) . saturating_sub ( mesh_peers. len ( ) ) ;
657+ let needed_extra_peers = mesh_n. saturating_sub ( mesh_peers. len ( ) ) ;
652658
653659 if needed_extra_peers > 0 {
654660 // We don't have `mesh_n` peers in our mesh, we will randomly select extras
@@ -687,7 +693,6 @@ where
687693 }
688694 } else {
689695 // We have no fanout peers, select mesh_n of them and add them to the fanout
690- let mesh_n = self . config . mesh_n ( ) ;
691696 let new_peers =
692697 get_random_peers ( & self . connected_peers , & topic_hash, mesh_n, {
693698 |p| {
@@ -971,7 +976,7 @@ where
971976 }
972977
973978 let mut added_peers = HashSet :: new ( ) ;
974-
979+ let mesh_n = self . config . mesh_n_for_topic ( topic_hash ) ;
975980 if let Some ( m) = self . metrics . as_mut ( ) {
976981 m. joined ( topic_hash)
977982 }
@@ -993,7 +998,7 @@ where
993998
994999 // Add up to mesh_n of them to the mesh
9951000 // NOTE: These aren't randomly added, currently FIFO
996- let add_peers = std:: cmp:: min ( peers. len ( ) , self . config . mesh_n ( ) ) ;
1001+ let add_peers = std:: cmp:: min ( peers. len ( ) , mesh_n) ;
9971002 tracing:: debug!(
9981003 topic=%topic_hash,
9991004 "JOIN: Adding {:?} peers from the fanout for topic" ,
@@ -1016,19 +1021,20 @@ where
10161021 }
10171022
10181023 // check if we need to get more peers, which we randomly select
1019- if added_peers. len ( ) < self . config . mesh_n ( ) {
1024+ if added_peers. len ( ) < mesh_n {
10201025 // get the peers
10211026 let new_peers = get_random_peers (
10221027 & self . connected_peers ,
10231028 topic_hash,
1024- self . config . mesh_n ( ) - added_peers. len ( ) ,
1029+ mesh_n - added_peers. len ( ) ,
10251030 |peer| {
10261031 !added_peers. contains ( peer)
10271032 && !self . explicit_peers . contains ( peer)
10281033 && !self . score_below_threshold ( peer, |_| 0.0 ) . 0
10291034 && !self . backoffs . is_backoff_with_slack ( topic_hash, peer)
10301035 } ,
10311036 ) ;
1037+
10321038 added_peers. extend ( new_peers. clone ( ) ) ;
10331039 // add them to the mesh
10341040 tracing:: debug!(
@@ -1468,9 +1474,9 @@ where
14681474
14691475 // check mesh upper bound and only allow graft if the upper bound is not reached
14701476 // or if it is an outbound peer
1471- if peers . len ( ) > = self . config . mesh_n_high ( )
1472- && ! self . outbound_peers . contains ( peer_id )
1473- {
1477+ let mesh_n_high = self . config . mesh_n_high_for_topic ( & topic_hash ) ;
1478+
1479+ if peers . len ( ) >= mesh_n_high && ! self . outbound_peers . contains ( peer_id ) {
14741480 to_prune_topics. insert ( topic_hash. clone ( ) ) ;
14751481 continue ;
14761482 }
@@ -1946,9 +1952,9 @@ where
19461952 . is_backoff_with_slack ( topic_hash, propagation_source)
19471953 {
19481954 if let Some ( peers) = self . mesh . get_mut ( topic_hash) {
1949- if peers . len ( ) < self . config . mesh_n_low ( )
1950- && peers . insert ( * propagation_source )
1951- {
1955+ let mesh_n_low = self . config . mesh_n_low_for_topic ( topic_hash ) ;
1956+
1957+ if peers . len ( ) < mesh_n_low && peers . insert ( * propagation_source ) {
19521958 tracing:: debug!(
19531959 peer=%propagation_source,
19541960 topic=%topic_hash,
@@ -2102,6 +2108,11 @@ where
21022108 let backoffs = & self . backoffs ;
21032109 let outbound_peers = & self . outbound_peers ;
21042110
2111+ let mesh_n = self . config . mesh_n_for_topic ( topic_hash) ;
2112+ let mesh_n_low = self . config . mesh_n_low_for_topic ( topic_hash) ;
2113+ let mesh_n_high = self . config . mesh_n_high_for_topic ( topic_hash) ;
2114+ let mesh_outbound_min = self . config . mesh_outbound_min_for_topic ( topic_hash) ;
2115+
21052116 // drop all peers with negative score, without PX
21062117 // if there is at some point a stable retain method for BTreeSet the following can be
21072118 // written more efficiently with retain.
@@ -2138,15 +2149,15 @@ where
21382149 }
21392150
21402151 // too little peers - add some
2141- if peers. len ( ) < self . config . mesh_n_low ( ) {
2152+ if peers. len ( ) < mesh_n_low {
21422153 tracing:: debug!(
21432154 topic=%topic_hash,
21442155 "HEARTBEAT: Mesh low. Topic contains: {} needs: {}" ,
21452156 peers. len( ) ,
2146- self . config . mesh_n_low( )
2157+ mesh_n_low
21472158 ) ;
21482159 // not enough peers - get mesh_n - current_length more
2149- let desired_peers = self . config . mesh_n ( ) - peers. len ( ) ;
2160+ let desired_peers = mesh_n - peers. len ( ) ;
21502161 let peer_list =
21512162 get_random_peers ( & self . connected_peers , topic_hash, desired_peers, |peer| {
21522163 !peers. contains ( peer)
@@ -2167,14 +2178,14 @@ where
21672178 }
21682179
21692180 // too many peers - remove some
2170- if peers. len ( ) > self . config . mesh_n_high ( ) {
2181+ if peers. len ( ) > mesh_n_high {
21712182 tracing:: debug!(
21722183 topic=%topic_hash,
21732184 "HEARTBEAT: Mesh high. Topic contains: {} needs: {}" ,
21742185 peers. len( ) ,
2175- self . config . mesh_n_high( )
2186+ mesh_n_high
21762187 ) ;
2177- let excess_peer_no = peers. len ( ) - self . config . mesh_n ( ) ;
2188+ let excess_peer_no = peers. len ( ) - mesh_n;
21782189
21792190 // shuffle the peers and then sort by score ascending beginning with the worst
21802191 let mut rng = thread_rng ( ) ;
@@ -2206,7 +2217,7 @@ where
22062217 break ;
22072218 }
22082219 if self . outbound_peers . contains ( & peer) {
2209- if outbound <= self . config . mesh_outbound_min ( ) {
2220+ if outbound <= mesh_outbound_min {
22102221 // do not remove anymore outbound peers
22112222 continue ;
22122223 }
@@ -2227,13 +2238,13 @@ where
22272238 }
22282239
22292240 // do we have enough outbound peers?
2230- if peers. len ( ) >= self . config . mesh_n_low ( ) {
2241+ if peers. len ( ) >= mesh_n_low {
22312242 // count number of outbound peers we have
22322243 let outbound = { peers. iter ( ) . filter ( |p| outbound_peers. contains ( * p) ) . count ( ) } ;
22332244
22342245 // if we have not enough outbound peers, graft to some new outbound peers
2235- if outbound < self . config . mesh_outbound_min ( ) {
2236- let needed = self . config . mesh_outbound_min ( ) - outbound;
2246+ if outbound < mesh_outbound_min {
2247+ let needed = mesh_outbound_min - outbound;
22372248 let peer_list =
22382249 get_random_peers ( & self . connected_peers , topic_hash, needed, |peer| {
22392250 !peers. contains ( peer)
@@ -2242,6 +2253,7 @@ where
22422253 && * scores. get ( peer) . unwrap_or ( & 0.0 ) >= 0.0
22432254 && outbound_peers. contains ( peer)
22442255 } ) ;
2256+
22452257 for peer in & peer_list {
22462258 let current_topic = to_graft. entry ( * peer) . or_insert_with ( Vec :: new) ;
22472259 current_topic. push ( topic_hash. clone ( ) ) ;
@@ -2356,6 +2368,8 @@ where
23562368 Some ( ( _, thresholds, _) ) => thresholds. publish_threshold ,
23572369 _ => 0.0 ,
23582370 } ;
2371+ let mesh_n = self . config . mesh_n_for_topic ( topic_hash) ;
2372+
23592373 for peer_id in peers. iter ( ) {
23602374 // is the peer still subscribed to the topic?
23612375 let peer_score = * scores. get ( peer_id) . unwrap_or ( & 0.0 ) ;
@@ -2380,13 +2394,13 @@ where
23802394 }
23812395
23822396 // not enough peers
2383- if peers. len ( ) < self . config . mesh_n ( ) {
2397+ if peers. len ( ) < mesh_n {
23842398 tracing:: debug!(
23852399 "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}" ,
23862400 peers. len( ) ,
2387- self . config . mesh_n( )
2401+ mesh_n
23882402 ) ;
2389- let needed_peers = self . config . mesh_n ( ) - peers. len ( ) ;
2403+ let needed_peers = mesh_n - peers. len ( ) ;
23902404 let explicit_peers = & self . explicit_peers ;
23912405 let new_peers =
23922406 get_random_peers ( & self . connected_peers , topic_hash, needed_peers, |peer_id| {
0 commit comments