Skip to content

Commit ee92ffd

Browse files
committed
perf(quic): batch recv on channel
1 parent 148f3ac commit ee92ffd

File tree

4 files changed

+77
-58
lines changed

4 files changed

+77
-58
lines changed

compio-quic/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ h3 = { version = "0.0.6", optional = true }
3333
bytes = { workspace = true }
3434
flume = { workspace = true }
3535
futures-util = { workspace = true }
36+
rustc-hash = "2.0.0"
3637
thiserror = "1.0.63"
3738

3839
# Windows specific dependencies

compio-quic/benches/quic.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,13 @@ fn echo_quinn(b: &mut Bencher, content: &[u8], streams: usize) {
127127
client.set_default_client_config(client_config);
128128
let addr = server.local_addr().unwrap();
129129

130-
let (client_conn, server_conn) = futures_util::join!(
130+
let (client_conn, server_conn) = tokio::join!(
131131
async move { client.connect(addr, "localhost").unwrap().await.unwrap() },
132132
async move { server.accept().await.unwrap().await.unwrap() }
133133
);
134134

135135
let start = Instant::now();
136-
tokio::spawn(async move {
136+
let handle = tokio::spawn(async move {
137137
while let Ok((mut send, mut recv)) = server_conn.accept_bi().await {
138138
tokio::spawn(async move {
139139
echo_impl!(send, recv);
@@ -157,6 +157,7 @@ fn echo_quinn(b: &mut Bencher, content: &[u8], streams: usize) {
157157
.collect::<FuturesUnordered<_>>();
158158
while futures.next().await.is_some() {}
159159
}
160+
handle.abort();
160161
start.elapsed()
161162
});
162163
}

compio-quic/src/connection.rs

Lines changed: 55 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::{HashMap, VecDeque},
2+
collections::VecDeque,
33
io,
44
net::{IpAddr, SocketAddr},
55
pin::{pin, Pin},
@@ -21,6 +21,7 @@ use quinn_proto::{
2121
congestion::Controller, crypto::rustls::HandshakeData, ConnectionHandle, ConnectionStats, Dir,
2222
EndpointEvent, StreamEvent, StreamId, VarInt,
2323
};
24+
use rustc_hash::FxHashMap as HashMap;
2425
use thiserror::Error;
2526

2627
use crate::{RecvStream, SendStream, Socket};
@@ -37,7 +38,7 @@ pub(crate) struct ConnectionState {
3738
pub(crate) error: Option<ConnectionError>,
3839
connected: bool,
3940
worker: Option<JoinHandle<()>>,
40-
poll_waker: Option<Waker>,
41+
poller: Option<Waker>,
4142
on_connected: Option<Waker>,
4243
on_handshake_data: Option<Waker>,
4344
datagram_received: VecDeque<Waker>,
@@ -73,8 +74,14 @@ impl ConnectionState {
7374
wake_all_streams(&mut self.stopped);
7475
}
7576

77+
fn close(&mut self, error_code: VarInt, reason: Bytes) {
78+
self.conn.close(Instant::now(), error_code, reason);
79+
self.terminate(ConnectionError::LocallyClosed);
80+
self.wake();
81+
}
82+
7683
pub(crate) fn wake(&mut self) {
77-
if let Some(waker) = self.poll_waker.take() {
84+
if let Some(waker) = self.poller.take() {
7885
waker.wake()
7986
}
8087
}
@@ -110,6 +117,12 @@ pub(crate) struct ConnectionInner {
110117
events_rx: Receiver<ConnectionEvent>,
111118
}
112119

120+
fn implicit_close(this: &Arc<ConnectionInner>) {
121+
if Arc::strong_count(this) == 2 {
122+
this.state().close(0u32.into(), Bytes::new())
123+
}
124+
}
125+
113126
impl ConnectionInner {
114127
fn new(
115128
handle: ConnectionHandle,
@@ -124,16 +137,16 @@ impl ConnectionInner {
124137
connected: false,
125138
error: None,
126139
worker: None,
127-
poll_waker: None,
140+
poller: None,
128141
on_connected: None,
129142
on_handshake_data: None,
130143
datagram_received: VecDeque::new(),
131144
datagrams_unblocked: VecDeque::new(),
132145
stream_opened: [VecDeque::new(), VecDeque::new()],
133146
stream_available: [VecDeque::new(), VecDeque::new()],
134-
writable: HashMap::new(),
135-
readable: HashMap::new(),
136-
stopped: HashMap::new(),
147+
writable: HashMap::default(),
148+
readable: HashMap::default(),
149+
stopped: HashMap::default(),
137150
}),
138151
handle,
139152
socket,
@@ -157,25 +170,13 @@ impl ConnectionInner {
157170
}
158171
}
159172

160-
fn close(&self, error_code: VarInt, reason: Bytes) {
161-
let mut state = self.state();
162-
state.conn.close(Instant::now(), error_code, reason);
163-
state.terminate(ConnectionError::LocallyClosed);
164-
state.wake();
165-
}
166-
167-
async fn run(&self) -> io::Result<()> {
168-
let mut send_buf = Some(Vec::with_capacity(self.state().conn.current_mtu() as usize));
169-
let mut transmit_fut = pin!(Fuse::terminated());
170-
171-
let mut timer = Timer::new();
172-
173+
async fn run(self: &Arc<Self>) -> io::Result<()> {
173174
let mut poller = stream::poll_fn(|cx| {
174175
let mut state = self.state();
175-
let ready = state.poll_waker.is_none();
176-
match &state.poll_waker {
176+
let ready = state.poller.is_none();
177+
match &state.poller {
177178
Some(waker) if waker.will_wake(cx.waker()) => {}
178-
_ => state.poll_waker = Some(cx.waker().clone()),
179+
_ => state.poller = Some(cx.waker().clone()),
179180
};
180181
if ready {
181182
Poll::Ready(Some(()))
@@ -185,36 +186,46 @@ impl ConnectionInner {
185186
})
186187
.fuse();
187188

189+
let mut timer = Timer::new();
190+
let mut event_stream = self.events_rx.stream().ready_chunks(100);
191+
let mut send_buf = Some(Vec::with_capacity(self.state().conn.current_mtu() as usize));
192+
let mut transmit_fut = pin!(Fuse::terminated());
193+
188194
loop {
189-
select! {
190-
_ = poller.next() => {}
195+
let mut state = select! {
196+
_ = poller.select_next_some() => self.state(),
191197
_ = timer => {
192-
self.state().conn.handle_timeout(Instant::now());
193198
timer.reset(None);
199+
let mut state = self.state();
200+
state.conn.handle_timeout(Instant::now());
201+
state
194202
}
195-
ev = self.events_rx.recv_async() => match ev {
196-
Ok(ConnectionEvent::Close(error_code, reason)) => self.close(error_code, reason),
197-
Ok(ConnectionEvent::Proto(ev)) => self.state().conn.handle_event(ev),
198-
Err(_) => unreachable!("endpoint dropped connection"),
203+
events = event_stream.select_next_some() => {
204+
let mut state = self.state();
205+
for event in events {
206+
match event {
207+
ConnectionEvent::Close(error_code, reason) => state.close(error_code, reason),
208+
ConnectionEvent::Proto(event) => state.conn.handle_event(event),
209+
}
210+
}
211+
state
199212
},
200213
BufResult::<(), Vec<u8>>(res, mut buf) = transmit_fut => match res {
201214
Ok(()) => {
202215
buf.clear();
203216
send_buf = Some(buf);
217+
self.state()
204218
},
205219
Err(e) => break Err(e),
206220
},
207-
}
208-
209-
let now = Instant::now();
210-
let mut state = self.state();
221+
};
211222

212223
if let Some(mut buf) = send_buf.take() {
213-
if let Some(transmit) =
214-
state
215-
.conn
216-
.poll_transmit(now, self.socket.max_gso_segments(), &mut buf)
217-
{
224+
if let Some(transmit) = state.conn.poll_transmit(
225+
Instant::now(),
226+
self.socket.max_gso_segments(),
227+
&mut buf,
228+
) {
218229
transmit_fut.set(async move { self.socket.send(buf, &transmit).await }.fuse())
219230
} else {
220231
send_buf = Some(buf);
@@ -480,9 +491,7 @@ impl Future for Connecting {
480491

481492
impl Drop for Connecting {
482493
fn drop(&mut self) {
483-
if Arc::strong_count(&self.0) == 2 {
484-
self.0.close(0u32.into(), Bytes::new())
485-
}
494+
implicit_close(&self.0)
486495
}
487496
}
488497

@@ -593,7 +602,9 @@ impl Connection {
593602
/// [`Endpoint::shutdown()`]: crate::Endpoint::shutdown
594603
/// [`close()`]: Connection::close
595604
pub fn close(&self, error_code: VarInt, reason: &[u8]) {
596-
self.0.close(error_code, Bytes::copy_from_slice(reason));
605+
self.0
606+
.state()
607+
.close(error_code, Bytes::copy_from_slice(reason));
597608
}
598609

599610
/// Wait for the connection to be closed for any reason.
@@ -838,9 +849,7 @@ impl Eq for Connection {}
838849

839850
impl Drop for Connection {
840851
fn drop(&mut self) {
841-
if Arc::strong_count(&self.0) == 2 {
842-
self.close(0u32.into(), b"")
843-
}
852+
implicit_close(&self.0)
844853
}
845854
}
846855

compio-quic/src/endpoint.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::{HashMap, VecDeque},
2+
collections::VecDeque,
33
io,
44
mem::ManuallyDrop,
55
net::{SocketAddr, SocketAddrV6},
@@ -19,12 +19,13 @@ use futures_util::{
1919
future::{self},
2020
select,
2121
task::AtomicWaker,
22-
FutureExt,
22+
FutureExt, StreamExt,
2323
};
2424
use quinn_proto::{
2525
ClientConfig, ConnectError, ConnectionError, ConnectionHandle, DatagramEvent, EndpointConfig,
2626
EndpointEvent, ServerConfig, Transmit, VarInt,
2727
};
28+
use rustc_hash::FxHashMap as HashMap;
2829

2930
use crate::{Connecting, ConnectionEvent, Incoming, RecvMeta, Socket};
3031

@@ -153,7 +154,7 @@ impl EndpointInner {
153154
None,
154155
),
155156
worker: None,
156-
connections: HashMap::new(),
157+
connections: HashMap::default(),
157158
close: None,
158159
exit_on_idle: false,
159160
incoming: VecDeque::new(),
@@ -254,6 +255,8 @@ impl EndpointInner {
254255
}
255256

256257
async fn run(&self) -> io::Result<()> {
258+
let respond_fn = |buf: Vec<u8>, transmit: Transmit| self.respond(buf, transmit);
259+
257260
let mut recv_fut = pin!(
258261
self.socket
259262
.recv(Vec::with_capacity(
@@ -269,26 +272,31 @@ impl EndpointInner {
269272
.fuse()
270273
);
271274

272-
let respond_fn = |buf: Vec<u8>, transmit: Transmit| self.respond(buf, transmit);
275+
let mut event_stream = self.events.1.stream().ready_chunks(100);
273276

274277
loop {
275-
select! {
278+
let mut state = select! {
276279
BufResult(res, recv_buf) = recv_fut => {
280+
let mut state = self.state.lock().unwrap();
277281
match res {
278-
Ok(meta) => self.state.lock().unwrap().handle_data(meta, &recv_buf, respond_fn),
282+
Ok(meta) => state.handle_data(meta, &recv_buf, respond_fn),
279283
Err(e) if e.kind() == io::ErrorKind::ConnectionReset => {}
280284
#[cfg(windows)]
281285
Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PORT_UNREACHABLE as _) => {}
282286
Err(e) => break Err(e),
283287
}
284288
recv_fut.set(self.socket.recv(recv_buf).fuse());
289+
state
285290
},
286-
(ch, event) = self.events.1.recv_async().map(Result::unwrap) => {
287-
self.state.lock().unwrap().handle_event(ch, event);
291+
events = event_stream.select_next_some() => {
292+
let mut state = self.state.lock().unwrap();
293+
for (ch, event) in events {
294+
state.handle_event(ch, event);
295+
}
296+
state
288297
},
289-
}
298+
};
290299

291-
let mut state = self.state.lock().unwrap();
292300
if state.exit_on_idle && state.is_idle() {
293301
break Ok(());
294302
}

0 commit comments

Comments
 (0)