Skip to content

Commit 30de0b4

Browse files
twittner and romanb authored
Add Throttled to libp2p-request-response. (#1696)
* Use a single exchange instead of two one_shots. * Add `Throttled` to libp2p-request-response. Wraps the existing `RequestResponse` behaviour and applies strict limits to the number of inbound and outbound requests per peer. The wrapper is opt-in and if not used, the protocol behaviour of `RequestResponse` does not change. This PR also does not introduce an extra protocol, hence the limits applied need to be known a priori for all nodes which is not always possible or desirable. As mentioned in #1687 I think that we should eventually augment the protocol with metadata which allows a more dynamic exchange of requests and responses. This PR also replaces the two oneshot channels with a single one from the scambio crate which saves one allocation per request/response. If not desirable because the crate has seen less testing the first commit could be reverted. * Fix rustdoc error. * Remove some leftovers from development. * Add docs to `NetworBehaviourAction::{map_in,map_out}`. * Apply suggestions from code review Co-authored-by: Roman Borschel <[email protected]> * Add `ping_protocol_throttled` test. * Add another test. * Revert "Use a single exchange instead of two one_shots." This reverts commit e34e129. # Conflicts: # protocols/request-response/Cargo.toml # protocols/request-response/src/handler/protocol.rs * Update CHANGELOG. * Update CHANGELOG. Co-authored-by: Roman Borschel <[email protected]>
1 parent b595972 commit 30de0b4

File tree

8 files changed

+557
-16
lines changed

8 files changed

+557
-16
lines changed

protocols/request-response/CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
# 0.2.0
1+
# 0.2.0 // unreleased
22

3+
- Added `RequestResponse::throttled` to wrap the behaviour into one that
4+
enforces limits on inbound and outbound requests per peer. The limits
5+
have to be known upfront by all nodes.
36
- Bump `libp2p-core` and `libp2p-swarm` dependencies.
47

58
# 0.1.1

protocols/request-response/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ async-trait = "0.1"
1414
futures = "0.3.1"
1515
libp2p-core = { version = "0.21.0", path = "../../core" }
1616
libp2p-swarm = { version = "0.21.0", path = "../../swarm" }
17+
log = "0.4.11"
18+
lru = "0.6"
19+
rand = "0.7"
1720
smallvec = "1.4"
1821
wasm-timer = "0.2"
1922

protocols/request-response/src/handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ where
110110

111111
/// The events emitted by the [`RequestResponseHandler`].
112112
#[doc(hidden)]
113+
#[derive(Debug)]
113114
pub enum RequestResponseHandlerEvent<TCodec>
114115
where
115116
TCodec: RequestResponseCodec

protocols/request-response/src/handler/protocol.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,9 @@
2626
use crate::RequestId;
2727
use crate::codec::RequestResponseCodec;
2828

29-
use futures::{
30-
channel::oneshot,
31-
future::BoxFuture,
32-
prelude::*,
33-
};
34-
use libp2p_core::{
35-
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
36-
};
37-
use libp2p_swarm::{
38-
NegotiatedSubstream,
39-
};
29+
use futures::{channel::oneshot, future::BoxFuture, prelude::*};
30+
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
31+
use libp2p_swarm::NegotiatedSubstream;
4032
use smallvec::SmallVec;
4133
use std::io;
4234

protocols/request-response/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,11 @@
7070
7171
pub mod codec;
7272
pub mod handler;
73+
pub mod throttled;
7374

7475
pub use codec::{RequestResponseCodec, ProtocolName};
7576
pub use handler::ProtocolSupport;
77+
pub use throttled::Throttled;
7678

7779
use futures::{
7880
channel::oneshot,
@@ -309,6 +311,11 @@ where
309311
}
310312
}
311313

314+
/// Wrap this behaviour in [`Throttled`] to limit the number of concurrent requests per peer.
315+
pub fn throttled(self) -> Throttled<TCodec> {
316+
Throttled::new(self)
317+
}
318+
312319
/// Initiates sending a request.
313320
///
314321
/// If the targeted peer is currently not connected, a dialing
@@ -604,4 +611,3 @@ struct Connection {
604611
id: ConnectionId,
605612
address: Option<Multiaddr>,
606613
}
607-
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
// Copyright 2020 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use crate::handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent};
22+
use libp2p_core::{ConnectedPoint, connection::ConnectionId, Multiaddr, PeerId};
23+
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
24+
use lru::LruCache;
25+
use std::{collections::{HashMap, VecDeque}, task::{Context, Poll}};
26+
use std::{cmp::min, num::NonZeroU16};
27+
use super::{
28+
RequestId,
29+
RequestResponse,
30+
RequestResponseCodec,
31+
RequestResponseEvent,
32+
ResponseChannel
33+
};
34+
35+
/// A wrapper around [`RequestResponse`] which adds request limits per peer.
36+
///
37+
/// Each peer is assigned a default limit of concurrent requests and
38+
/// responses, which can be overriden per peer.
39+
///
40+
/// It is not possible to send more requests than configured and receiving
41+
/// more is reported as an error event. Since `libp2p-request-response` is
42+
/// not its own protocol, there is no way to communicate limits to peers,
43+
/// hence nodes must have pre-established knowledge about each other's limits.
44+
pub struct Throttled<C: RequestResponseCodec> {
45+
/// A random id used for logging.
46+
id: u32,
47+
/// The wrapped behaviour.
48+
behaviour: RequestResponse<C>,
49+
/// Limits per peer.
50+
limits: HashMap<PeerId, Limit>,
51+
/// After disconnects we keep limits around to prevent circumventing
52+
/// them by successive reconnects.
53+
previous: LruCache<PeerId, Limit>,
54+
/// The default limit applied to all peers unless overriden.
55+
default: Limit,
56+
/// Pending events to report in `Throttled::poll`.
57+
events: VecDeque<Event<C::Request, C::Response>>
58+
}
59+
60+
/// A `Limit` of inbound and outbound requests.
#[derive(Clone, Debug)]
struct Limit {
    /// The remaining number of outbound requests that can be sent.
    send_budget: u16,
    /// The remaining number of inbound requests that can be received.
    recv_budget: u16,
    /// The original limit which applies to inbound and outbound requests.
    maximum: NonZeroU16
}

impl Default for Limit {
    /// A fresh limit starts with full budgets equal to the maximum,
    /// which defaults to a single concurrent request per direction.
    fn default() -> Self {
        let maximum = NonZeroU16::new(1).expect("1 > 0");
        Limit {
            send_budget: maximum.get(),
            recv_budget: maximum.get(),
            maximum
        }
    }
}
81+
82+
/// A Wrapper around [`RequestResponseEvent`].
83+
#[derive(Debug)]
84+
pub enum Event<Req, Res> {
85+
/// A regular request-response event.
86+
Event(RequestResponseEvent<Req, Res>),
87+
/// We received more inbound requests than allowed.
88+
TooManyInboundRequests(PeerId),
89+
/// When previously reaching the send limit of a peer,
90+
/// this event is eventually emitted when sending is
91+
/// allowed to resume.
92+
ResumeSending(PeerId)
93+
}
94+
95+
impl<C: RequestResponseCodec + Clone> Throttled<C> {
96+
/// Wrap an existing `RequestResponse` behaviour and apply send/recv limits.
97+
pub fn new(behaviour: RequestResponse<C>) -> Self {
98+
Throttled {
99+
id: rand::random(),
100+
behaviour,
101+
limits: HashMap::new(),
102+
previous: LruCache::new(2048),
103+
default: Limit::default(),
104+
events: VecDeque::new()
105+
}
106+
}
107+
108+
/// Get the current default limit applied to all peers.
109+
pub fn default_limit(&self) -> u16 {
110+
self.default.maximum.get()
111+
}
112+
113+
/// Override the global default limit.
114+
///
115+
/// See [`Throttled::set_limit`] to override limits for individual peers.
116+
pub fn set_default_limit(&mut self, limit: NonZeroU16) {
117+
log::trace!("{:08x}: new default limit: {:?}", self.id, limit);
118+
self.default = Limit {
119+
send_budget: limit.get(),
120+
recv_budget: limit.get(),
121+
maximum: limit
122+
}
123+
}
124+
125+
/// Specify the send and receive limit for a single peer.
126+
pub fn set_limit(&mut self, id: &PeerId, limit: NonZeroU16) {
127+
log::trace!("{:08x}: new limit for {}: {:?}", self.id, id, limit);
128+
self.previous.pop(id);
129+
self.limits.insert(id.clone(), Limit {
130+
send_budget: limit.get(),
131+
recv_budget: limit.get(),
132+
maximum: limit
133+
});
134+
}
135+
136+
/// Has the limit of outbound requests been reached for the given peer?
137+
pub fn can_send(&mut self, id: &PeerId) -> bool {
138+
self.limits.get(id).map(|l| l.send_budget > 0).unwrap_or(true)
139+
}
140+
141+
/// Send a request to a peer.
142+
///
143+
/// If the limit of outbound requests has been reached, the request is
144+
/// returned. Sending more outbound requests should only be attempted
145+
/// once [`Event::ResumeSending`] has been received from [`NetworkBehaviour::poll`].
146+
pub fn send_request(&mut self, id: &PeerId, req: C::Request) -> Result<RequestId, C::Request> {
147+
log::trace!("{:08x}: sending request to {}", self.id, id);
148+
149+
// Getting the limit is somewhat complicated due to the connection state.
150+
// Applications may try to send a request to a peer we have never been connected
151+
// to, or a peer we have previously been connected to. In the first case, the
152+
// default limit applies and in the latter, the cached limit from the previous
153+
// connection (if still available).
154+
let mut limit =
155+
if let Some(limit) = self.limits.get_mut(id) {
156+
limit
157+
} else {
158+
let limit = self.previous.pop(id).unwrap_or_else(|| self.default.clone());
159+
self.limits.entry(id.clone()).or_insert(limit)
160+
};
161+
162+
if limit.send_budget == 0 {
163+
log::trace!("{:08x}: no budget to send request to {}", self.id, id);
164+
return Err(req)
165+
}
166+
167+
limit.send_budget -= 1;
168+
169+
Ok(self.behaviour.send_request(id, req))
170+
}
171+
172+
/// Answer an inbound request with a response.
173+
///
174+
/// See [`RequestResponse::send_response`] for details.
175+
pub fn send_response(&mut self, ch: ResponseChannel<C::Response>, rs: C::Response) {
176+
if let Some(limit) = self.limits.get_mut(&ch.peer) {
177+
limit.recv_budget += 1;
178+
debug_assert!(limit.recv_budget <= limit.maximum.get())
179+
}
180+
self.behaviour.send_response(ch, rs)
181+
}
182+
183+
/// Add a known peer address.
184+
///
185+
/// See [`RequestResponse::add_address`] for details.
186+
pub fn add_address(&mut self, id: &PeerId, ma: Multiaddr) {
187+
self.behaviour.add_address(id, ma)
188+
}
189+
190+
/// Remove a previously added peer address.
191+
///
192+
/// See [`RequestResponse::remove_address`] for details.
193+
pub fn remove_address(&mut self, id: &PeerId, ma: &Multiaddr) {
194+
self.behaviour.remove_address(id, ma)
195+
}
196+
197+
/// Are we connected to the given peer?
198+
///
199+
/// See [`RequestResponse::is_connected`] for details.
200+
pub fn is_connected(&self, id: &PeerId) -> bool {
201+
self.behaviour.is_connected(id)
202+
}
203+
204+
/// Are we waiting for a response to the given request?
205+
///
206+
/// See [`RequestResponse::is_pending`] for details.
207+
pub fn is_pending(&self, id: &RequestId) -> bool {
208+
self.behaviour.is_pending(id)
209+
}
210+
}
211+
212+
impl<C> NetworkBehaviour for Throttled<C>
213+
where
214+
C: RequestResponseCodec + Send + Clone + 'static
215+
{
216+
type ProtocolsHandler = RequestResponseHandler<C>;
217+
type OutEvent = Event<C::Request, C::Response>;
218+
219+
fn new_handler(&mut self) -> Self::ProtocolsHandler {
220+
self.behaviour.new_handler()
221+
}
222+
223+
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
224+
self.behaviour.addresses_of_peer(peer)
225+
}
226+
227+
fn inject_connection_established(&mut self, p: &PeerId, id: &ConnectionId, end: &ConnectedPoint) {
228+
self.behaviour.inject_connection_established(p, id, end)
229+
}
230+
231+
fn inject_connection_closed(&mut self, p: &PeerId, id: &ConnectionId, end: &ConnectedPoint) {
232+
self.behaviour.inject_connection_closed(p, id, end);
233+
}
234+
235+
fn inject_connected(&mut self, peer: &PeerId) {
236+
log::trace!("{:08x}: connected to {}", self.id, peer);
237+
self.behaviour.inject_connected(peer);
238+
// The limit may have been added by [`Throttled::send_request`] already.
239+
if !self.limits.contains_key(peer) {
240+
let limit = self.previous.pop(peer).unwrap_or_else(|| self.default.clone());
241+
self.limits.insert(peer.clone(), limit);
242+
}
243+
}
244+
245+
fn inject_disconnected(&mut self, peer: &PeerId) {
246+
log::trace!("{:08x}: disconnected from {}", self.id, peer);
247+
self.behaviour.inject_disconnected(peer);
248+
// Save the limit in case the peer reconnects soon.
249+
if let Some(limit) = self.limits.remove(peer) {
250+
self.previous.put(peer.clone(), limit);
251+
}
252+
}
253+
254+
fn inject_dial_failure(&mut self, peer: &PeerId) {
255+
self.behaviour.inject_dial_failure(peer)
256+
}
257+
258+
fn inject_event(&mut self, p: PeerId, i: ConnectionId, e: RequestResponseHandlerEvent<C>) {
259+
match e {
260+
// Cases where an outbound request has been resolved.
261+
| RequestResponseHandlerEvent::Response {..}
262+
| RequestResponseHandlerEvent::OutboundTimeout (_)
263+
| RequestResponseHandlerEvent::OutboundUnsupportedProtocols (_) =>
264+
if let Some(limit) = self.limits.get_mut(&p) {
265+
if limit.send_budget == 0 {
266+
log::trace!("{:08x}: sending to peer {} can resume", self.id, p);
267+
self.events.push_back(Event::ResumeSending(p.clone()))
268+
}
269+
limit.send_budget = min(limit.send_budget + 1, limit.maximum.get())
270+
}
271+
// A new inbound request.
272+
| RequestResponseHandlerEvent::Request {..} =>
273+
if let Some(limit) = self.limits.get_mut(&p) {
274+
if limit.recv_budget == 0 {
275+
log::error!("{:08x}: peer {} exceeds its budget", self.id, p);
276+
return self.events.push_back(Event::TooManyInboundRequests(p))
277+
}
278+
limit.recv_budget -= 1
279+
}
280+
// The inbound request has expired so grant more budget to receive another one.
281+
| RequestResponseHandlerEvent::InboundTimeout =>
282+
if let Some(limit) = self.limits.get_mut(&p) {
283+
limit.recv_budget = min(limit.recv_budget + 1, limit.maximum.get())
284+
}
285+
// Nothing to do here ...
286+
| RequestResponseHandlerEvent::InboundUnsupportedProtocols => {}
287+
}
288+
self.behaviour.inject_event(p, i, e)
289+
}
290+
291+
fn poll(&mut self, cx: &mut Context<'_>, p: &mut impl PollParameters)
292+
-> Poll<NetworkBehaviourAction<RequestProtocol<C>, Self::OutEvent>>
293+
{
294+
if let Some(ev) = self.events.pop_front() {
295+
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
296+
} else if self.events.capacity() > super::EMPTY_QUEUE_SHRINK_THRESHOLD {
297+
self.events.shrink_to_fit()
298+
}
299+
300+
self.behaviour.poll(cx, p).map(|a| a.map_out(Event::Event))
301+
}
302+
}

0 commit comments

Comments
 (0)