@@ -61,7 +61,7 @@ use crate::{
     mcache::MessageCache,
     peer_score::{PeerScore, PeerScoreParams, PeerScoreState, PeerScoreThresholds, RejectReason},
     protocol::SIGNING_PREFIX,
-    rpc::Sender,
+    queue::Queue,
     rpc_proto::proto,
     subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
     time_cache::DuplicateCache,
@@ -751,6 +751,7 @@ where
             if self.send_message(
                 *peer_id,
                 RpcOut::Publish {
+                    message_id: msg_id.clone(),
                     message: raw_message.clone(),
                     timeout: Delay::new(self.config.publish_queue_duration()),
                 },
@@ -1341,6 +1342,7 @@ where
         self.send_message(
             *peer_id,
             RpcOut::Forward {
+                message_id: id.clone(),
                 message: msg,
                 timeout: Delay::new(self.config.forward_queue_duration()),
             },
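
Both data-carrying RpcOut variants now carry the message id next to the payload, so queued entries can later be matched against IDONTWANT ids. A rough sketch of the variant shapes implied by these call sites, assuming the crate's existing MessageId, RawMessage, and Delay types; the authoritative definitions live in the rpc module and may differ:

    // Sketch only: variant shapes inferred from the call sites in this diff.
    pub enum RpcOut {
        /// A freshly published message, treated as priority traffic.
        Publish {
            message_id: MessageId,
            message: RawMessage,
            timeout: Delay,
        },
        /// A forwarded message, treated as non-priority traffic.
        Forward {
            message_id: MessageId,
            message: RawMessage,
            timeout: Delay,
        },
        // ... control variants (Subscribe, Unsubscribe, Graft, Prune, IHave, IWant, IDontWant)
    }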
@@ -2081,9 +2083,9 @@ where
         // steady-state size of the queues.
         #[cfg(feature = "metrics")]
         if let Some(m) = &mut self.metrics {
-            for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
-                m.observe_priority_queue_size(sender_queue.priority_queue_len());
-                m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
+            for sender_queue in self.connected_peers.values().map(|v| &v.messages) {
+                m.observe_priority_queue_size(sender_queue.priority_len());
+                m.observe_non_priority_queue_size(sender_queue.non_priority_len());
             }
         }

@@ -2499,6 +2501,11 @@ where
         // Report expired messages
         for (peer_id, failed_messages) in self.failed_messages.drain() {
             tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
+            #[cfg(feature = "metrics")]
+            if let Some(metrics) = self.metrics.as_mut() {
+                metrics.observe_failed_priority_messages(failed_messages.priority);
+                metrics.observe_failed_non_priority_messages(failed_messages.non_priority);
+            }
             self.events
                 .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
                     peer_id,
@@ -2746,6 +2753,7 @@ where
             self.send_message(
                 *peer_id,
                 RpcOut::Forward {
+                    message_id: msg_id.clone(),
                     message: message.clone(),
                     timeout: Delay::new(self.config.forward_queue_duration()),
                 },
@@ -2874,33 +2882,20 @@ where
             return false;
         }

-        // Try sending the message to the connection handler.
-        match peer.sender.send_message(rpc) {
+        // Try sending the message to the connection handler.
+        // High-priority messages should not fail.
+        match peer.messages.try_push(rpc) {
             Ok(()) => true,
             Err(rpc) => {
                 // Sending failed because the channel is full.
                 tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);

                 // Update failed message counter.
                 let failed_messages = self.failed_messages.entry(peer_id).or_default();
-                match rpc {
-                    RpcOut::Publish { .. } => {
-                        failed_messages.priority += 1;
-                        failed_messages.publish += 1;
-                    }
-                    RpcOut::Forward { .. } => {
-                        failed_messages.non_priority += 1;
-                        failed_messages.forward += 1;
-                    }
-                    RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
-                        failed_messages.non_priority += 1;
-                    }
-                    RpcOut::Graft(_)
-                    | RpcOut::Prune(_)
-                    | RpcOut::Subscribe(_)
-                    | RpcOut::Unsubscribe(_) => {
-                        unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
-                    }
+                if rpc.priority() {
+                    failed_messages.priority += 1;
+                } else {
+                    failed_messages.non_priority += 1;
                 }

                 // Update peer score.
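
The per-variant bookkeeping above is collapsed into an RpcOut::priority() call. A sketch of the classification such a helper would encode, inferred from the match arms this hunk removes (publishes and Graft/Prune/Subscribe/Unsubscribe were the unbounded, high-priority side; forwards and IHave/IWant/IDontWant the bounded one). The real helper lives alongside RpcOut in the rpc/queue modules and may differ in detail:

    // Sketch of the classification implied by the removed match arms.
    impl RpcOut {
        /// Publishes and Graft/Prune/Subscribe/Unsubscribe control messages are priority
        /// traffic; forwards and gossip (IHave/IWant/IDontWant) are not.
        pub(crate) fn priority(&self) -> bool {
            matches!(
                self,
                RpcOut::Publish { .. }
                    | RpcOut::Graft(_)
                    | RpcOut::Prune(_)
                    | RpcOut::Subscribe(_)
                    | RpcOut::Unsubscribe(_)
            )
        }
    }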
@@ -3125,23 +3120,22 @@ where
         // The protocol negotiation occurs once a message is sent/received. Once this happens we
         // update the type of peer that this is in order to determine which kind of routing should
         // occur.
-        let connected_peer = self
-            .connected_peers
-            .entry(peer_id)
-            .or_insert_with(|| PeerDetails {
-                kind: PeerKind::Floodsub,
-                connections: vec![],
-                outbound: false,
-                sender: Sender::new(self.config.connection_handler_queue_len()),
-                topics: Default::default(),
-                dont_send: LinkedHashMap::new(),
-            });
+        let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
+            kind: PeerKind::Floodsub,
+            connections: vec![],
+            outbound: false,
+            messages: Queue::new(self.config.connection_handler_queue_len()),
+            topics: Default::default(),
+            dont_send: LinkedHashMap::new(),
+        });
         // Add the new connection
         connected_peer.connections.push(connection_id);

+        // This clones a reference to the Queue so any new handlers reference the same underlying
+        // queue. No data is actually cloned here.
         Ok(Handler::new(
             self.config.protocol_config(),
-            connected_peer.sender.new_receiver(),
+            connected_peer.messages.clone(),
         ))
     }
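
The comment above depends on Queue being a cheap, cloneable handle over shared state, so the behaviour and every connection handler for the peer drain the same buffers. A minimal sketch of that idea, assuming an Arc/Mutex-wrapped inner struct with separate priority and non-priority buffers and a capacity that only bounds the non-priority side; the name Inner and the omission of any waker/async notification are simplifications, not the crate's actual queue module:

    use std::{
        collections::VecDeque,
        sync::{Arc, Mutex},
    };

    /// Illustrative only: a handle whose clones all point at the same buffers.
    #[derive(Clone)]
    pub(crate) struct Queue {
        inner: Arc<Mutex<Inner>>,
    }

    struct Inner {
        priority: VecDeque<RpcOut>,     // unbounded: publishes + control
        non_priority: VecDeque<RpcOut>, // bounded: forwards + gossip
        capacity: usize,
    }

    impl Queue {
        pub(crate) fn new(capacity: usize) -> Self {
            Self {
                inner: Arc::new(Mutex::new(Inner {
                    priority: VecDeque::new(),
                    non_priority: VecDeque::new(),
                    capacity,
                })),
            }
        }

        /// Push a message, rejecting non-priority messages when the bounded side is full.
        pub(crate) fn try_push(&self, rpc: RpcOut) -> Result<(), RpcOut> {
            let mut inner = self.inner.lock().unwrap();
            if rpc.priority() {
                inner.priority.push_back(rpc);
                Ok(())
            } else if inner.non_priority.len() < inner.capacity {
                inner.non_priority.push_back(rpc);
                Ok(())
            } else {
                Err(rpc)
            }
        }

        pub(crate) fn priority_len(&self) -> usize {
            self.inner.lock().unwrap().priority.len()
        }

        pub(crate) fn non_priority_len(&self) -> usize {
            self.inner.lock().unwrap().non_priority.len()
        }
    }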
@@ -3153,25 +3147,24 @@ where
         _: Endpoint,
         _: PortUse,
     ) -> Result<THandler<Self>, ConnectionDenied> {
-        let connected_peer = self
-            .connected_peers
-            .entry(peer_id)
-            .or_insert_with(|| PeerDetails {
-                kind: PeerKind::Floodsub,
-                connections: vec![],
-                // Diverging from the go implementation we only want to consider a peer as outbound
-                // peer if its first connection is outbound.
-                outbound: !self.px_peers.contains(&peer_id),
-                sender: Sender::new(self.config.connection_handler_queue_len()),
-                topics: Default::default(),
-                dont_send: LinkedHashMap::new(),
-            });
+        let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
+            kind: PeerKind::Floodsub,
+            connections: vec![],
+            // Diverging from the go implementation we only want to consider a peer as an
+            // outbound peer if its first connection is outbound.
+            outbound: !self.px_peers.contains(&peer_id),
+            messages: Queue::new(self.config.connection_handler_queue_len()),
+            topics: Default::default(),
+            dont_send: LinkedHashMap::new(),
+        });
         // Add the new connection
         connected_peer.connections.push(connection_id);

+        // This clones a reference to the Queue so any new handlers reference the same underlying
+        // queue. No data is actually cloned here.
         Ok(Handler::new(
             self.config.protocol_config(),
-            connected_peer.sender.new_receiver(),
+            connected_peer.messages.clone(),
         ))
     }
@@ -3213,6 +3206,8 @@ where
                     }
                 }
             }
+            // rpc is only used for metrics code.
+            #[allow(unused_variables)]
             HandlerEvent::MessageDropped(rpc) => {
                 // Account for this in the scoring logic
                 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
@@ -3221,32 +3216,7 @@ where

                 // Keep track of expired messages for the application layer.
                 let failed_messages = self.failed_messages.entry(propagation_source).or_default();
-                failed_messages.timeout += 1;
-                match rpc {
-                    RpcOut::Publish { .. } => {
-                        failed_messages.publish += 1;
-                    }
-                    RpcOut::Forward { .. } => {
-                        failed_messages.forward += 1;
-                    }
-                    _ => {}
-                }
-
-                // Record metrics on the failure.
-                #[cfg(feature = "metrics")]
-                if let Some(metrics) = self.metrics.as_mut() {
-                    match rpc {
-                        RpcOut::Publish { message, .. } => {
-                            metrics.publish_msg_dropped(&message.topic);
-                            metrics.timeout_msg_dropped(&message.topic);
-                        }
-                        RpcOut::Forward { message, .. } => {
-                            metrics.forward_msg_dropped(&message.topic);
-                            metrics.timeout_msg_dropped(&message.topic);
-                        }
-                        _ => {}
-                    }
-                }
+                failed_messages.non_priority += 1;
             }
             HandlerEvent::Message {
                 rpc,
@@ -3345,10 +3315,17 @@ where
                         "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
                     continue;
                 };
+
+                // Remove messages from the queue.
+                #[allow(unused)]
+                let removed = peer.messages.remove_data_messages(&message_ids);
+
                 #[cfg(feature = "metrics")]
                 if let Some(metrics) = self.metrics.as_mut() {
                     metrics.register_idontwant(message_ids.len());
+                    metrics.register_removed_messages(removed);
                 }
+
                 for message_id in message_ids {
                     peer.dont_send.insert(message_id, Instant::now());
                     // Don't exceed capacity.
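
remove_data_messages lets an IDONTWANT prune still-queued publishes and forwards before they are sent, returning the count that feeds register_removed_messages above. Sketched semantics, continuing the illustrative Queue from earlier (an assumption, not the crate's implementation):

    impl Queue {
        /// Illustrative: drop queued Publish/Forward entries whose id appears in
        /// `message_ids` and report how many were removed.
        pub(crate) fn remove_data_messages(&self, message_ids: &[MessageId]) -> usize {
            let mut inner = self.inner.lock().unwrap();
            let before = inner.priority.len() + inner.non_priority.len();
            let keep = |rpc: &RpcOut| match rpc {
                RpcOut::Publish { message_id, .. } | RpcOut::Forward { message_id, .. } => {
                    !message_ids.contains(message_id)
                }
                _ => true,
            };
            inner.priority.retain(|rpc| keep(rpc));
            inner.non_priority.retain(|rpc| keep(rpc));
            before - (inner.priority.len() + inner.non_priority.len())
        }
    }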