Skip to content

Commit 3e7c8fd

Browse files
committed
Pass proto-layer connection events internally
Groundwork for replacing generic message-passing with lighter event-representation-aware communication strategies.
1 parent 253eba9 commit 3e7c8fd

File tree

9 files changed

+91
-78
lines changed

9 files changed

+91
-78
lines changed

quinn-proto/src/connection/mod.rs

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
frame::{Close, Datagram, FrameStruct},
2525
packet::{Header, LongType, Packet, PartialDecode, SpaceId},
2626
range_set::ArrayRangeSet,
27-
shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent},
27+
shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent},
2828
token::ResetToken,
2929
transport_parameters::TransportParameters,
3030
ConnectionHandle, Dir, EndpointConfig, Frame, Side, StreamId, Transmit, TransportError,
@@ -128,6 +128,7 @@ pub struct Connection {
128128
server_config: Option<Arc<ServerConfig>>,
129129
config: Arc<TransportConfig>,
130130
rng: StdRng,
131+
connection_events: mpsc::Receiver<ConnectionEvent>,
131132
crypto: Box<dyn crypto::Session>,
132133
/// The CID we initially chose, for use during the handshake
133134
handshake_cid: ConnectionId,
@@ -251,6 +252,7 @@ impl Connection {
251252
allow_mtud: bool,
252253
rng_seed: [u8; 32],
253254
endpoint_events: EndpointEvents,
255+
connection_events: mpsc::Receiver<ConnectionEvent>,
254256
) -> Self {
255257
let side = if server_config.is_some() {
256258
Side::Server
@@ -271,6 +273,7 @@ impl Connection {
271273
let mut this = Self {
272274
endpoint_config,
273275
server_config,
276+
connection_events,
274277
crypto,
275278
handshake_cid: loc_cid,
276279
rem_handshake_cid: rem_cid,
@@ -949,14 +952,20 @@ impl Connection {
949952
SendableFrames::empty()
950953
}
951954

952-
/// Process `ConnectionEvent`s generated by the associated `Endpoint`
955+
/// Process events from the associated [`Endpoint`](crate::Endpoint)
953956
///
954957
/// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
955958
/// (including application `Event`s, endpoint events, and outgoing datagrams) that should be
956959
/// checked through the relevant methods.
957-
pub fn handle_event(&mut self, event: ConnectionEvent, now: Instant) {
958-
use self::ConnectionEventInner::*;
959-
match event.0 {
960+
pub fn handle_events(&mut self, now: Instant) {
961+
while let Ok(event) = self.connection_events.try_recv() {
962+
self.handle_event(event, now);
963+
}
964+
}
965+
966+
fn handle_event(&mut self, event: ConnectionEvent, now: Instant) {
967+
use self::ConnectionEvent::*;
968+
match event {
960969
Datagram {
961970
now,
962971
remote,
@@ -3274,22 +3283,9 @@ impl Connection {
32743283

32753284
/// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
32763285
#[cfg(test)]
3277-
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
3278-
let (first_decode, remaining) = match &event.0 {
3279-
ConnectionEventInner::Datagram {
3280-
first_decode,
3281-
remaining,
3282-
..
3283-
} => (first_decode, remaining),
3284-
_ => return None,
3285-
};
3286-
3287-
if remaining.is_some() {
3288-
panic!("Packets should never be coalesced in tests");
3289-
}
3290-
3286+
pub(crate) fn decode_packet(&self, packet: PartialDecode) -> Option<Vec<u8>> {
32913287
let decrypted_header = packet_crypto::unprotect_header(
3292-
first_decode.clone(),
3288+
packet.clone(),
32933289
&self.spaces,
32943290
self.zero_rtt_crypto.as_ref(),
32953291
self.peer_params.stateless_reset_token,

quinn-proto/src/endpoint.rs

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ use crate::{
2323
crypto::{self, Keys, UnsupportedVersion},
2424
frame,
2525
packet::{Header, Packet, PacketDecodeError, PacketNumber, PartialDecode},
26-
shared::{
27-
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent, IssuedCid,
28-
},
26+
shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent, IssuedCid},
2927
transport_parameters::TransportParameters,
3028
ResetToken, RetryToken, Side, Transmit, TransportConfig, TransportError, INITIAL_MTU,
3129
MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE,
@@ -81,25 +79,23 @@ impl Endpoint {
8179

8280
/// Process events from [`Connection`]s that have returned `true` from [`Connection::poll_endpoint_events`]
8381
///
84-
/// May return a `ConnectionEvent` for any `Connection`. Call until `None` is returned.
85-
pub fn handle_events(&mut self) -> Option<(ConnectionHandle, ConnectionEvent)> {
82+
/// May return the [`ConnectionHandle`] of a [`Connection`] for which
83+
/// [`Connection::handle_events`] must be called. Call until `None` is returned.
84+
pub fn handle_events(&mut self) -> Option<ConnectionHandle> {
8685
while let Ok((ch, event)) = self.event_recv.try_recv() {
87-
if let Some(response) = self.handle_event(ch, event) {
88-
return Some((ch, response));
86+
if self.handle_event(ch, event) {
87+
return Some(ch);
8988
}
9089
}
9190
None
9291
}
9392

94-
fn handle_event(
95-
&mut self,
96-
ch: ConnectionHandle,
97-
event: EndpointEvent,
98-
) -> Option<ConnectionEvent> {
93+
fn handle_event(&mut self, ch: ConnectionHandle, event: EndpointEvent) -> bool {
9994
use EndpointEvent::*;
10095
match event {
10196
NeedIdentifiers(n) => {
102-
return Some(self.send_new_identifiers(ch, n));
97+
self.send_new_identifiers(ch, n);
98+
return true;
10399
}
104100
ResetToken(remote, token) => {
105101
if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
@@ -114,7 +110,8 @@ impl Endpoint {
114110
trace!("peer retired CID {}: {}", seq, cid);
115111
self.index.retire(&cid);
116112
if allow_more_cids {
117-
return Some(self.send_new_identifiers(ch, 1));
113+
self.send_new_identifiers(ch, 1);
114+
return true;
118115
}
119116
}
120117
}
@@ -129,7 +126,27 @@ impl Endpoint {
129126
}
130127
}
131128
}
132-
None
129+
false
130+
}
131+
132+
#[cfg(test)]
133+
pub(crate) fn decode_packet(
134+
&self,
135+
datagram: BytesMut,
136+
) -> Result<PartialDecode, PacketDecodeError> {
137+
PartialDecode::new(
138+
datagram,
139+
self.local_cid_generator.cid_len(),
140+
&self.config.supported_versions,
141+
self.config.grease_quic_bit,
142+
)
143+
.map(|(packet, rest)| {
144+
assert!(
145+
rest.is_none(),
146+
"capturing decoded coalesced packets in tests is unimplemented"
147+
);
148+
packet
149+
})
133150
}
134151

135152
/// Process an incoming UDP datagram
@@ -196,16 +213,16 @@ impl Endpoint {
196213

197214
let addresses = FourTuple { remote, local_ip };
198215
if let Some(ch) = self.index.get(&addresses, &first_decode) {
199-
return Some(DatagramEvent::ConnectionEvent(
200-
ch,
201-
ConnectionEvent(ConnectionEventInner::Datagram {
216+
_ = self.connections[ch.0]
217+
.events
218+
.send(ConnectionEvent::Datagram {
202219
now,
203220
remote: addresses.remote,
204221
ecn,
205222
first_decode,
206223
remaining,
207-
}),
208-
));
224+
});
225+
return Some(DatagramEvent::ConnectionEvent(ch));
209226
}
210227

211228
//
@@ -375,7 +392,7 @@ impl Endpoint {
375392
Ok((ch, conn))
376393
}
377394

378-
fn send_new_identifiers(&mut self, ch: ConnectionHandle, num: u64) -> ConnectionEvent {
395+
fn send_new_identifiers(&mut self, ch: ConnectionHandle, num: u64) {
379396
let mut ids = vec![];
380397
for _ in 0..num {
381398
let id = self.new_cid(ch);
@@ -389,7 +406,9 @@ impl Endpoint {
389406
reset_token: ResetToken::new(&*self.config.reset_key, &id),
390407
});
391408
}
392-
ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids))
409+
_ = self.connections[ch]
410+
.events
411+
.send(ConnectionEvent::NewIdentifiers(ids));
393412
}
394413

395414
/// Generate a connection ID for `ch`
@@ -603,6 +622,7 @@ impl Endpoint {
603622
) -> Connection {
604623
let mut rng_seed = [0; 32];
605624
self.rng.fill_bytes(&mut rng_seed);
625+
let (send, recv) = mpsc::channel();
606626
let conn = Connection::new(
607627
self.config.clone(),
608628
server_config,
@@ -619,6 +639,7 @@ impl Endpoint {
619639
self.allow_mtud,
620640
rng_seed,
621641
EndpointEvents::new(ch, self.event_send.clone()),
642+
recv,
622643
);
623644

624645
let id = self.connections.insert(ConnectionMeta {
@@ -627,6 +648,7 @@ impl Endpoint {
627648
loc_cids: iter::once((0, loc_cid)).collect(),
628649
addresses,
629650
reset_token: None,
651+
events: send,
630652
});
631653
debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
632654

@@ -836,6 +858,7 @@ pub(crate) struct ConnectionMeta {
836858
/// Reset token provided by the peer for the CID we're currently sending to, and the address
837859
/// being sent to
838860
reset_token: Option<(SocketAddr, ResetToken)>,
861+
events: mpsc::Sender<ConnectionEvent>,
839862
}
840863

841864
/// Internal identifier for a `Connection` currently associated with an endpoint
@@ -864,8 +887,8 @@ impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
864887
/// Event resulting from processing a single datagram
865888
#[allow(clippy::large_enum_variant)] // Not passed around extensively
866889
pub enum DatagramEvent {
867-
/// The datagram is redirected to its `Connection`
868-
ConnectionEvent(ConnectionHandle, ConnectionEvent),
890+
/// [`Connection::handle_events`] must be called on the associated [`Connection`]
891+
ConnectionEvent(ConnectionHandle),
869892
/// The datagram has resulted in starting a new `Connection`
870893
NewConnection(ConnectionHandle, Connection),
871894
/// Response generated directly by the endpoint

quinn-proto/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ mod endpoint;
6464
pub use crate::endpoint::{ConnectError, ConnectionHandle, DatagramEvent, Endpoint};
6565

6666
mod shared;
67-
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint};
67+
pub use crate::shared::{ConnectionId, EcnCodepoint};
6868

6969
mod transport_error;
7070
pub use crate::transport_error::{Code as TransportErrorCode, Error as TransportError};

quinn-proto/src/shared.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,8 @@ use bytes::{Buf, BufMut, BytesMut};
44

55
use crate::{coding::BufExt, packet::PartialDecode, ResetToken, MAX_CID_SIZE};
66

7-
/// Events sent from an Endpoint to a Connection
87
#[derive(Debug)]
9-
pub struct ConnectionEvent(pub(crate) ConnectionEventInner);
10-
11-
#[derive(Debug)]
12-
pub(crate) enum ConnectionEventInner {
8+
pub(crate) enum ConnectionEvent {
139
/// A datagram has been received for the Connection
1410
Datagram {
1511
now: Instant,

quinn-proto/src/tests/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ fn version_negotiate_client() {
7777
.unwrap();
7878
let now = Instant::now();
7979
let mut buf = BytesMut::with_capacity(client.config().get_max_udp_payload_size() as usize);
80-
let opt_event = client.handle(
80+
client.handle(
8181
now,
8282
server_addr,
8383
None,
@@ -90,9 +90,7 @@ fn version_negotiate_client() {
9090
.into(),
9191
&mut buf,
9292
);
93-
if let Some(DatagramEvent::ConnectionEvent(_, event)) = opt_event {
94-
client_ch.handle_event(event, now);
95-
}
93+
client_ch.handle_events(now);
9694
assert_matches!(
9795
client_ch.poll(),
9896
Some(Event::ConnectionLost {

quinn-proto/src/tests/util.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
cmp,
3-
collections::{HashMap, VecDeque},
3+
collections::{HashMap, HashSet, VecDeque},
44
env,
55
io::{self, Write},
66
mem,
@@ -289,7 +289,7 @@ pub(super) struct TestEndpoint {
289289
pub(super) inbound: VecDeque<(Instant, Option<EcnCodepoint>, BytesMut)>,
290290
accepted: Option<ConnectionHandle>,
291291
pub(super) connections: HashMap<ConnectionHandle, Connection>,
292-
conn_events: HashMap<ConnectionHandle, VecDeque<ConnectionEvent>>,
292+
conn_events: HashSet<ConnectionHandle>,
293293
pub(super) captured_packets: Vec<Vec<u8>>,
294294
pub(super) capture_inbound_packets: bool,
295295
}
@@ -315,7 +315,7 @@ impl TestEndpoint {
315315
inbound: VecDeque::new(),
316316
accepted: None,
317317
connections: HashMap::default(),
318-
conn_events: HashMap::default(),
318+
conn_events: HashSet::default(),
319319
captured_packets: Vec::new(),
320320
capture_inbound_packets: false,
321321
}
@@ -335,22 +335,24 @@ impl TestEndpoint {
335335

336336
while self.inbound.front().map_or(false, |x| x.0 <= now) {
337337
let (recv_time, ecn, packet) = self.inbound.pop_front().unwrap();
338-
if let Some(event) = self
339-
.endpoint
340-
.handle(recv_time, remote, None, ecn, packet, &mut buf)
338+
if let Some(event) =
339+
self.endpoint
340+
.handle(recv_time, remote, None, ecn, packet.clone(), &mut buf)
341341
{
342342
match event {
343343
DatagramEvent::NewConnection(ch, conn) => {
344344
self.connections.insert(ch, conn);
345345
self.accepted = Some(ch);
346346
}
347-
DatagramEvent::ConnectionEvent(ch, event) => {
347+
DatagramEvent::ConnectionEvent(ch) => {
348348
if self.capture_inbound_packets {
349-
let packet = self.connections[&ch].decode_packet(&event);
349+
let packet = self
350+
.decode_packet(packet)
351+
.ok()
352+
.and_then(|x| self.connections[&ch].decode_packet(x));
350353
self.captured_packets.extend(packet);
351354
}
352-
353-
self.conn_events.entry(ch).or_default().push_back(event);
355+
self.conn_events.insert(ch);
354356
}
355357
DatagramEvent::Response(transmit) => {
356358
let size = transmit.size;
@@ -363,16 +365,14 @@ impl TestEndpoint {
363365

364366
loop {
365367
let mut endpoint_events = false;
366-
for (_, conn) in self.connections.iter_mut() {
368+
for (ch, conn) in self.connections.iter_mut() {
367369
if self.timeout.map_or(false, |x| x <= now) {
368370
self.timeout = None;
369371
conn.handle_timeout(now);
370372
}
371373

372-
for (_, mut events) in self.conn_events.drain() {
373-
for event in events.drain(..) {
374-
conn.handle_event(event, now);
375-
}
374+
if self.conn_events.remove(ch) {
375+
conn.handle_events(now);
376376
}
377377

378378
endpoint_events |= conn.poll_endpoint_events();
@@ -388,9 +388,9 @@ impl TestEndpoint {
388388
break;
389389
}
390390

391-
while let Some((ch, event)) = self.handle_events() {
391+
while let Some(ch) = self.handle_events() {
392392
if let Some(conn) = self.connections.get_mut(&ch) {
393-
conn.handle_event(event, now);
393+
conn.handle_events(now);
394394
}
395395
}
396396
}

quinn/src/connection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -934,8 +934,8 @@ impl State {
934934
Poll::Ready(Some(ConnectionEvent::Ping)) => {
935935
self.inner.ping();
936936
}
937-
Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
938-
self.inner.handle_event(event, now);
937+
Poll::Ready(Some(ConnectionEvent::Proto)) => {
938+
self.inner.handle_events(now);
939939
}
940940
Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
941941
self.close(error_code, reason, shared);

0 commit comments

Comments
 (0)