Skip to content

Commit f12b2e3

Browse files
committed
mdns: bounded address-update sender
Requires us to handle the case that the channel is full. Solved by adding a `pending_address_update` queue that is processed in `Behavior::poll`.
1 parent 82da43d commit f12b2e3

File tree

2 files changed

+46
-17
lines changed

2 files changed

+46
-17
lines changed

protocols/mdns/src/behaviour.rs

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ use std::{
3131
convert::Infallible,
3232
fmt,
3333
future::Future,
34-
io,
34+
io, mem,
3535
net::IpAddr,
3636
pin::Pin,
37-
task::{Context, Poll},
37+
task::{Context, Poll, Waker},
3838
time::Instant,
3939
};
4040

@@ -144,8 +144,8 @@ where
144144
/// Iface watcher.
145145
if_watch: P::Watcher,
146146

147-
/// Handles to tasks running the mDNS queries.
148-
if_tasks: HashMap<IpAddr, mpsc::UnboundedSender<ListenAddressUpdate>>,
147+
/// Channel for sending address updates to interface tasks.
148+
if_tasks: HashMap<IpAddr, mpsc::Sender<ListenAddressUpdate>>,
149149

150150
query_response_receiver: mpsc::Receiver<(PeerId, Multiaddr, Instant)>,
151151
query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>,
@@ -168,6 +168,11 @@ where
168168

169169
/// Pending behaviour events to be emitted.
170170
pending_events: VecDeque<ToSwarm<Event, Infallible>>,
171+
172+
/// Pending address updates to send to interfaces.
173+
pending_address_updates: Vec<ListenAddressUpdate>,
174+
175+
waker: Waker,
171176
}
172177

173178
impl<P> Behaviour<P>
@@ -189,6 +194,8 @@ where
189194
listen_addresses: Default::default(),
190195
local_peer_id,
191196
pending_events: Default::default(),
197+
pending_address_updates: Default::default(),
198+
waker: Waker::noop().clone(),
192199
})
193200
}
194201

@@ -214,6 +221,27 @@ where
214221
}
215222
self.closest_expiration = Some(P::Timer::at(now));
216223
}
224+
225+
fn try_send_address_update(
226+
&mut self,
227+
cx: &mut Context<'_>,
228+
update: ListenAddressUpdate,
229+
) -> Option<ListenAddressUpdate> {
230+
let ip = update.ip_addr()?;
231+
let tx = self.if_tasks.get_mut(&ip)?;
232+
match tx.poll_ready(cx) {
233+
Poll::Ready(Ok(())) => {
234+
tx.start_send(update).expect("Channel is ready.");
235+
None
236+
}
237+
Poll::Ready(Err(e)) if e.is_disconnected() => {
238+
tracing::error!("`InterfaceState` for ip {ip} dropped");
239+
self.if_tasks.remove(&ip);
240+
None
241+
}
242+
_ => Some(update),
243+
}
244+
}
217245
}
218246

219247
impl<P> NetworkBehaviour for Behaviour<P>
@@ -277,16 +305,10 @@ where
277305
if !self.listen_addresses.on_swarm_event(&event) {
278306
return;
279307
}
280-
if let Some(update) = ListenAddressUpdate::from_swarm(event) {
281-
// Send address update to matching interface task.
282-
if let Some(ip) = update.ip_addr() {
283-
if let Some(tx) = self.if_tasks.get_mut(&ip) {
284-
if tx.unbounded_send(update).is_err() {
285-
tracing::error!("`InterfaceState` for ip {ip} dropped");
286-
self.if_tasks.remove(&ip);
287-
}
288-
}
289-
}
308+
if let Some(update) = ListenAddressUpdate::from_swarm(event).and_then(|update| {
309+
self.try_send_address_update(&mut Context::from_waker(&self.waker.clone()), update)
310+
}) {
311+
self.pending_address_updates.push(update);
290312
}
291313
}
292314

@@ -296,6 +318,13 @@ where
296318
cx: &mut Context<'_>,
297319
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
298320
loop {
321+
// Send address updates to interface tasks.
322+
for update in mem::take(&mut self.pending_address_updates) {
323+
if let Some(update) = self.try_send_address_update(cx, update) {
324+
self.pending_address_updates.push(update);
325+
}
326+
}
327+
299328
// Check for pending events and emit them.
300329
if let Some(event) = self.pending_events.pop_front() {
301330
return Poll::Ready(event);
@@ -315,7 +344,7 @@ where
315344
continue;
316345
}
317346
if let Entry::Vacant(e) = self.if_tasks.entry(ip_addr) {
318-
let (addr_tx, addr_rx) = mpsc::unbounded();
347+
let (addr_tx, addr_rx) = mpsc::channel(10); // Chosen arbitrarily.
319348
let listen_addresses = self
320349
.listen_addresses
321350
.iter()

protocols/mdns/src/behaviour/iface.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ pub(crate) struct InterfaceState<U, T> {
115115
/// Current listening addresses.
116116
listen_addresses: Vec<Multiaddr>,
117117
/// Receiver for listening-address updates from the swarm.
118-
listen_addresses_rx: mpsc::UnboundedReceiver<ListenAddressUpdate>,
118+
listen_addresses_rx: mpsc::Receiver<ListenAddressUpdate>,
119119

120120
query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>,
121121

@@ -153,7 +153,7 @@ where
153153
config: Config,
154154
local_peer_id: PeerId,
155155
listen_addresses: Vec<Multiaddr>,
156-
listen_addresses_rx: mpsc::UnboundedReceiver<ListenAddressUpdate>,
156+
listen_addresses_rx: mpsc::Receiver<ListenAddressUpdate>,
157157
query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>,
158158
) -> io::Result<Self> {
159159
tracing::info!(address=%addr, "creating instance on iface address");

0 commit comments

Comments
 (0)