Skip to content

Commit b595972

Browse files
romanbmxinden
andauthored
[ping] Refactor the ping protocol for conformity. (#1692)
* Refactor the ping protocol. Such that pings are sent over a single substream, as it is done in other libp2p implementations. Note that, since each peer sends its pings over a single, dedicated substream, every peer that participates in the protocol has effectively two open substreams. * Cleanup * Update ping changelog. * Update protocols/ping/src/protocol.rs Co-authored-by: Max Inden <[email protected]> Co-authored-by: Max Inden <[email protected]>
1 parent 5cd981b commit b595972

File tree

5 files changed

+348
-216
lines changed

5 files changed

+348
-216
lines changed

protocols/ping/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# 0.21.0 [unreleased]
22

3+
- Refactor the ping protocol for conformity by (re)using
4+
a single substream for outbound pings, addressing
5+
[#1601](https://github.com/libp2p/rust-libp2p/issues/1601).
6+
37
- Bump `libp2p-core` and `libp2p-swarm` dependencies.
48

59
# 0.20.0 [2020-07-01]

protocols/ping/src/handler.rs

Lines changed: 137 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,23 @@
2020

2121
use crate::protocol;
2222
use futures::prelude::*;
23+
use futures::future::BoxFuture;
2324
use libp2p_swarm::{
2425
KeepAlive,
26+
NegotiatedSubstream,
2527
SubstreamProtocol,
2628
ProtocolsHandler,
2729
ProtocolsHandlerUpgrErr,
2830
ProtocolsHandlerEvent
2931
};
30-
use std::{error::Error, io, fmt, num::NonZeroU32, pin::Pin, task::Context, task::Poll, time::Duration};
32+
use std::{
33+
error::Error,
34+
io,
35+
fmt,
36+
num::NonZeroU32,
37+
task::{Context, Poll},
38+
time::Duration
39+
};
3140
use std::collections::VecDeque;
3241
use wasm_timer::Delay;
3342
use void::Void;
@@ -160,23 +169,33 @@ impl Error for PingFailure {
160169
pub struct PingHandler {
161170
/// Configuration options.
162171
config: PingConfig,
163-
/// The timer for when to send the next ping.
164-
next_ping: Delay,
165-
/// The pending results from inbound or outbound pings, ready
166-
/// to be `poll()`ed.
167-
pending_results: VecDeque<PingResult>,
172+
/// The timer used for the delay to the next ping as well as
173+
/// the ping timeout.
174+
timer: Delay,
175+
/// Outbound ping failures that are pending to be processed by `poll()`.
176+
pending_errors: VecDeque<PingFailure>,
168177
/// The number of consecutive ping failures that occurred.
178+
///
179+
/// Each successful ping resets this counter to 0.
169180
failures: u32,
181+
/// The outbound ping state.
182+
outbound: Option<PingState>,
183+
/// The inbound pong handler, i.e. if there is an inbound
184+
/// substream, this is always a future that waits for the
185+
/// next inbound ping to be answered.
186+
inbound: Option<PongFuture>,
170187
}
171188

172189
impl PingHandler {
173190
/// Builds a new `PingHandler` with the given configuration.
174191
pub fn new(config: PingConfig) -> Self {
175192
PingHandler {
176193
config,
177-
next_ping: Delay::new(Duration::new(0, 0)),
178-
pending_results: VecDeque::with_capacity(2),
194+
timer: Delay::new(Duration::new(0, 0)),
195+
pending_errors: VecDeque::with_capacity(2),
179196
failures: 0,
197+
outbound: None,
198+
inbound: None,
180199
}
181200
}
182201
}
@@ -193,24 +212,25 @@ impl ProtocolsHandler for PingHandler {
193212
SubstreamProtocol::new(protocol::Ping)
194213
}
195214

196-
fn inject_fully_negotiated_inbound(&mut self, _: ()) {
197-
// A ping from a remote peer has been answered.
198-
self.pending_results.push_front(Ok(PingSuccess::Pong));
215+
fn inject_fully_negotiated_inbound(&mut self, stream: NegotiatedSubstream) {
216+
self.inbound = Some(protocol::recv_ping(stream).boxed());
199217
}
200218

201-
fn inject_fully_negotiated_outbound(&mut self, rtt: Duration, _info: ()) {
202-
// A ping initiated by the local peer was answered by the remote.
203-
self.pending_results.push_front(Ok(PingSuccess::Ping { rtt }));
219+
fn inject_fully_negotiated_outbound(&mut self, stream: NegotiatedSubstream, (): ()) {
220+
self.timer.reset(self.config.timeout);
221+
self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed()));
204222
}
205223

206224
fn inject_event(&mut self, _: Void) {}
207225

208-
fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr<io::Error>) {
209-
self.pending_results.push_front(
210-
Err(match error {
226+
fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr<Void>) {
227+
self.outbound = None; // Request a new substream on the next `poll`.
228+
self.pending_errors.push_front(
229+
match error {
230+
// Note: This timeout only covers protocol negotiation.
211231
ProtocolsHandlerUpgrErr::Timeout => PingFailure::Timeout,
212-
e => PingFailure::Other { error: Box::new(e) }
213-
}))
232+
e => PingFailure::Other { error: Box::new(e) },
233+
})
214234
}
215235

216236
fn connection_keep_alive(&self) -> KeepAlive {
@@ -222,117 +242,117 @@ impl ProtocolsHandler for PingHandler {
222242
}
223243

224244
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ProtocolsHandlerEvent<protocol::Ping, (), PingResult, Self::Error>> {
225-
if let Some(result) = self.pending_results.pop_back() {
226-
if let Ok(PingSuccess::Ping { .. }) = result {
227-
self.failures = 0;
228-
self.next_ping.reset(self.config.interval);
229-
}
230-
if let Err(e) = result {
231-
self.failures += 1;
232-
if self.failures >= self.config.max_failures.get() {
233-
return Poll::Ready(ProtocolsHandlerEvent::Close(e))
234-
} else {
235-
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(e)))
245+
// Respond to inbound pings.
246+
if let Some(fut) = self.inbound.as_mut() {
247+
match fut.poll_unpin(cx) {
248+
Poll::Pending => {},
249+
Poll::Ready(Err(e)) => {
250+
log::debug!("Inbound ping error: {:?}", e);
251+
self.inbound = None;
252+
}
253+
Poll::Ready(Ok(stream)) => {
254+
// A ping from a remote peer has been answered, wait for the next.
255+
self.inbound = Some(protocol::recv_ping(stream).boxed());
256+
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Pong)))
236257
}
237258
}
238-
return Poll::Ready(ProtocolsHandlerEvent::Custom(result))
239-
}
240-
241-
match Future::poll(Pin::new(&mut self.next_ping), cx) {
242-
Poll::Ready(Ok(())) => {
243-
self.next_ping.reset(self.config.timeout);
244-
let protocol = SubstreamProtocol::new(protocol::Ping)
245-
.with_timeout(self.config.timeout);
246-
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
247-
protocol,
248-
info: (),
249-
})
250-
},
251-
Poll::Pending => Poll::Pending,
252-
Poll::Ready(Err(e)) =>
253-
Poll::Ready(ProtocolsHandlerEvent::Close(PingFailure::Other { error: Box::new(e) }))
254259
}
255-
}
256-
}
257-
258-
#[cfg(test)]
259-
mod tests {
260-
use super::*;
261-
262-
use futures::future;
263-
use quickcheck::*;
264-
use rand::Rng;
265-
266-
impl Arbitrary for PingConfig {
267-
fn arbitrary<G: Gen>(g: &mut G) -> PingConfig {
268-
PingConfig::new()
269-
.with_timeout(Duration::from_secs(g.gen_range(0, 3600)))
270-
.with_interval(Duration::from_secs(g.gen_range(0, 3600)))
271-
.with_max_failures(NonZeroU32::new(g.gen_range(1, 100)).unwrap())
272-
}
273-
}
274260

275-
fn tick(h: &mut PingHandler)
276-
-> ProtocolsHandlerEvent<protocol::Ping, (), PingResult, PingFailure>
277-
{
278-
async_std::task::block_on(future::poll_fn(|cx| h.poll(cx) ))
279-
}
261+
loop {
262+
// Check for outbound ping failures.
263+
if let Some(error) = self.pending_errors.pop_back() {
264+
log::debug!("Ping failure: {:?}", error);
280265

281-
#[test]
282-
fn ping_interval() {
283-
fn prop(cfg: PingConfig, ping_rtt: Duration) -> bool {
284-
let mut h = PingHandler::new(cfg);
266+
self.failures += 1;
285267

286-
// Send ping
287-
match tick(&mut h) {
288-
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ } => {
289-
// The handler must use the configured timeout.
290-
assert_eq!(protocol.timeout(), &h.config.timeout);
268+
// Note: For backward-compatibility, with configured
269+
// `max_failures == 1`, the first failure is always "free"
270+
// and silent. This allows peers who still use a new substream
271+
// for each ping to have successful ping exchanges with peers
272+
// that use a single substream, since every successful ping
273+
// resets `failures` to `0`, while at the same time emitting
274+
// events only for `max_failures - 1` failures, as before.
275+
if self.failures > 1 || self.config.max_failures.get() > 1 {
276+
if self.failures >= self.config.max_failures.get() {
277+
log::debug!("Too many failures ({}). Closing connection.", self.failures);
278+
return Poll::Ready(ProtocolsHandlerEvent::Close(error))
279+
}
280+
281+
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(error)))
291282
}
292-
e => panic!("Unexpected event: {:?}", e)
293283
}
294284

295-
// Receive pong
296-
h.inject_fully_negotiated_outbound(ping_rtt, ());
297-
match tick(&mut h) {
298-
ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt })) => {
299-
// The handler must report the given RTT.
300-
assert_eq!(rtt, ping_rtt);
285+
// Continue outbound pings.
286+
match self.outbound.take() {
287+
Some(PingState::Ping(mut ping)) => match ping.poll_unpin(cx) {
288+
Poll::Pending => {
289+
if self.timer.poll_unpin(cx).is_ready() {
290+
self.pending_errors.push_front(PingFailure::Timeout);
291+
} else {
292+
self.outbound = Some(PingState::Ping(ping));
293+
break
294+
}
295+
},
296+
Poll::Ready(Ok((stream, rtt))) => {
297+
self.failures = 0;
298+
self.timer.reset(self.config.interval);
299+
self.outbound = Some(PingState::Idle(stream));
300+
return Poll::Ready(
301+
ProtocolsHandlerEvent::Custom(
302+
Ok(PingSuccess::Ping { rtt })))
303+
}
304+
Poll::Ready(Err(e)) => {
305+
self.pending_errors.push_front(PingFailure::Other {
306+
error: Box::new(e)
307+
});
308+
}
309+
},
310+
Some(PingState::Idle(stream)) => match self.timer.poll_unpin(cx) {
311+
Poll::Pending => {
312+
self.outbound = Some(PingState::Idle(stream));
313+
break
314+
},
315+
Poll::Ready(Ok(())) => {
316+
self.timer.reset(self.config.timeout);
317+
self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed()));
318+
},
319+
Poll::Ready(Err(e)) => {
320+
return Poll::Ready(ProtocolsHandlerEvent::Close(
321+
PingFailure::Other {
322+
error: Box::new(e)
323+
}))
324+
}
325+
}
326+
Some(PingState::OpenStream) => {
327+
self.outbound = Some(PingState::OpenStream);
328+
break
329+
}
330+
None => {
331+
self.outbound = Some(PingState::OpenStream);
332+
let protocol = SubstreamProtocol::new(protocol::Ping)
333+
.with_timeout(self.config.timeout);
334+
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
335+
protocol,
336+
info: (),
337+
})
301338
}
302-
e => panic!("Unexpected event: {:?}", e)
303339
}
304-
305-
true
306340
}
307341

308-
quickcheck(prop as fn(_,_) -> _);
342+
Poll::Pending
309343
}
344+
}
310345

311-
#[test]
312-
fn max_failures() {
313-
let cfg = PingConfig::arbitrary(&mut StdGen::new(rand::thread_rng(), 100));
314-
let mut h = PingHandler::new(cfg);
315-
for _ in 0 .. h.config.max_failures.get() - 1 {
316-
h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
317-
match tick(&mut h) {
318-
ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout)) => {}
319-
e => panic!("Unexpected event: {:?}", e)
320-
}
321-
}
322-
h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
323-
match tick(&mut h) {
324-
ProtocolsHandlerEvent::Close(PingFailure::Timeout) => {
325-
assert_eq!(h.failures, h.config.max_failures.get());
326-
}
327-
e => panic!("Unexpected event: {:?}", e)
328-
}
329-
h.inject_fully_negotiated_outbound(Duration::from_secs(1), ());
330-
match tick(&mut h) {
331-
ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. })) => {
332-
// A success resets the counter for consecutive failures.
333-
assert_eq!(h.failures, 0);
334-
}
335-
e => panic!("Unexpected event: {:?}", e)
336-
}
337-
}
346+
type PingFuture = BoxFuture<'static, Result<(NegotiatedSubstream, Duration), io::Error>>;
347+
type PongFuture = BoxFuture<'static, Result<NegotiatedSubstream, io::Error>>;
348+
349+
/// The current state w.r.t. outbound pings.
350+
enum PingState {
351+
/// A new substream is being negotiated for the ping protocol.
352+
OpenStream,
353+
/// The substream is idle, waiting to send the next ping.
354+
Idle(NegotiatedSubstream),
355+
/// A ping is being sent and the response awaited.
356+
Ping(PingFuture),
338357
}
358+

protocols/ping/src/lib.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,14 @@
2828
//!
2929
//! The [`Ping`] struct implements the [`NetworkBehaviour`] trait. When used with a [`Swarm`],
3030
//! it will respond to inbound ping requests and as necessary periodically send outbound
31-
//! ping requests on every established connection. If a configurable number of pings fail,
32-
//! the connection will be closed.
31+
//! ping requests on every established connection. If a configurable number of consecutive
32+
//! pings fail, the connection will be closed.
3333
//!
3434
//! The `Ping` network behaviour produces [`PingEvent`]s, which may be consumed from the `Swarm`
3535
//! by an application, e.g. to collect statistics.
3636
//!
37-
//! > **Note**: The ping protocol does not keep otherwise idle connections alive,
38-
//! > it only adds an additional condition for terminating the connection, namely
39-
//! > a certain number of failed ping requests.
37+
//! > **Note**: The ping protocol does not keep otherwise idle connections alive
38+
//! > by default, see [`PingConfig::with_keep_alive`] for changing this behaviour.
4039
//!
4140
//! [`Swarm`]: libp2p_swarm::Swarm
4241
//! [`Transport`]: libp2p_core::Transport

0 commit comments

Comments
 (0)