-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat: move Identify I/O from NetworkBehaviour to ConnectionHandler #3208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+292
−226
Merged
Changes from 7 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
cff7c4a
Identify: rename pending IdentifyHandler references on doc.
jxs d703ba6
Identify: start moving I/O from NetworkBehaviour,
jxs 3db1ec1
identify: Move I/O from Networkehaviour,
jxs e01c02e
review: Don't bubble the Substream from the handler
jxs 11a9254
review: improve Request doc wording.
jxs 57013bb
review: separate Info details provided by the behaviour,
jxs a84178f
Merge branch 'master' of github.com:libp2p/rust-libp2p into identify-…
jxs 0db0c52
review: handler, request info on each new substream,
jxs aa0d81a
review: use FuturesUnordered for pending replies.
jxs fa05a2e
review: deprecate ReplySubstream.
jxs 7aab6a6
review: rename Protocol and ProtocolPush to Identify and Push.
jxs 963f11d
review: merge BehaviorInfo and InEvent,
jxs 8e88bf6
review: merge Behaviour pending_push into requests,
jxs c0afa8f
Merge branch 'master' of github.com:libp2p/rust-libp2p into identify-…
jxs 60470d2
review: fix doc typos.
jxs b484572
Merge branch 'master' of github.com:libp2p/rust-libp2p into identify-…
jxs d457cbd
review: add CHANGELOG.md entry
jxs bbe124d
Merge branch 'master' of github.com:libp2p/rust-libp2p into identify-…
jxs File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,24 +18,21 @@ | |
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER | ||
// DEALINGS IN THE SOFTWARE. | ||
|
||
use crate::handler::{self, Proto, Push}; | ||
use crate::protocol::{Info, ReplySubstream, UpgradeError}; | ||
use futures::prelude::*; | ||
use crate::handler::{self, BehaviourInfo, InEvent, Proto}; | ||
use crate::protocol::{Info, UpgradeError}; | ||
use libp2p_core::{ | ||
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey, | ||
}; | ||
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; | ||
use libp2p_swarm::{ | ||
dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError, | ||
IntoConnectionHandler, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, | ||
NotifyHandler, PollParameters, | ||
IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, | ||
}; | ||
use lru::LruCache; | ||
use std::num::NonZeroUsize; | ||
use std::{ | ||
collections::{HashMap, HashSet, VecDeque}, | ||
iter::FromIterator, | ||
pin::Pin, | ||
task::Context, | ||
task::Poll, | ||
time::Duration, | ||
|
@@ -51,8 +48,8 @@ pub struct Behaviour { | |
config: Config, | ||
/// For each peer we're connected to, the observed address to send back to it. | ||
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>, | ||
/// Pending replies to send. | ||
pending_replies: VecDeque<Reply>, | ||
/// Information requests from the handlers to be fullfiled. | ||
requests: VecDeque<PeerId>, | ||
/// Pending events to be emitted when polled. | ||
events: VecDeque<NetworkBehaviourAction<Event, Proto>>, | ||
/// Peers to which an active push with current information about | ||
|
@@ -62,21 +59,6 @@ pub struct Behaviour { | |
discovered_peers: PeerCache, | ||
} | ||
|
||
/// A pending reply to an inbound identification request. | ||
enum Reply { | ||
/// The reply is queued for sending. | ||
Queued { | ||
peer: PeerId, | ||
io: ReplySubstream<NegotiatedSubstream>, | ||
observed: Multiaddr, | ||
}, | ||
/// The reply is being sent. | ||
Sending { | ||
peer: PeerId, | ||
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>, | ||
}, | ||
} | ||
|
||
/// Configuration for the [`identify::Behaviour`](Behaviour). | ||
#[non_exhaustive] | ||
#[derive(Debug, Clone)] | ||
|
@@ -184,7 +166,7 @@ impl Behaviour { | |
Self { | ||
config, | ||
connected: HashMap::new(), | ||
pending_replies: VecDeque::new(), | ||
requests: VecDeque::new(), | ||
events: VecDeque::new(), | ||
pending_push: HashSet::new(), | ||
discovered_peers, | ||
|
@@ -240,13 +222,19 @@ impl NetworkBehaviour for Behaviour { | |
type OutEvent = Event; | ||
|
||
fn new_handler(&mut self) -> Self::ConnectionHandler { | ||
Proto::new(self.config.initial_delay, self.config.interval) | ||
Proto::new( | ||
self.config.initial_delay, | ||
self.config.interval, | ||
self.config.local_public_key.clone(), | ||
self.config.protocol_version.clone(), | ||
self.config.agent_version.clone(), | ||
) | ||
} | ||
|
||
fn on_connection_handler_event( | ||
&mut self, | ||
peer_id: PeerId, | ||
connection: ConnectionId, | ||
_connection: ConnectionId, | ||
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent, | ||
) { | ||
match event { | ||
|
@@ -271,27 +259,20 @@ impl NetworkBehaviour for Behaviour { | |
score: AddressScore::Finite(1), | ||
}); | ||
} | ||
handler::Event::Identification(peer) => { | ||
self.events | ||
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent { | ||
peer_id: peer, | ||
})); | ||
} | ||
handler::Event::IdentificationPushed => { | ||
self.events | ||
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed { | ||
peer_id, | ||
})); | ||
} | ||
handler::Event::Identify(sender) => { | ||
let observed = self | ||
.connected | ||
.get(&peer_id) | ||
.and_then(|addrs| addrs.get(&connection)) | ||
.expect( | ||
"`on_connection_handler_event` is only called \ | ||
with an established connection and calling `NetworkBehaviour::on_event` \ | ||
with `FromSwarm::ConnectionEstablished ensures there is an entry; qed", | ||
); | ||
self.pending_replies.push_back(Reply::Queued { | ||
peer: peer_id, | ||
io: sender, | ||
observed: observed.clone(), | ||
}); | ||
handler::Event::Identify => { | ||
self.requests.push_back(peer_id); | ||
} | ||
handler::Event::IdentificationError(error) => { | ||
self.events | ||
|
@@ -305,7 +286,7 @@ impl NetworkBehaviour for Behaviour { | |
|
||
fn poll( | ||
&mut self, | ||
cx: &mut Context<'_>, | ||
_cx: &mut Context<'_>, | ||
params: &mut impl PollParameters, | ||
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> { | ||
if let Some(event) = self.events.pop_front() { | ||
|
@@ -333,7 +314,7 @@ impl NetworkBehaviour for Behaviour { | |
observed_addr, | ||
}; | ||
|
||
(*peer, Push(info)) | ||
(*peer, InEvent::Push(info)) | ||
}) | ||
}); | ||
|
||
|
@@ -346,55 +327,17 @@ impl NetworkBehaviour for Behaviour { | |
}); | ||
} | ||
|
||
// Check for pending replies to send. | ||
if let Some(r) = self.pending_replies.pop_front() { | ||
let mut sending = 0; | ||
let to_send = self.pending_replies.len() + 1; | ||
let mut reply = Some(r); | ||
loop { | ||
match reply { | ||
Some(Reply::Queued { peer, io, observed }) => { | ||
let info = Info { | ||
listen_addrs: listen_addrs(params), | ||
protocols: supported_protocols(params), | ||
public_key: self.config.local_public_key.clone(), | ||
protocol_version: self.config.protocol_version.clone(), | ||
agent_version: self.config.agent_version.clone(), | ||
observed_addr: observed, | ||
}; | ||
let io = Box::pin(io.send(info)); | ||
reply = Some(Reply::Sending { peer, io }); | ||
} | ||
Some(Reply::Sending { peer, mut io }) => { | ||
sending += 1; | ||
match Future::poll(Pin::new(&mut io), cx) { | ||
Poll::Ready(Ok(())) => { | ||
let event = Event::Sent { peer_id: peer }; | ||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); | ||
} | ||
Poll::Pending => { | ||
self.pending_replies.push_back(Reply::Sending { peer, io }); | ||
if sending == to_send { | ||
// All remaining futures are NotReady | ||
break; | ||
} else { | ||
reply = self.pending_replies.pop_front(); | ||
} | ||
} | ||
Poll::Ready(Err(err)) => { | ||
let event = Event::Error { | ||
peer_id: peer, | ||
error: ConnectionHandlerUpgrErr::Upgrade( | ||
libp2p_core::upgrade::UpgradeError::Apply(err), | ||
), | ||
}; | ||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); | ||
} | ||
} | ||
} | ||
None => unreachable!(), | ||
} | ||
} | ||
// Check for information requests from the handlers. | ||
if let Some(peer) = self.requests.pop_front() { | ||
let info = BehaviourInfo { | ||
listen_addrs: listen_addrs(params), | ||
protocols: supported_protocols(params), | ||
}; | ||
return Poll::Ready(NetworkBehaviourAction::NotifyHandler { | ||
peer_id: peer, | ||
handler: NotifyHandler::Any, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is the curlpit! This will be buggy with > 1 connection per peer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks Thomas, addressed. |
||
event: InEvent::Identify(info), | ||
}); | ||
} | ||
|
||
Poll::Pending | ||
|
@@ -557,6 +500,7 @@ impl PeerCache { | |
mod tests { | ||
use super::*; | ||
use futures::pin_mut; | ||
use futures::prelude::*; | ||
use libp2p::mplex::MplexConfig; | ||
use libp2p::noise; | ||
use libp2p::tcp; | ||
|
@@ -618,7 +562,7 @@ mod tests { | |
|
||
// nb. Either swarm may receive the `Identified` event first, upon which | ||
// it will permit the connection to be closed, as defined by | ||
// `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if | ||
// `Handler::connection_keep_alive`. Hence the test succeeds if | ||
// either `Identified` event arrives correctly. | ||
async_std::task::block_on(async move { | ||
loop { | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you'll need to track the connection ID here so you can respond to the correct handler!