Skip to content

Commit 0692179

Browse files
committed
Pass proto-layer endpoint events internally
Groundwork for replacing generic message-passing with lighter event-representation-aware communication strategies.
1 parent 302b336 commit 0692179

File tree

8 files changed

+115
-101
lines changed

8 files changed

+115
-101
lines changed

quinn-proto/src/connection/mod.rs

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{
44
convert::TryFrom,
55
fmt, io, mem,
66
net::{IpAddr, SocketAddr},
7-
sync::Arc,
7+
sync::{mpsc, Arc},
88
time::{Duration, Instant},
99
};
1010

@@ -24,14 +24,11 @@ use crate::{
2424
frame::{Close, Datagram, FrameStruct},
2525
packet::{Header, LongType, Packet, PartialDecode, SpaceId},
2626
range_set::ArrayRangeSet,
27-
shared::{
28-
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent,
29-
EndpointEventInner,
30-
},
27+
shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent},
3128
token::ResetToken,
3229
transport_parameters::TransportParameters,
33-
Dir, EndpointConfig, Frame, Side, StreamId, Transmit, TransportError, TransportErrorCode,
34-
VarInt, MAX_STREAM_COUNT, MIN_INITIAL_SIZE, TIMER_GRANULARITY,
30+
ConnectionHandle, Dir, EndpointConfig, Frame, Side, StreamId, Transmit, TransportError,
31+
TransportErrorCode, VarInt, MAX_STREAM_COUNT, MIN_INITIAL_SIZE, TIMER_GRANULARITY,
3532
};
3633

3734
mod ack_frequency;
@@ -162,7 +159,7 @@ pub struct Connection {
162159
/// Total number of outgoing packets that have been deemed lost
163160
lost_packets: u64,
164161
events: VecDeque<Event>,
165-
endpoint_events: VecDeque<EndpointEventInner>,
162+
endpoint_events: EndpointEvents,
166163
/// Whether the spin bit is in use for this connection
167164
spin_enabled: bool,
168165
/// Outgoing spin bit state
@@ -253,6 +250,7 @@ impl Connection {
253250
version: u32,
254251
allow_mtud: bool,
255252
rng_seed: [u8; 32],
253+
endpoint_events: EndpointEvents,
256254
) -> Self {
257255
let side = if server_config.is_some() {
258256
Side::Server
@@ -314,7 +312,7 @@ impl Connection {
314312
retry_src_cid: None,
315313
lost_packets: 0,
316314
events: VecDeque::new(),
317-
endpoint_events: VecDeque::new(),
315+
endpoint_events,
318316
spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
319317
spin: false,
320318
spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
@@ -407,10 +405,10 @@ impl Connection {
407405
None
408406
}
409407

410-
/// Return endpoint-facing events
408+
/// Whether [`Endpoint::handle_events`] must be called in the immediate future
411409
#[must_use]
412-
pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
413-
self.endpoint_events.pop_front().map(EndpointEvent)
410+
pub fn poll_endpoint_events(&mut self) -> bool {
411+
mem::take(&mut self.endpoint_events.dirty)
414412
}
415413

416414
/// Provide control over streams
@@ -954,8 +952,8 @@ impl Connection {
954952
/// Process `ConnectionEvent`s generated by the associated `Endpoint`
955953
///
956954
/// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
957-
/// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
958-
/// extracted through the relevant methods.
955+
/// (including application `Event`s, endpoint events, and outgoing datagrams) that should be
956+
/// checked through the relevant methods.
959957
pub fn handle_event(&mut self, event: ConnectionEvent, now: Instant) {
960958
use self::ConnectionEventInner::*;
961959
match event.0 {
@@ -1037,7 +1035,7 @@ impl Connection {
10371035
match timer {
10381036
Timer::Close => {
10391037
self.state = State::Drained;
1040-
self.endpoint_events.push_back(EndpointEventInner::Drained);
1038+
self.endpoint_events.push(EndpointEvent::Drained);
10411039
}
10421040
Timer::Idle => {
10431041
self.kill(ConnectionError::TimedOut);
@@ -1071,7 +1069,7 @@ impl Connection {
10711069
self.local_cid_state.retire_prior_to()
10721070
);
10731071
self.endpoint_events
1074-
.push_back(EndpointEventInner::NeedIdentifiers(num_new_cid));
1072+
.push(EndpointEvent::NeedIdentifiers(num_new_cid));
10751073
}
10761074
}
10771075
Timer::MaxAckDelay => {
@@ -2168,7 +2166,7 @@ impl Connection {
21682166
}
21692167
}
21702168
if !was_drained && self.state.is_drained() {
2171-
self.endpoint_events.push_back(EndpointEventInner::Drained);
2169+
self.endpoint_events.push(EndpointEvent::Drained);
21722170
// Close timer may have been started previously, e.g. if we sent a close and got a
21732171
// stateless reset in response
21742172
self.timers.stop(Timer::Close);
@@ -2351,7 +2349,7 @@ impl Connection {
23512349
}
23522350
if let Some(token) = params.stateless_reset_token {
23532351
self.endpoint_events
2354-
.push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2352+
.push(EndpointEvent::ResetToken(self.path.remote, token));
23552353
}
23562354
self.handle_peer_params(params)?;
23572355
self.issue_first_cids();
@@ -2661,10 +2659,7 @@ impl Connection {
26612659
.local_cid_state
26622660
.on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
26632661
self.endpoint_events
2664-
.push_back(EndpointEventInner::RetireConnectionId(
2665-
sequence,
2666-
allow_more_cids,
2667-
));
2662+
.push(EndpointEvent::RetireConnectionId(sequence, allow_more_cids));
26682663
}
26692664
Frame::NewConnectionId(frame) => {
26702665
trace!(
@@ -2880,10 +2875,7 @@ impl Connection {
28802875

28812876
fn set_reset_token(&mut self, reset_token: ResetToken) {
28822877
self.endpoint_events
2883-
.push_back(EndpointEventInner::ResetToken(
2884-
self.path.remote,
2885-
reset_token,
2886-
));
2878+
.push(EndpointEvent::ResetToken(self.path.remote, reset_token));
28872879
self.peer_params.stateless_reset_token = Some(reset_token);
28882880
}
28892881

@@ -2895,8 +2887,7 @@ impl Connection {
28952887

28962888
// Subtract 1 to account for the CID we supplied while handshaking
28972889
let n = self.peer_params.issue_cids_limit() - 1;
2898-
self.endpoint_events
2899-
.push_back(EndpointEventInner::NeedIdentifiers(n));
2890+
self.endpoint_events.push(EndpointEvent::NeedIdentifiers(n));
29002891
}
29012892

29022893
fn populate_packet(
@@ -3373,8 +3364,7 @@ impl Connection {
33733364
#[cfg(test)]
33743365
pub(crate) fn rotate_local_cid(&mut self, v: u64) {
33753366
let n = self.local_cid_state.assign_retire_seq(v);
3376-
self.endpoint_events
3377-
.push_back(EndpointEventInner::NeedIdentifiers(n));
3367+
self.endpoint_events.push(EndpointEvent::NeedIdentifiers(n));
33783368
}
33793369

33803370
/// Check the current active remote CID sequence
@@ -3415,7 +3405,7 @@ impl Connection {
34153405
self.close_common();
34163406
self.error = Some(reason);
34173407
self.state = State::Drained;
3418-
self.endpoint_events.push_back(EndpointEventInner::Drained);
3408+
self.endpoint_events.push(EndpointEvent::Drained);
34193409
}
34203410

34213411
/// Storage size required for the largest packet known to be supported by the current path
@@ -3650,3 +3640,28 @@ impl SentFrames {
36503640
&& self.retransmits.is_empty(streams)
36513641
}
36523642
}
3643+
3644+
pub(crate) struct EndpointEvents {
3645+
ch: ConnectionHandle,
3646+
sender: mpsc::Sender<(ConnectionHandle, EndpointEvent)>,
3647+
dirty: bool,
3648+
}
3649+
3650+
impl EndpointEvents {
3651+
pub(crate) fn new(
3652+
ch: ConnectionHandle,
3653+
sender: mpsc::Sender<(ConnectionHandle, EndpointEvent)>,
3654+
) -> Self {
3655+
Self {
3656+
ch,
3657+
sender,
3658+
dirty: false,
3659+
}
3660+
}
3661+
3662+
fn push(&mut self, event: EndpointEvent) {
3663+
// If the endpoint has gone away, assume the caller is winding down regardless.
3664+
_ = self.sender.send((self.ch, event));
3665+
self.dirty = true;
3666+
}
3667+
}

quinn-proto/src/endpoint.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{
44
fmt, iter,
55
net::{IpAddr, SocketAddr},
66
ops::{Index, IndexMut},
7-
sync::Arc,
7+
sync::{mpsc, Arc},
88
time::{Instant, SystemTime},
99
};
1010

@@ -19,13 +19,12 @@ use crate::{
1919
cid_generator::{ConnectionIdGenerator, RandomConnectionIdGenerator},
2020
coding::BufMutExt,
2121
config::{ClientConfig, EndpointConfig, ServerConfig},
22-
connection::{Connection, ConnectionError},
22+
connection::{Connection, ConnectionError, EndpointEvents},
2323
crypto::{self, Keys, UnsupportedVersion},
2424
frame,
2525
packet::{Header, Packet, PacketDecodeError, PacketNumber, PartialDecode},
2626
shared::{
27-
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent,
28-
EndpointEventInner, IssuedCid,
27+
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent, IssuedCid,
2928
},
3029
transport_parameters::TransportParameters,
3130
ResetToken, RetryToken, Side, Transmit, TransportConfig, TransportError, INITIAL_MTU,
@@ -45,6 +44,8 @@ pub struct Endpoint {
4544
server_config: Option<Arc<ServerConfig>>,
4645
/// Whether the underlying UDP socket promises not to fragment packets
4746
allow_mtud: bool,
47+
event_send: mpsc::Sender<(ConnectionHandle, EndpointEvent)>,
48+
event_recv: mpsc::Receiver<(ConnectionHandle, EndpointEvent)>,
4849
}
4950

5051
impl Endpoint {
@@ -59,6 +60,7 @@ impl Endpoint {
5960
allow_mtud: bool,
6061
rng_seed: Option<[u8; 32]>,
6162
) -> Self {
63+
let (event_send, event_recv) = mpsc::channel();
6264
Self {
6365
rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
6466
index: ConnectionIndex::default(),
@@ -67,6 +69,8 @@ impl Endpoint {
6769
config,
6870
server_config,
6971
allow_mtud,
72+
event_send,
73+
event_recv,
7074
}
7175
}
7276

@@ -75,16 +79,25 @@ impl Endpoint {
7579
self.server_config = server_config;
7680
}
7781

78-
/// Process `EndpointEvent`s emitted from related `Connection`s
82+
/// Process events from [`Connection`]s that have returned `true` from [`Connection::poll_endpoint_events`]
7983
///
80-
/// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
81-
pub fn handle_event(
84+
/// May return a `ConnectionEvent` for any `Connection`. Call until `None` is returned.
85+
pub fn handle_events(&mut self) -> Option<(ConnectionHandle, ConnectionEvent)> {
86+
while let Ok((ch, event)) = self.event_recv.try_recv() {
87+
if let Some(response) = self.handle_event(ch, event) {
88+
return Some((ch, response));
89+
}
90+
}
91+
None
92+
}
93+
94+
fn handle_event(
8295
&mut self,
8396
ch: ConnectionHandle,
8497
event: EndpointEvent,
8598
) -> Option<ConnectionEvent> {
86-
use EndpointEventInner::*;
87-
match event.0 {
99+
use EndpointEvent::*;
100+
match event {
88101
NeedIdentifiers(n) => {
89102
return Some(self.send_new_identifiers(ch, n));
90103
}
@@ -564,7 +577,7 @@ impl Endpoint {
564577
}
565578
Err(e) => {
566579
debug!("handshake failed: {}", e);
567-
self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
580+
self.handle_event(ch, EndpointEvent::Drained);
568581
match e {
569582
ConnectionError::TransportError(e) => Some(DatagramEvent::Response(
570583
self.initial_close(version, addresses, crypto, &src_cid, e, buf),
@@ -605,6 +618,7 @@ impl Endpoint {
605618
version,
606619
self.allow_mtud,
607620
rng_seed,
621+
EndpointEvents::new(ch, self.event_send.clone()),
608622
);
609623

610624
let id = self.connections.insert(ConnectionMeta {

quinn-proto/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ mod endpoint;
6464
pub use crate::endpoint::{ConnectError, ConnectionHandle, DatagramEvent, Endpoint};
6565

6666
mod shared;
67-
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent};
67+
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint};
6868

6969
mod transport_error;
7070
pub use crate::transport_error::{Code as TransportErrorCode, Error as TransportError};

quinn-proto/src/shared.rs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,8 @@ pub(crate) enum ConnectionEventInner {
2222
NewIdentifiers(Vec<IssuedCid>),
2323
}
2424

25-
/// Events sent from a Connection to an Endpoint
26-
#[derive(Debug)]
27-
pub struct EndpointEvent(pub(crate) EndpointEventInner);
28-
29-
impl EndpointEvent {
30-
/// Construct an event that indicating that a `Connection` will no longer emit events
31-
///
32-
/// Useful for notifying an `Endpoint` that a `Connection` has been destroyed outside of the
33-
/// usual state machine flow, e.g. when being dropped by the user.
34-
pub fn drained() -> Self {
35-
Self(EndpointEventInner::Drained)
36-
}
37-
38-
/// Determine whether this is the last event a `Connection` will emit
39-
///
40-
/// Useful for determining when connection-related event loop state can be freed.
41-
pub fn is_drained(&self) -> bool {
42-
self.0 == EndpointEventInner::Drained
43-
}
44-
}
45-
4625
#[derive(Clone, Debug, Eq, PartialEq)]
47-
pub(crate) enum EndpointEventInner {
26+
pub(crate) enum EndpointEvent {
4827
/// The connection has been drained
4928
Drained,
5029
/// The reset token and/or address eligible for generating resets has been updated

quinn-proto/src/tests/util.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,8 @@ impl TestEndpoint {
362362
}
363363

364364
loop {
365-
let mut endpoint_events: Vec<(ConnectionHandle, EndpointEvent)> = vec![];
366-
for (ch, conn) in self.connections.iter_mut() {
365+
let mut endpoint_events = false;
366+
for (_, conn) in self.connections.iter_mut() {
367367
if self.timeout.map_or(false, |x| x <= now) {
368368
self.timeout = None;
369369
conn.handle_timeout(now);
@@ -375,9 +375,7 @@ impl TestEndpoint {
375375
}
376376
}
377377

378-
while let Some(event) = conn.poll_endpoint_events() {
379-
endpoint_events.push((*ch, event));
380-
}
378+
endpoint_events |= conn.poll_endpoint_events();
381379
while let Some(transmit) = conn.poll_transmit(now, MAX_DATAGRAMS, &mut buf) {
382380
let size = transmit.size;
383381
self.outbound
@@ -386,15 +384,13 @@ impl TestEndpoint {
386384
self.timeout = conn.poll_timeout();
387385
}
388386

389-
if endpoint_events.is_empty() {
387+
if !endpoint_events {
390388
break;
391389
}
392390

393-
for (ch, event) in endpoint_events {
394-
if let Some(event) = self.handle_event(ch, event) {
395-
if let Some(conn) = self.connections.get_mut(&ch) {
396-
conn.handle_event(event, now);
397-
}
391+
while let Some((ch, event)) = self.handle_events() {
392+
if let Some(conn) = self.connections.get_mut(&ch) {
393+
conn.handle_event(event, now);
398394
}
399395
}
400396
}

0 commit comments

Comments
 (0)