-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(mdns): only send listening addresses that match interface #6003
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
elenaf9
wants to merge
8
commits into
libp2p:master
Choose a base branch
from
elenaf9:feat/mdns/only-send-if-addresses
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
4e1d0e9
refactor(mdns): use channel instead of shared lock for listen addrs
elenaf9 807508d
feat(mdns): only send listen addrs that match interface
elenaf9 82da43d
fix docs
elenaf9 f12b2e3
mdns: bounded address-update sender
elenaf9 40a3ad0
fixup! mdns: bounded address-update sender
elenaf9 8a4da0c
fix(mdns/tests): also listen on ipv6 address
elenaf9 aa8d379
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into f…
elenaf9 32e9877
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into f…
elenaf9 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,17 +31,17 @@ | |
| convert::Infallible, | ||
| fmt, | ||
| future::Future, | ||
| io, | ||
| io, mem, | ||
| net::IpAddr, | ||
| pin::Pin, | ||
| sync::{Arc, RwLock}, | ||
| task::{Context, Poll}, | ||
| task::{Context, Poll, Waker}, | ||
| time::Instant, | ||
| }; | ||
|
|
||
| use futures::{channel::mpsc, Stream, StreamExt}; | ||
| use if_watch::IfEvent; | ||
| use libp2p_core::{transport::PortUse, Endpoint, Multiaddr}; | ||
| use iface::ListenAddressUpdate; | ||
| use libp2p_core::{multiaddr::Protocol, transport::PortUse, Endpoint, Multiaddr}; | ||
| use libp2p_identity::PeerId; | ||
| use libp2p_swarm::{ | ||
| behaviour::FromSwarm, dummy, ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour, | ||
|
|
@@ -64,18 +64,11 @@ | |
| /// The IfWatcher type. | ||
| type Watcher: Stream<Item = std::io::Result<IfEvent>> + fmt::Debug + Unpin; | ||
|
|
||
| type TaskHandle: Abort; | ||
|
|
||
| /// Create a new instance of the `IfWatcher` type. | ||
| fn new_watcher() -> Result<Self::Watcher, std::io::Error>; | ||
|
|
||
| #[track_caller] | ||
| fn spawn(task: impl Future<Output = ()> + Send + 'static) -> Self::TaskHandle; | ||
| } | ||
|
|
||
| #[allow(unreachable_pub)] // Not re-exported. | ||
| pub trait Abort { | ||
| fn abort(self); | ||
| fn spawn(task: impl Future<Output = ()> + Send + 'static); | ||
| } | ||
|
|
||
| /// The type of a [`Behaviour`] using the `tokio` implementation. | ||
|
|
@@ -84,10 +77,9 @@ | |
| use std::future::Future; | ||
|
|
||
| use if_watch::tokio::IfWatcher; | ||
| use tokio::task::JoinHandle; | ||
|
|
||
| use super::Provider; | ||
| use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::TokioTimer, Abort}; | ||
| use crate::behaviour::{socket::tokio::TokioUdpSocket, timer::tokio::TokioTimer}; | ||
|
|
||
| #[doc(hidden)] | ||
| pub enum Tokio {} | ||
|
|
@@ -96,20 +88,13 @@ | |
| type Socket = TokioUdpSocket; | ||
| type Timer = TokioTimer; | ||
| type Watcher = IfWatcher; | ||
| type TaskHandle = JoinHandle<()>; | ||
|
|
||
| fn new_watcher() -> Result<Self::Watcher, std::io::Error> { | ||
| IfWatcher::new() | ||
| } | ||
|
|
||
| fn spawn(task: impl Future<Output = ()> + Send + 'static) -> Self::TaskHandle { | ||
| tokio::spawn(task) | ||
| } | ||
| } | ||
|
|
||
| impl Abort for JoinHandle<()> { | ||
| fn abort(self) { | ||
| JoinHandle::abort(&self) | ||
| fn spawn(task: impl Future<Output = ()> + Send + 'static) { | ||
| tokio::spawn(task); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -129,8 +114,8 @@ | |
| /// Iface watcher. | ||
| if_watch: P::Watcher, | ||
|
|
||
| /// Handles to tasks running the mDNS queries. | ||
| if_tasks: HashMap<IpAddr, P::TaskHandle>, | ||
| /// Channel for sending address updates to interface tasks. | ||
| if_tasks: HashMap<IpAddr, mpsc::Sender<ListenAddressUpdate>>, | ||
|
|
||
| query_response_receiver: mpsc::Receiver<(PeerId, Multiaddr, Instant)>, | ||
| query_response_sender: mpsc::Sender<(PeerId, Multiaddr, Instant)>, | ||
|
|
@@ -147,16 +132,17 @@ | |
| closest_expiration: Option<P::Timer>, | ||
|
|
||
| /// The current set of listen addresses. | ||
| /// | ||
| /// This is shared across all interface tasks using an [`RwLock`]. | ||
| /// The [`Behaviour`] updates this upon new [`FromSwarm`] | ||
| /// events where as [`InterfaceState`]s read from it to answer inbound mDNS queries. | ||
| listen_addresses: Arc<RwLock<ListenAddresses>>, | ||
| listen_addresses: ListenAddresses, | ||
|
|
||
| local_peer_id: PeerId, | ||
|
|
||
| /// Pending behaviour events to be emitted. | ||
| pending_events: VecDeque<ToSwarm<Event, Infallible>>, | ||
|
|
||
| /// Pending address updates to send to interfaces. | ||
| pending_address_updates: Vec<ListenAddressUpdate>, | ||
|
|
||
| waker: Waker, | ||
| } | ||
|
|
||
| impl<P> Behaviour<P> | ||
|
|
@@ -178,6 +164,8 @@ | |
| listen_addresses: Default::default(), | ||
| local_peer_id, | ||
| pending_events: Default::default(), | ||
| pending_address_updates: Default::default(), | ||
| waker: Waker::noop().clone(), | ||
|
Check failure on line 168 in protocols/mdns/src/behaviour.rs
|
||
| }) | ||
| } | ||
|
|
||
|
|
@@ -203,6 +191,30 @@ | |
| } | ||
| self.closest_expiration = Some(P::Timer::at(now)); | ||
| } | ||
|
|
||
| /// Try to send an address update to the interface task that matches the address' IP. | ||
| /// | ||
| /// Returns the address if the sending failed due to a full channel. | ||
| fn try_send_address_update( | ||
| &mut self, | ||
| cx: &mut Context<'_>, | ||
| update: ListenAddressUpdate, | ||
| ) -> Option<ListenAddressUpdate> { | ||
| let ip = update.ip_addr()?; | ||
| let tx = self.if_tasks.get_mut(&ip)?; | ||
| match tx.poll_ready(cx) { | ||
| Poll::Ready(Ok(())) => { | ||
| tx.start_send(update).expect("Channel is ready."); | ||
| None | ||
| } | ||
| Poll::Ready(Err(e)) if e.is_disconnected() => { | ||
| tracing::error!("`InterfaceState` for ip {ip} dropped"); | ||
| self.if_tasks.remove(&ip); | ||
| None | ||
| } | ||
| _ => Some(update), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<P> NetworkBehaviour for Behaviour<P> | ||
|
|
@@ -262,10 +274,14 @@ | |
| } | ||
|
|
||
| fn on_swarm_event(&mut self, event: FromSwarm) { | ||
| self.listen_addresses | ||
| .write() | ||
| .unwrap_or_else(|e| e.into_inner()) | ||
| .on_swarm_event(&event); | ||
| if !self.listen_addresses.on_swarm_event(&event) { | ||
| return; | ||
| } | ||
| if let Some(update) = ListenAddressUpdate::from_swarm(event).and_then(|update| { | ||
| self.try_send_address_update(&mut Context::from_waker(&self.waker.clone()), update) | ||
| }) { | ||
| self.pending_address_updates.push(update); | ||
| } | ||
| } | ||
|
|
||
| #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))] | ||
|
|
@@ -274,6 +290,13 @@ | |
| cx: &mut Context<'_>, | ||
| ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> { | ||
| loop { | ||
| // Send address updates to interface tasks. | ||
| for update in mem::take(&mut self.pending_address_updates) { | ||
| if let Some(update) = self.try_send_address_update(cx, update) { | ||
| self.pending_address_updates.push(update); | ||
| } | ||
| } | ||
|
|
||
| // Check for pending events and emit them. | ||
| if let Some(event) = self.pending_events.pop_front() { | ||
| return Poll::Ready(event); | ||
|
|
@@ -283,25 +306,34 @@ | |
| while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) { | ||
| match event { | ||
| Ok(IfEvent::Up(inet)) => { | ||
| let addr = inet.addr(); | ||
| if addr.is_loopback() { | ||
| let ip_addr = inet.addr(); | ||
| if ip_addr.is_loopback() { | ||
| continue; | ||
| } | ||
| if addr.is_ipv4() && self.config.enable_ipv6 | ||
| || addr.is_ipv6() && !self.config.enable_ipv6 | ||
| if ip_addr.is_ipv4() && self.config.enable_ipv6 | ||
| || ip_addr.is_ipv6() && !self.config.enable_ipv6 | ||
| { | ||
| continue; | ||
| } | ||
| if let Entry::Vacant(e) = self.if_tasks.entry(addr) { | ||
| if let Entry::Vacant(e) = self.if_tasks.entry(ip_addr) { | ||
| let (addr_tx, addr_rx) = mpsc::channel(10); // Chosen arbitrarily. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. curiosity Elena, why did you prefer an "unbounded" |
||
| let listen_addresses = self | ||
| .listen_addresses | ||
| .iter() | ||
| .filter(|multiaddr| multiaddr_matches_ip(multiaddr, &ip_addr)) | ||
| .cloned() | ||
| .collect(); | ||
| match InterfaceState::<P::Socket, P::Timer>::new( | ||
| addr, | ||
| ip_addr, | ||
| self.config.clone(), | ||
| self.local_peer_id, | ||
| self.listen_addresses.clone(), | ||
| listen_addresses, | ||
| addr_rx, | ||
| self.query_response_sender.clone(), | ||
| ) { | ||
| Ok(iface_state) => { | ||
| e.insert(P::spawn(iface_state)); | ||
| P::spawn(iface_state); | ||
| e.insert(addr_tx); | ||
| } | ||
| Err(err) => { | ||
| tracing::error!("failed to create `InterfaceState`: {}", err) | ||
|
|
@@ -310,10 +342,8 @@ | |
| } | ||
| } | ||
| Ok(IfEvent::Down(inet)) => { | ||
| if let Some(handle) = self.if_tasks.remove(&inet.addr()) { | ||
| if self.if_tasks.remove(&inet.addr()).is_some() { | ||
| tracing::info!(instance=%inet.addr(), "dropping instance"); | ||
|
|
||
| handle.abort(); | ||
| } | ||
| } | ||
| Err(err) => tracing::error!("if watch returned an error: {}", err), | ||
|
|
@@ -378,11 +408,20 @@ | |
| self.closest_expiration = Some(timer); | ||
| } | ||
|
|
||
| self.waker = cx.waker().clone(); | ||
| return Poll::Pending; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn multiaddr_matches_ip(addr: &Multiaddr, ip: &IpAddr) -> bool { | ||
| match addr.iter().next() { | ||
| Some(Protocol::Ip4(ipv4)) => &IpAddr::V4(ipv4) == ip, | ||
| Some(Protocol::Ip6(ipv6)) => &IpAddr::V6(ipv6) == ip, | ||
| _ => false, | ||
| } | ||
| } | ||
|
|
||
| /// Event that can be produced by the `Mdns` behaviour. | ||
| #[derive(Debug, Clone)] | ||
| pub enum Event { | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::task::Waker::noopwas only stabilized in1.85our current MSRV is1.83.