Skip to content

Commit 589f29f

Browse files
author
Surkov Kirill
authored
Merge branch 'libp2p:master' into master
2 parents dc2f682 + aefbfbd commit 589f29f

File tree

6 files changed

+153
-55
lines changed

6 files changed

+153
-55
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ libp2p-swarm-test = { version = "0.6.0", path = "swarm-test" }
108108
libp2p-tcp = { version = "0.44.0", path = "transports/tcp" }
109109
libp2p-tls = { version = "0.6.2", path = "transports/tls" }
110110
libp2p-uds = { version = "0.43.0", path = "transports/uds" }
111-
libp2p-upnp = { version = "0.5.0", path = "protocols/upnp" }
111+
libp2p-upnp = { version = "0.5.1", path = "protocols/upnp" }
112112
libp2p-webrtc = { version = "0.9.0-alpha.1", path = "transports/webrtc" }
113113
libp2p-webrtc-utils = { version = "0.4.0", path = "misc/webrtc-utils" }
114114
libp2p-webrtc-websys = { version = "0.4.0", path = "transports/webrtc-websys" }

misc/peer-store/src/memory_store.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
//! ```
1010
1111
use std::{
12-
collections::{HashMap, VecDeque},
12+
collections::VecDeque,
1313
num::NonZeroUsize,
1414
task::{Poll, Waker},
1515
};
@@ -49,10 +49,9 @@ pub enum Event {
4949

5050
/// A in-memory store that uses LRU cache for bounded storage of addresses
5151
/// and a frequency-based ordering of addresses.
52-
#[derive(Default)]
5352
pub struct MemoryStore<T = ()> {
5453
/// The internal store.
55-
records: HashMap<PeerId, PeerRecord<T>>,
54+
records: LruCache<PeerId, PeerRecord<T>>,
5655
/// Events to emit to [`Behaviour`](crate::Behaviour) and [`Swarm`](libp2p_swarm::Swarm).
5756
pending_events: VecDeque<Event>,
5857
/// Config of the store.
@@ -65,8 +64,8 @@ impl<T> MemoryStore<T> {
6564
/// Create a new [`MemoryStore`] with the given config.
6665
pub fn new(config: Config) -> Self {
6766
Self {
67+
records: LruCache::new(config.peer_capacity().get()),
6868
config,
69-
records: HashMap::new(),
7069
pending_events: VecDeque::default(),
7170
waker: None,
7271
}
@@ -137,7 +136,7 @@ impl<T> MemoryStore<T> {
137136

138137
/// Get a reference to a peer's custom data.
139138
pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
140-
self.records.get(peer).and_then(|r| r.get_custom_data())
139+
self.records.peek(peer).and_then(|r| r.get_custom_data())
141140
}
142141

143142
/// Take ownership of the internal data, leaving `None` in its place.
@@ -240,7 +239,7 @@ impl<T> Store for MemoryStore<T> {
240239
}
241240

242241
fn addresses_of_peer(&self, peer: &PeerId) -> Option<impl Iterator<Item = &Multiaddr>> {
243-
self.records.get(peer).map(|record| record.addresses())
242+
self.records.peek(peer).map(|record| record.addresses())
244243
}
245244

246245
fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Self::Event> {
@@ -257,26 +256,40 @@ impl<T> Store for MemoryStore<T> {
257256
/// Config for [`MemoryStore`]. The available options are documented via their setters.
258257
#[derive(Debug, Clone)]
259258
pub struct Config {
259+
peer_capacity: NonZeroUsize,
260260
record_capacity: NonZeroUsize,
261261
remove_addr_on_dial_error: bool,
262262
}
263263

264264
impl Default for Config {
265265
fn default() -> Self {
266266
Self {
267+
peer_capacity: NonZeroUsize::try_from(1000).expect("1000 > 0"),
267268
record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"),
268269
remove_addr_on_dial_error: true,
269270
}
270271
}
271272
}
272273

273274
impl Config {
275+
pub fn peer_capacity(&self) -> &NonZeroUsize {
276+
&self.peer_capacity
277+
}
278+
/// The capacity of the address store per peer.
279+
///
280+
/// The least recently updated peer will be discarded to make room for a new peer.
281+
///
282+
/// `1000` by default.
283+
pub fn set_peer_capacity(mut self, capacity: NonZeroUsize) -> Self {
284+
self.peer_capacity = capacity;
285+
self
286+
}
274287
pub fn record_capacity(&self) -> &NonZeroUsize {
275288
&self.record_capacity
276289
}
277-
/// The capacity of an address store.
290+
/// The capacity of the address store per peer.
278291
///
279-
/// The least active address will be discarded to make room for new address.
292+
/// The least active address will be discarded to make room for a new address.
280293
///
281294
/// `8` by default.
282295
pub fn set_record_capacity(mut self, capacity: NonZeroUsize) -> Self {

protocols/upnp/CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
## 0.5.1
2+
3+
- Skip port mapping when an active port mapping is present.
4+
Previously, the behavior would skip creating new mappings if any mapping
5+
(active or inactive or pending) existed for the same port. Now it correctly only
6+
checks active mappings on the gateway.
7+
See [PR 6127](https://github.com/libp2p/rust-libp2p/pull/6127).
8+
9+
- Fix excessive retry attempts for failed port mappings by implementing exponential backoff.
10+
Failed mappings now retry up to 5 times with increasing delays (30s to 480s) before giving up.
11+
This prevents continuous retry loops.
12+
See [PR 6128](https://github.com/libp2p/rust-libp2p/pull/6128).
13+
114
## 0.5.0
215

316
- update igd-next to 0.16.1

protocols/upnp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "libp2p-upnp"
33
edition.workspace = true
44
rust-version.workspace = true
55
description = "UPnP support for libp2p transports"
6-
version = "0.5.0"
6+
version = "0.5.1"
77
license = "MIT"
88
repository = "https://github.com/libp2p/rust-libp2p"
99
keywords = ["peer-to-peer", "libp2p", "networking"]

protocols/upnp/src/behaviour.rs

Lines changed: 116 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ const MAPPING_DURATION: u32 = 3600;
5353
/// Renew the Mapping every half of `MAPPING_DURATION` to avoid the port being unmapped.
5454
const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;
5555

56+
/// Maximum number of retry attempts for failed mappings.
57+
const MAX_RETRY_ATTEMPTS: u32 = 5;
58+
59+
/// Base delay in seconds for exponential backoff (will be multiplied by 2^retry_count).
60+
const BASE_RETRY_DELAY_SECS: u64 = 30;
61+
62+
/// Maximum delay in seconds between retry attempts.
63+
const MAX_RETRY_DELAY_SECS: u64 = 1800;
64+
5665
/// A [`Gateway`] Request.
5766
#[derive(Debug)]
5867
pub(crate) enum GatewayRequest {
@@ -122,11 +131,19 @@ enum MappingState {
122131
/// Port mapping is inactive, will be requested or re-requested on the next iteration.
123132
Inactive,
124133
/// Port mapping/removal has been requested on the gateway.
125-
Pending,
134+
Pending {
135+
/// Number of times we've tried to map this port.
136+
retry_count: u32,
137+
},
126138
/// Port mapping is active with the inner timeout.
127139
Active(Delay),
128-
/// Port mapping failed, we will try again.
129-
Failed,
140+
/// Port mapping failed with retry information.
141+
Failed {
142+
/// Number of times we've tried to map this port.
143+
retry_count: u32,
144+
/// When we should try again (None means no more retries).
145+
next_retry: Option<Delay>,
146+
},
130147
}
131148

132149
/// Current state of the UPnP [`Gateway`].
@@ -174,7 +191,7 @@ impl MappingList {
174191
fn renew(&mut self, gateway: &mut Gateway, cx: &mut Context<'_>) {
175192
for (mapping, state) in self.iter_mut() {
176193
match state {
177-
MappingState::Inactive | MappingState::Failed => {
194+
MappingState::Inactive => {
178195
let duration = MAPPING_DURATION;
179196
if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
180197
mapping: mapping.clone(),
@@ -185,8 +202,34 @@ impl MappingList {
185202
"could not request port mapping for multiaddress on the gateway: {}",
186203
err
187204
);
205+
} else {
206+
*state = MappingState::Pending { retry_count: 0 };
207+
}
208+
}
209+
MappingState::Failed {
210+
retry_count,
211+
next_retry,
212+
} => {
213+
if let Some(delay) = next_retry {
214+
if Pin::new(delay).poll(cx).is_ready() {
215+
let duration = MAPPING_DURATION;
216+
if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
217+
mapping: mapping.clone(),
218+
duration,
219+
}) {
220+
tracing::debug!(
221+
multiaddress=%mapping.multiaddr,
222+
retry_count=%retry_count,
223+
"could not retry port mapping for multiaddress on the gateway: {}",
224+
err
225+
);
226+
} else {
227+
*state = MappingState::Pending {
228+
retry_count: *retry_count,
229+
};
230+
}
231+
}
188232
}
189-
*state = MappingState::Pending;
190233
}
191234
MappingState::Active(timeout) => {
192235
if Pin::new(timeout).poll(cx).is_ready() {
@@ -203,7 +246,7 @@ impl MappingList {
203246
}
204247
}
205248
}
206-
MappingState::Pending => {}
249+
MappingState::Pending { .. } => {}
207250
}
208251
}
209252
}
@@ -270,15 +313,14 @@ impl NetworkBehaviour for Behaviour {
270313
return;
271314
};
272315

273-
if let Some((mapping, _state)) = self
274-
.mappings
275-
.iter()
276-
.find(|(mapping, _state)| mapping.internal_addr.port() == addr.port())
277-
{
316+
if let Some((mapping, _state)) = self.mappings.iter().find(|(mapping, state)| {
317+
matches!(state, MappingState::Active(_))
318+
&& mapping.internal_addr.port() == addr.port()
319+
}) {
278320
tracing::debug!(
279321
multiaddress=%multiaddr,
280322
mapped_multiaddress=%mapping.multiaddr,
281-
"port from multiaddress is already being mapped"
323+
"port from multiaddress is already mapped on the gateway"
282324
);
283325
return;
284326
}
@@ -318,7 +360,8 @@ impl NetworkBehaviour for Behaviour {
318360
);
319361
}
320362

321-
self.mappings.insert(mapping, MappingState::Pending);
363+
self.mappings
364+
.insert(mapping, MappingState::Pending { retry_count: 0 });
322365
}
323366
GatewayState::GatewayNotFound => {
324367
tracing::debug!(
@@ -352,7 +395,8 @@ impl NetworkBehaviour for Behaviour {
352395
err
353396
);
354397
}
355-
self.mappings.insert(mapping, MappingState::Pending);
398+
self.mappings
399+
.insert(mapping, MappingState::Pending { retry_count: 0 });
356400
}
357401
}
358402
}
@@ -428,7 +472,7 @@ impl NetworkBehaviour for Behaviour {
428472
.insert(mapping.clone(), new_state)
429473
.expect("mapping should exist")
430474
{
431-
MappingState::Pending => {
475+
MappingState::Pending { .. } => {
432476
let external_multiaddr =
433477
mapping.external_addr(gateway.external_addr);
434478
self.pending_events.push_back(Event::NewExternalAddr(
@@ -454,36 +498,64 @@ impl NetworkBehaviour for Behaviour {
454498
}
455499
}
456500
GatewayEvent::MapFailure(mapping, err) => {
457-
match self
458-
.mappings
459-
.insert(mapping.clone(), MappingState::Failed)
460-
.expect("mapping should exist")
461-
{
462-
MappingState::Active(_) => {
463-
tracing::debug!(
464-
address=%mapping.internal_addr,
465-
protocol=%mapping.protocol,
466-
"failed to remap UPnP mapped for protocol: {err}"
467-
);
468-
let external_multiaddr =
469-
mapping.external_addr(gateway.external_addr);
470-
self.pending_events.push_back(Event::ExpiredExternalAddr(
471-
external_multiaddr.clone(),
472-
));
473-
return Poll::Ready(ToSwarm::ExternalAddrExpired(
474-
external_multiaddr,
475-
));
476-
}
477-
MappingState::Pending => {
478-
tracing::debug!(
479-
address=%mapping.internal_addr,
480-
protocol=%mapping.protocol,
481-
"failed to map UPnP mapped for protocol: {err}"
482-
);
483-
}
484-
_ => {
485-
unreachable!()
501+
let prev_state =
502+
self.mappings.get(&mapping).expect("mapping should exist");
503+
504+
let (retry_count, was_active) = match prev_state {
505+
MappingState::Active(_) => (0, true),
506+
MappingState::Pending { retry_count } => (*retry_count, false),
507+
MappingState::Failed { retry_count, .. } => {
508+
(*retry_count, false)
486509
}
510+
_ => unreachable!(),
511+
};
512+
513+
let new_retry_count = retry_count + 1;
514+
let next_retry = if new_retry_count < MAX_RETRY_ATTEMPTS {
515+
let delay_secs = std::cmp::min(
516+
BASE_RETRY_DELAY_SECS
517+
.saturating_mul(2_u64.pow(retry_count)),
518+
MAX_RETRY_DELAY_SECS,
519+
);
520+
Some(Delay::new(Duration::from_secs(delay_secs)))
521+
} else {
522+
tracing::warn!(
523+
address=%mapping.internal_addr,
524+
protocol=%mapping.protocol,
525+
"giving up on UPnP mapping after {new_retry_count} attempts"
526+
);
527+
None
528+
};
529+
530+
self.mappings.insert(
531+
mapping.clone(),
532+
MappingState::Failed {
533+
retry_count: new_retry_count,
534+
next_retry,
535+
},
536+
);
537+
538+
if was_active {
539+
tracing::debug!(
540+
address=%mapping.internal_addr,
541+
protocol=%mapping.protocol,
542+
"failed to remap UPnP mapped for protocol: {err}"
543+
);
544+
let external_multiaddr =
545+
mapping.external_addr(gateway.external_addr);
546+
self.pending_events.push_back(Event::ExpiredExternalAddr(
547+
external_multiaddr.clone(),
548+
));
549+
return Poll::Ready(ToSwarm::ExternalAddrExpired(
550+
external_multiaddr,
551+
));
552+
} else {
553+
tracing::debug!(
554+
address=%mapping.internal_addr,
555+
protocol=%mapping.protocol,
556+
retry_count=%new_retry_count,
557+
"failed to map UPnP mapped for protocol: {err}"
558+
);
487559
}
488560
}
489561
GatewayEvent::Removed(mapping) => {

0 commit comments

Comments
 (0)