@@ -4,7 +4,7 @@ use std::{
4
4
convert:: TryFrom ,
5
5
fmt, io, mem,
6
6
net:: { IpAddr , SocketAddr } ,
7
- sync:: Arc ,
7
+ sync:: { mpsc , Arc } ,
8
8
time:: { Duration , Instant } ,
9
9
} ;
10
10
@@ -24,14 +24,11 @@ use crate::{
24
24
frame:: { Close , Datagram , FrameStruct } ,
25
25
packet:: { Header , LongType , Packet , PartialDecode , SpaceId } ,
26
26
range_set:: ArrayRangeSet ,
27
- shared:: {
28
- ConnectionEvent , ConnectionEventInner , ConnectionId , EcnCodepoint , EndpointEvent ,
29
- EndpointEventInner ,
30
- } ,
27
+ shared:: { ConnectionEvent , ConnectionEventInner , ConnectionId , EcnCodepoint , EndpointEvent } ,
31
28
token:: ResetToken ,
32
29
transport_parameters:: TransportParameters ,
33
- Dir , EndpointConfig , Frame , Side , StreamId , Transmit , TransportError , TransportErrorCode ,
34
- VarInt , MAX_STREAM_COUNT , MIN_INITIAL_SIZE , TIMER_GRANULARITY ,
30
+ ConnectionHandle , Dir , EndpointConfig , Frame , Side , StreamId , Transmit , TransportError ,
31
+ TransportErrorCode , VarInt , MAX_STREAM_COUNT , MIN_INITIAL_SIZE , TIMER_GRANULARITY ,
35
32
} ;
36
33
37
34
mod ack_frequency;
@@ -162,7 +159,7 @@ pub struct Connection {
162
159
/// Total number of outgoing packets that have been deemed lost
163
160
lost_packets : u64 ,
164
161
events : VecDeque < Event > ,
165
- endpoint_events : VecDeque < EndpointEventInner > ,
162
+ endpoint_events : EndpointEvents ,
166
163
/// Whether the spin bit is in use for this connection
167
164
spin_enabled : bool ,
168
165
/// Outgoing spin bit state
@@ -253,6 +250,7 @@ impl Connection {
253
250
version : u32 ,
254
251
allow_mtud : bool ,
255
252
rng_seed : [ u8 ; 32 ] ,
253
+ endpoint_events : EndpointEvents ,
256
254
) -> Self {
257
255
let side = if server_config. is_some ( ) {
258
256
Side :: Server
@@ -314,7 +312,7 @@ impl Connection {
314
312
retry_src_cid : None ,
315
313
lost_packets : 0 ,
316
314
events : VecDeque :: new ( ) ,
317
- endpoint_events : VecDeque :: new ( ) ,
315
+ endpoint_events,
318
316
spin_enabled : config. allow_spin && rng. gen_ratio ( 7 , 8 ) ,
319
317
spin : false ,
320
318
spaces : [ initial_space, PacketSpace :: new ( now) , PacketSpace :: new ( now) ] ,
@@ -407,10 +405,10 @@ impl Connection {
407
405
None
408
406
}
409
407
410
- /// Return endpoint-facing events
408
+ /// Whether [`Endpoint::handle_events`] must be called in the immediate future
411
409
#[ must_use]
412
- pub fn poll_endpoint_events ( & mut self ) -> Option < EndpointEvent > {
413
- self . endpoint_events . pop_front ( ) . map ( EndpointEvent )
410
+ pub fn poll_endpoint_events ( & mut self ) -> bool {
411
+ mem :: take ( & mut self . endpoint_events . dirty )
414
412
}
415
413
416
414
/// Provide control over streams
@@ -954,8 +952,8 @@ impl Connection {
954
952
/// Process `ConnectionEvent`s generated by the associated `Endpoint`
955
953
///
956
954
/// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
957
- /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
958
- /// extracted through the relevant methods.
955
+ /// (including application `Event`s, endpoint events, and outgoing datagrams) that should be
956
+ /// checked through the relevant methods.
959
957
pub fn handle_event ( & mut self , event : ConnectionEvent , now : Instant ) {
960
958
use self :: ConnectionEventInner :: * ;
961
959
match event. 0 {
@@ -1037,7 +1035,7 @@ impl Connection {
1037
1035
match timer {
1038
1036
Timer :: Close => {
1039
1037
self . state = State :: Drained ;
1040
- self . endpoint_events . push_back ( EndpointEventInner :: Drained ) ;
1038
+ self . endpoint_events . push ( EndpointEvent :: Drained ) ;
1041
1039
}
1042
1040
Timer :: Idle => {
1043
1041
self . kill ( ConnectionError :: TimedOut ) ;
@@ -1071,7 +1069,7 @@ impl Connection {
1071
1069
self . local_cid_state. retire_prior_to( )
1072
1070
) ;
1073
1071
self . endpoint_events
1074
- . push_back ( EndpointEventInner :: NeedIdentifiers ( num_new_cid) ) ;
1072
+ . push ( EndpointEvent :: NeedIdentifiers ( num_new_cid) ) ;
1075
1073
}
1076
1074
}
1077
1075
Timer :: MaxAckDelay => {
@@ -2168,7 +2166,7 @@ impl Connection {
2168
2166
}
2169
2167
}
2170
2168
if !was_drained && self . state . is_drained ( ) {
2171
- self . endpoint_events . push_back ( EndpointEventInner :: Drained ) ;
2169
+ self . endpoint_events . push ( EndpointEvent :: Drained ) ;
2172
2170
// Close timer may have been started previously, e.g. if we sent a close and got a
2173
2171
// stateless reset in response
2174
2172
self . timers . stop ( Timer :: Close ) ;
@@ -2351,7 +2349,7 @@ impl Connection {
2351
2349
}
2352
2350
if let Some ( token) = params. stateless_reset_token {
2353
2351
self . endpoint_events
2354
- . push_back ( EndpointEventInner :: ResetToken ( self . path . remote , token) ) ;
2352
+ . push ( EndpointEvent :: ResetToken ( self . path . remote , token) ) ;
2355
2353
}
2356
2354
self . handle_peer_params ( params) ?;
2357
2355
self . issue_first_cids ( ) ;
@@ -2661,10 +2659,7 @@ impl Connection {
2661
2659
. local_cid_state
2662
2660
. on_cid_retirement ( sequence, self . peer_params . issue_cids_limit ( ) ) ?;
2663
2661
self . endpoint_events
2664
- . push_back ( EndpointEventInner :: RetireConnectionId (
2665
- sequence,
2666
- allow_more_cids,
2667
- ) ) ;
2662
+ . push ( EndpointEvent :: RetireConnectionId ( sequence, allow_more_cids) ) ;
2668
2663
}
2669
2664
Frame :: NewConnectionId ( frame) => {
2670
2665
trace ! (
@@ -2880,10 +2875,7 @@ impl Connection {
2880
2875
2881
2876
fn set_reset_token ( & mut self , reset_token : ResetToken ) {
2882
2877
self . endpoint_events
2883
- . push_back ( EndpointEventInner :: ResetToken (
2884
- self . path . remote ,
2885
- reset_token,
2886
- ) ) ;
2878
+ . push ( EndpointEvent :: ResetToken ( self . path . remote , reset_token) ) ;
2887
2879
self . peer_params . stateless_reset_token = Some ( reset_token) ;
2888
2880
}
2889
2881
@@ -2895,8 +2887,7 @@ impl Connection {
2895
2887
2896
2888
// Subtract 1 to account for the CID we supplied while handshaking
2897
2889
let n = self . peer_params . issue_cids_limit ( ) - 1 ;
2898
- self . endpoint_events
2899
- . push_back ( EndpointEventInner :: NeedIdentifiers ( n) ) ;
2890
+ self . endpoint_events . push ( EndpointEvent :: NeedIdentifiers ( n) ) ;
2900
2891
}
2901
2892
2902
2893
fn populate_packet (
@@ -3373,8 +3364,7 @@ impl Connection {
3373
3364
#[ cfg( test) ]
3374
3365
pub ( crate ) fn rotate_local_cid ( & mut self , v : u64 ) {
3375
3366
let n = self . local_cid_state . assign_retire_seq ( v) ;
3376
- self . endpoint_events
3377
- . push_back ( EndpointEventInner :: NeedIdentifiers ( n) ) ;
3367
+ self . endpoint_events . push ( EndpointEvent :: NeedIdentifiers ( n) ) ;
3378
3368
}
3379
3369
3380
3370
/// Check the current active remote CID sequence
@@ -3415,7 +3405,7 @@ impl Connection {
3415
3405
self . close_common ( ) ;
3416
3406
self . error = Some ( reason) ;
3417
3407
self . state = State :: Drained ;
3418
- self . endpoint_events . push_back ( EndpointEventInner :: Drained ) ;
3408
+ self . endpoint_events . push ( EndpointEvent :: Drained ) ;
3419
3409
}
3420
3410
3421
3411
/// Storage size required for the largest packet known to be supported by the current path
@@ -3650,3 +3640,28 @@ impl SentFrames {
3650
3640
&& self . retransmits . is_empty ( streams)
3651
3641
}
3652
3642
}
3643
+
3644
+ pub ( crate ) struct EndpointEvents {
3645
+ ch : ConnectionHandle ,
3646
+ sender : mpsc:: Sender < ( ConnectionHandle , EndpointEvent ) > ,
3647
+ dirty : bool ,
3648
+ }
3649
+
3650
+ impl EndpointEvents {
3651
+ pub ( crate ) fn new (
3652
+ ch : ConnectionHandle ,
3653
+ sender : mpsc:: Sender < ( ConnectionHandle , EndpointEvent ) > ,
3654
+ ) -> Self {
3655
+ Self {
3656
+ ch,
3657
+ sender,
3658
+ dirty : false ,
3659
+ }
3660
+ }
3661
+
3662
+ fn push ( & mut self , event : EndpointEvent ) {
3663
+ // If the endpoint has gone away, assume the caller is winding down regardless.
3664
+ _ = self . sender . send ( ( self . ch , event) ) ;
3665
+ self . dirty = true ;
3666
+ }
3667
+ }
0 commit comments