Skip to content

Commit 8e1d4ed

Browse files
romanbtwittner
andauthored
[core/swarm] Emit events for active connection close and fix disconnect(). (#1619)
* Emit events for active connection close and fix `disconnect()`. The `Network` does currently not emit events for actively closed connections, e.g. via `EstablishedConnection::close` or `ConnectedPeer::disconnect()`. As a result, when actively closing connections, there will be `ConnectionEstablished` events emitted without eventually a matching `ConnectionClosed` event. This seems undesirable and has the consequence that the `Swarm::ban_peer_id` feature in `libp2p-swarm` does not result in appropriate calls to `NetworkBehaviour::inject_connection_closed` and `NetworkBehaviour::inject_disconnected`. Furthermore, the `disconnect()` functionality in `libp2p-core` is currently broken as it leaves the `Pool` in an inconsistent state. This commit does the following: 1. When connection background tasks are dropped (i.e. removed from the `Manager`), they always terminate immediately, without attempting an orderly close of the connection. 2. An orderly close is sent to the background task of a connection as a regular command. The background task emits a `Closed` event before terminating. 3. `Pool::disconnect()` removes all connection tasks for the affected peer from the `Manager`, i.e. without an orderly close, thereby also fixing the discovered state inconsistency due to not removing the corresponding entries in the `Pool` itself after removing them from the `Manager`. 4. A new test is added to `libp2p-swarm` that exercises the ban/unban functionality and places assertions on the number and order of calls to the `NetworkBehaviour`. In that context some new testing utilities have been added to `libp2p-swarm`. This addresses #1584. * Update swarm/src/lib.rs Co-authored-by: Toralf Wittner <[email protected]> * Incorporate some review feedback. * Adapt to changes in master. * More verbose panic messages. * Simplify There is no need for a `StartClose` future. * Fix doc links. * Further small cleanup. * Update CHANGELOGs and versions. Co-authored-by: Toralf Wittner <[email protected]>
1 parent 67f1b94 commit 8e1d4ed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+884
-306
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
- [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md)
2424
- [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md)
2525

26+
# Version 0.24.0 [unreleased]
27+
28+
- Update `libp2p-core`, `libp2p-swarm` and dependent crates.
29+
2630
# Version 0.23.0 (2020-08-03)
2731

2832
**NOTE**: For a smooth upgrade path from `0.21` to `> 0.22`

Cargo.toml

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,23 +62,23 @@ atomic = "0.4.6"
6262
bytes = "0.5"
6363
futures = "0.3.1"
6464
lazy_static = "1.2"
65-
libp2p-core = { version = "0.20.0", path = "core" }
66-
libp2p-core-derive = { version = "0.20.0", path = "misc/core-derive" }
67-
libp2p-floodsub = { version = "0.20.0", path = "protocols/floodsub", optional = true }
68-
libp2p-gossipsub = { version = "0.20.0", path = "./protocols/gossipsub", optional = true }
69-
libp2p-identify = { version = "0.20.0", path = "protocols/identify", optional = true }
70-
libp2p-kad = { version = "0.21.0", path = "protocols/kad", optional = true }
71-
libp2p-mplex = { version = "0.20.0", path = "muxers/mplex", optional = true }
72-
libp2p-noise = { version = "0.22.0", path = "protocols/noise", optional = true }
73-
libp2p-ping = { version = "0.20.0", path = "protocols/ping", optional = true }
74-
libp2p-plaintext = { version = "0.20.0", path = "protocols/plaintext", optional = true }
65+
libp2p-core = { version = "0.21.0", path = "core" }
66+
libp2p-core-derive = { version = "0.20.2", path = "misc/core-derive" }
67+
libp2p-floodsub = { version = "0.21.0", path = "protocols/floodsub", optional = true }
68+
libp2p-gossipsub = { version = "0.21.0", path = "./protocols/gossipsub", optional = true }
69+
libp2p-identify = { version = "0.21.0", path = "protocols/identify", optional = true }
70+
libp2p-kad = { version = "0.22.0", path = "protocols/kad", optional = true }
71+
libp2p-mplex = { version = "0.21.0", path = "muxers/mplex", optional = true }
72+
libp2p-noise = { version = "0.23.0", path = "protocols/noise", optional = true }
73+
libp2p-ping = { version = "0.21.0", path = "protocols/ping", optional = true }
74+
libp2p-plaintext = { version = "0.21.0", path = "protocols/plaintext", optional = true }
7575
libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true }
76-
libp2p-request-response = { version = "0.1.0", path = "protocols/request-response", optional = true }
77-
libp2p-secio = { version = "0.20.0", path = "protocols/secio", default-features = false, optional = true }
78-
libp2p-swarm = { version = "0.20.0", path = "swarm" }
79-
libp2p-uds = { version = "0.20.0", path = "transports/uds", optional = true }
80-
libp2p-wasm-ext = { version = "0.20.0", path = "transports/wasm-ext", optional = true }
81-
libp2p-yamux = { version = "0.20.0", path = "muxers/yamux", optional = true }
76+
libp2p-request-response = { version = "0.2.0", path = "protocols/request-response", optional = true }
77+
libp2p-secio = { version = "0.21.0", path = "protocols/secio", default-features = false, optional = true }
78+
libp2p-swarm = { version = "0.21.0", path = "swarm" }
79+
libp2p-uds = { version = "0.21.0", path = "transports/uds", optional = true }
80+
libp2p-wasm-ext = { version = "0.21.0", path = "transports/wasm-ext", optional = true }
81+
libp2p-yamux = { version = "0.21.0", path = "muxers/yamux", optional = true }
8282
multiaddr = { package = "parity-multiaddr", version = "0.9.1", path = "misc/multiaddr" }
8383
multihash = "0.11.0"
8484
parking_lot = "0.10.0"
@@ -87,11 +87,11 @@ smallvec = "1.0"
8787
wasm-timer = "0.2.4"
8888

8989
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
90-
libp2p-deflate = { version = "0.20.0", path = "protocols/deflate", optional = true }
91-
libp2p-dns = { version = "0.20.0", path = "transports/dns", optional = true }
92-
libp2p-mdns = { version = "0.20.0", path = "protocols/mdns", optional = true }
93-
libp2p-tcp = { version = "0.20.0", path = "transports/tcp", optional = true }
94-
libp2p-websocket = { version = "0.21.0", path = "transports/websocket", optional = true }
90+
libp2p-deflate = { version = "0.21.0", path = "protocols/deflate", optional = true }
91+
libp2p-dns = { version = "0.21.0", path = "transports/dns", optional = true }
92+
libp2p-mdns = { version = "0.21.0", path = "protocols/mdns", optional = true }
93+
libp2p-tcp = { version = "0.21.0", path = "transports/tcp", optional = true }
94+
libp2p-websocket = { version = "0.22.0", path = "transports/websocket", optional = true }
9595

9696
[dev-dependencies]
9797
async-std = "1.6.2"

core/CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
# 0.21.0 [unreleased]
2+
3+
- Refactoring of connection close and disconnect behaviour. In particular, the former
4+
`NetworkEvent::ConnectionError` is now `NetworkEvent::ConnectionClosed` with the `error`
5+
field being an `Option` and `None` indicating an active (but not necessarily orderly) close.
6+
This guarantees that `ConnectionEstablished` events are always eventually paired
7+
with `ConnectionClosed` events, regardless of how connections are closed.
8+
Correspondingly, `EstablishedConnection::close` is now `EstablishedConnection::start_close`
9+
to reflect that an orderly close completes asynchronously in the background, with the
10+
outcome observed by continued polling of the `Network`. In contrast, `disconnect`ing
11+
a peer takes effect immediately without an orderly connection shutdown.
12+
See [PR 1619](https://github.com/libp2p/rust-libp2p/pull/1619) for further details.
13+
114
# 0.20.1 [2020-17-17]
215

316
- Update ed25519-dalek dependency.

core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "libp2p-core"
33
edition = "2018"
44
description = "Core traits and structs of libp2p"
5-
version = "0.20.1"
5+
version = "0.21.0"
66
authors = ["Parity Technologies <[email protected]>"]
77
license = "MIT"
88
repository = "https://github.com/libp2p/rust-libp2p"

core/src/connection/manager.rs

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -196,18 +196,19 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
196196
handler: H
197197
},
198198

199-
/// An established connection has encountered an error.
200-
ConnectionError {
199+
/// An established connection has been closed.
200+
ConnectionClosed {
201201
/// The connection ID.
202202
///
203-
/// As a result of the error, the connection has been removed
204-
/// from the `Manager` and is being closed. Hence this ID will
205-
/// no longer resolve to a valid entry in the manager.
203+
/// > **Note**: Closed connections are removed from the `Manager`.
204+
/// > Hence this ID will no longer resolve to a valid entry in
205+
/// > the manager.
206206
id: ConnectionId,
207-
/// Information about the connection that encountered the error.
207+
/// Information about the closed connection.
208208
connected: Connected<C>,
209-
/// The error that occurred.
210-
error: ConnectionError<HE>,
209+
/// The error that occurred, if any. If `None`, the connection
210+
/// has been actively closed.
211+
error: Option<ConnectionError<HE>>,
211212
},
212213

213214
/// A connection has been established.
@@ -348,11 +349,11 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
348349
/// Polls the manager for events relating to the managed connections.
349350
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE, C>> {
350351
// Advance the content of `local_spawns`.
351-
while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.local_spawns), cx) {}
352+
while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}
352353

353354
// Poll for the first event for which the manager still has a registered task, if any.
354355
let event = loop {
355-
match Stream::poll_next(Pin::new(&mut self.events_rx), cx) {
356+
match self.events_rx.poll_next_unpin(cx) {
356357
Poll::Ready(Some(event)) => {
357358
if self.tasks.contains_key(event.id()) { // (1)
358359
break event
@@ -397,19 +398,18 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
397398
old_endpoint: old,
398399
new_endpoint: new,
399400
}
400-
},
401-
task::Event::Error { id, error } => {
401+
}
402+
task::Event::Closed { id, error } => {
402403
let id = ConnectionId(id);
403404
let task = task.remove();
404405
match task.state {
405406
TaskState::Established(connected) =>
406-
Event::ConnectionError { id, connected, error },
407+
Event::ConnectionClosed { id, connected, error },
407408
TaskState::Pending => unreachable!(
408-
"`Event::Error` implies (2) occurred on that task and thus (3)."
409+
"`Event::Closed` implies (2) occurred on that task and thus (3)."
409410
),
410411
}
411412
}
412-
413413
})
414414
} else {
415415
unreachable!("By (1)")
@@ -455,10 +455,11 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
455455
/// > task _may not be notified_ if sending the event fails due to
456456
/// > the connection handler not being ready at this time.
457457
pub fn notify_handler(&mut self, event: I) -> Result<(), I> {
458-
let cmd = task::Command::NotifyHandler(event);
458+
let cmd = task::Command::NotifyHandler(event); // (*)
459459
self.task.get_mut().sender.try_send(cmd)
460460
.map_err(|e| match e.into_inner() {
461-
task::Command::NotifyHandler(event) => event
461+
task::Command::NotifyHandler(event) => event,
462+
_ => panic!("Unexpected command. Expected `NotifyHandler`") // see (*)
462463
})
463464
}
464465

@@ -472,6 +473,22 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
472473
self.task.get_mut().sender.poll_ready(cx).map_err(|_| ())
473474
}
474475

476+
/// Sends a close command to the associated background task,
477+
/// thus initiating a graceful active close of the connection.
478+
///
479+
/// Has no effect if the connection is already closing.
480+
///
481+
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
482+
/// is emitted by [`Manager::poll`].
483+
pub fn start_close(mut self) {
484+
// Clone the sender so that we are guaranteed to have
485+
// capacity for the close command (every sender gets a slot).
486+
match self.task.get_mut().sender.clone().try_send(task::Command::Close) {
487+
Ok(()) => {},
488+
Err(e) => assert!(e.is_disconnected(), "No capacity for close command.")
489+
}
490+
}
491+
475492
/// Obtains information about the established connection.
476493
pub fn connected(&self) -> &Connected<C> {
477494
match &self.task.get().state {
@@ -480,16 +497,18 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
480497
}
481498
}
482499

483-
/// Closes the connection represented by this entry,
484-
/// returning the connection information.
485-
pub fn close(self) -> Connected<C> {
500+
/// Instantly removes the entry from the manager, dropping
501+
/// the command channel to the background task of the connection,
502+
/// which will thus drop the connection asap without an orderly
503+
/// close or emitting another event.
504+
pub fn remove(self) -> Connected<C> {
486505
match self.task.remove().state {
487506
TaskState::Established(c) => c,
488507
TaskState::Pending => unreachable!("By Entry::new()")
489508
}
490509
}
491510

492-
/// Returns the connection id.
511+
/// Returns the connection ID.
493512
pub fn id(&self) -> ConnectionId {
494513
ConnectionId(*self.task.key())
495514
}
@@ -513,3 +532,4 @@ impl<'a, I, C> PendingEntry<'a, I, C> {
513532
self.task.remove();
514533
}
515534
}
535+

0 commit comments

Comments
 (0)