Skip to content

Commit 91bc3a4

Browse files
committed
Switch to AtomicWaker to wake the endpoint driver
Groundwork for direct connection -> driver wake-ups, and yields one less piece of state inside a monolithic mutex.
1 parent 5a8483b commit 91bc3a4

File tree

2 files changed

+6
-9
lines changed

2 files changed

+6
-9
lines changed

quinn/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ maintenance = { status = "experimental" }
3535
[dependencies]
3636
async-io = { version = "2.0", optional = true }
3737
async-std = { version = "1.11", optional = true }
38+
atomic-waker = "1.1.2"
3839
bytes = "1"
3940
# Enables futures::io::{AsyncRead, AsyncWrite} support for streams
4041
futures-io = { version = "0.3.19", optional = true }

quinn/src/endpoint.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ use std::{
88
pin::Pin,
99
str,
1010
sync::{Arc, Mutex},
11-
task::{Context, Poll, Waker},
11+
task::{Context, Poll},
1212
time::Instant,
1313
};
1414

1515
use crate::runtime::{default_runtime, AsyncUdpSocket, Runtime};
16+
use atomic_waker::AtomicWaker;
1617
use bytes::{Bytes, BytesMut};
1718
use pin_project_lite::pin_project;
1819
use proto::{
@@ -319,10 +320,8 @@ impl Future for EndpointDriver {
319320

320321
#[allow(unused_mut)] // MSRV
321322
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
323+
self.0.shared.driver.register(cx.waker());
322324
let mut endpoint = self.0.state.lock().unwrap();
323-
if endpoint.driver.is_none() {
324-
endpoint.driver = Some(cx.waker().clone());
325-
}
326325

327326
let now = Instant::now();
328327
let mut keep_going = false;
@@ -372,7 +371,6 @@ pub(crate) struct State {
372371
inner: proto::Endpoint,
373372
outgoing: VecDeque<udp::Transmit>,
374373
incoming: VecDeque<Connecting>,
375-
driver: Option<Waker>,
376374
ipv6: bool,
377375
connections: ConnectionSet,
378376
events: mpsc::UnboundedReceiver<(ConnectionHandle, EndpointEvent)>,
@@ -391,6 +389,7 @@ pub(crate) struct State {
391389
pub(crate) struct Shared {
392390
incoming: Notify,
393391
idle: Notify,
392+
driver: Arc<AtomicWaker>,
394393
}
395394

396395
impl State {
@@ -705,7 +704,6 @@ impl EndpointRef {
705704
events,
706705
outgoing: VecDeque::new(),
707706
incoming: VecDeque::new(),
708-
driver: None,
709707
connections: ConnectionSet {
710708
senders: FxHashMap::default(),
711709
sender,
@@ -738,9 +736,7 @@ impl Drop for EndpointRef {
738736
if x == 0 {
739737
// If the driver is about to be on its own, ensure it can shut down if the last
740738
// connection is gone.
741-
if let Some(task) = endpoint.driver.take() {
742-
task.wake();
743-
}
739+
self.0.shared.driver.wake();
744740
}
745741
}
746742
}

0 commit comments

Comments
 (0)