Skip to content

Commit fcd410a

Browse files
authored
fix(swarm): keep connections alive while active streams exist
Resolves: #4520. Related: #4306. Pull-Request: #4595.
1 parent 4378722 commit fcd410a

File tree

23 files changed

+152
-581
lines changed

23 files changed

+152
-581
lines changed

protocols/dcutr/src/handler/relayed.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
3333
ListenUpgradeError,
3434
};
3535
use libp2p_swarm::{
36-
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol,
36+
ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol,
3737
};
3838
use std::collections::VecDeque;
3939
use std::task::{Context, Poll};
@@ -249,20 +249,12 @@ impl ConnectionHandler for Handler {
249249
}
250250
}
251251

252-
fn connection_keep_alive(&self) -> KeepAlive {
253-
if !self.queued_events.is_empty() {
254-
return KeepAlive::Yes;
255-
}
256-
257-
if self.inbound_connect.is_some() {
258-
return KeepAlive::Yes;
259-
}
260-
252+
fn connection_keep_alive(&self) -> bool {
261253
if self.attempts < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
262-
return KeepAlive::Yes;
254+
return true;
263255
}
264256

265-
KeepAlive::No
257+
false
266258
}
267259

268260
fn poll(

protocols/gossipsub/src/behaviour.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3265,7 +3265,6 @@ where
32653265
type ConnectionHandler = Handler;
32663266
type ToSwarm = Event;
32673267

3268-
#[allow(deprecated)]
32693268
fn handle_established_inbound_connection(
32703269
&mut self,
32713270
_: ConnectionId,
@@ -3276,7 +3275,6 @@ where
32763275
Ok(Handler::new(self.config.protocol_config()))
32773276
}
32783277

3279-
#[allow(deprecated)]
32803278
fn handle_established_outbound_connection(
32813279
&mut self,
32823280
_: ConnectionId,

protocols/gossipsub/src/handler.rs

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ use instant::Instant;
3030
use libp2p_core::upgrade::DeniedUpgrade;
3131
use libp2p_swarm::handler::{
3232
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
33-
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
34-
SubstreamProtocol,
33+
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
3534
};
3635
use libp2p_swarm::Stream;
3736
use smallvec::SmallVec;
@@ -424,26 +423,8 @@ impl ConnectionHandler for Handler {
424423
}
425424
}
426425

427-
fn connection_keep_alive(&self) -> KeepAlive {
428-
match self {
429-
Handler::Enabled(handler) => {
430-
if handler.in_mesh {
431-
return KeepAlive::Yes;
432-
}
433-
434-
if let Some(
435-
OutboundSubstreamState::PendingSend(_, _)
436-
| OutboundSubstreamState::PendingFlush(_),
437-
) = handler.outbound_substream
438-
{
439-
return KeepAlive::Yes;
440-
}
441-
442-
#[allow(deprecated)]
443-
KeepAlive::No
444-
}
445-
Handler::Disabled(_) => KeepAlive::No,
446-
}
426+
fn connection_keep_alive(&self) -> bool {
427+
matches!(self, Handler::Enabled(h) if h.in_mesh)
447428
}
448429

449430
fn poll(

protocols/identify/src/handler.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
3333
ProtocolSupport,
3434
};
3535
use libp2p_swarm::{
36-
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError,
36+
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
3737
SubstreamProtocol, SupportedProtocols,
3838
};
3939
use log::{warn, Level};
@@ -314,14 +314,6 @@ impl ConnectionHandler for Handler {
314314
}
315315
}
316316

317-
fn connection_keep_alive(&self) -> KeepAlive {
318-
if !self.active_streams.is_empty() {
319-
return KeepAlive::Yes;
320-
}
321-
322-
KeepAlive::No
323-
}
324-
325317
fn poll(
326318
&mut self,
327319
cx: &mut Context<'_>,

protocols/kad/src/handler.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
3333
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
3434
};
3535
use libp2p_swarm::{
36-
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, StreamUpgradeError,
36+
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamUpgradeError,
3737
SubstreamProtocol, SupportedProtocols,
3838
};
3939
use log::trace;
@@ -702,14 +702,6 @@ impl ConnectionHandler for Handler {
702702
}
703703
}
704704

705-
fn connection_keep_alive(&self) -> KeepAlive {
706-
if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() {
707-
return KeepAlive::No;
708-
};
709-
710-
KeepAlive::Yes
711-
}
712-
713705
fn poll(
714706
&mut self,
715707
cx: &mut Context<'_>,

protocols/ping/src/handler.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use libp2p_swarm::handler::{
2828
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
2929
};
3030
use libp2p_swarm::{
31-
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol,
32-
StreamUpgradeError, SubstreamProtocol,
31+
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
32+
SubstreamProtocol,
3333
};
3434
use std::collections::VecDeque;
3535
use std::{
@@ -225,10 +225,6 @@ impl ConnectionHandler for Handler {
225225

226226
fn on_behaviour_event(&mut self, _: Void) {}
227227

228-
fn connection_keep_alive(&self) -> KeepAlive {
229-
KeepAlive::No
230-
}
231-
232228
fn poll(
233229
&mut self,
234230
cx: &mut Context<'_>,
@@ -349,15 +345,17 @@ impl ConnectionHandler for Handler {
349345
) {
350346
match event {
351347
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
352-
protocol: stream,
348+
protocol: mut stream,
353349
..
354350
}) => {
351+
stream.ignore_for_keep_alive();
355352
self.inbound = Some(protocol::recv_ping(stream).boxed());
356353
}
357354
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
358-
protocol: stream,
355+
protocol: mut stream,
359356
..
360357
}) => {
358+
stream.ignore_for_keep_alive();
361359
self.outbound = Some(OutboundState::Ping(
362360
send_ping(stream, self.config.timeout).boxed(),
363361
));

protocols/relay/src/behaviour/handler.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use libp2p_swarm::handler::{
3737
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
3838
};
3939
use libp2p_swarm::{
40-
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, StreamProtocol,
40+
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, Stream, StreamProtocol,
4141
StreamUpgradeError, SubstreamProtocol,
4242
};
4343
use std::collections::VecDeque;
@@ -376,10 +376,6 @@ pub struct Handler {
376376
///
377377
/// Contains a [`futures::future::Future`] for each lend out substream that
378378
/// resolves once the substream is dropped.
379-
///
380-
/// Once all substreams are dropped and this handler has no other work,
381-
/// [`KeepAlive::Until`] can be set, allowing the connection to be closed
382-
/// eventually.
383379
alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<()>>,
384380
/// Futures relaying data for circuit between two peers.
385381
circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,
@@ -615,13 +611,12 @@ impl ConnectionHandler for Handler {
615611
}
616612
}
617613

618-
fn connection_keep_alive(&self) -> KeepAlive {
619-
match self.idle_at {
620-
Some(idle_at) if Instant::now().duration_since(idle_at) > Duration::from_secs(10) => {
621-
KeepAlive::No
622-
}
623-
_ => KeepAlive::Yes,
624-
}
614+
fn connection_keep_alive(&self) -> bool {
615+
let Some(idle_at) = self.idle_at else {
616+
return true;
617+
};
618+
619+
Instant::now().duration_since(idle_at) <= Duration::from_secs(10)
625620
}
626621

627622
fn poll(
@@ -881,13 +876,7 @@ impl ConnectionHandler for Handler {
881876
{}
882877

883878
// Check keep alive status.
884-
if self.reservation_request_future.is_none()
885-
&& self.circuit_accept_futures.is_empty()
886-
&& self.circuit_deny_futures.is_empty()
887-
&& self.alive_lend_out_substreams.is_empty()
888-
&& self.circuits.is_empty()
889-
&& self.active_reservation.is_none()
890-
{
879+
if self.active_reservation.is_none() {
891880
if self.idle_at.is_none() {
892881
self.idle_at = Some(Instant::now());
893882
}

protocols/relay/src/priv_client/handler.rs

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use libp2p_swarm::handler::{
3737
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
3838
};
3939
use libp2p_swarm::{
40-
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError,
40+
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
4141
SubstreamProtocol,
4242
};
4343
use log::debug;
@@ -319,28 +319,8 @@ impl ConnectionHandler for Handler {
319319
}
320320
}
321321

322-
fn connection_keep_alive(&self) -> KeepAlive {
323-
if self.reservation.is_some() {
324-
return KeepAlive::Yes;
325-
}
326-
327-
if !self.alive_lend_out_substreams.is_empty() {
328-
return KeepAlive::Yes;
329-
}
330-
331-
if !self.circuit_deny_futs.is_empty() {
332-
return KeepAlive::Yes;
333-
}
334-
335-
if !self.open_circuit_futs.is_empty() {
336-
return KeepAlive::Yes;
337-
}
338-
339-
if !self.outbound_circuits.is_empty() {
340-
return KeepAlive::Yes;
341-
}
342-
343-
KeepAlive::No
322+
fn connection_keep_alive(&self) -> bool {
323+
self.reservation.is_some()
344324
}
345325

346326
fn poll(

protocols/request-response/src/handler.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use libp2p_swarm::handler::{
3232
ListenUpgradeError,
3333
};
3434
use libp2p_swarm::{
35-
handler::{ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError},
35+
handler::{ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError},
3636
SubstreamProtocol,
3737
};
3838
use smallvec::SmallVec;
@@ -59,8 +59,6 @@ where
5959
/// The timeout for inbound and outbound substreams (i.e. request
6060
/// and response processing).
6161
substream_timeout: Duration,
62-
/// The current connection keep-alive.
63-
keep_alive: KeepAlive,
6462
/// Queue of events to emit in `poll()`.
6563
pending_events: VecDeque<Event<TCodec>>,
6664
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
@@ -94,7 +92,6 @@ where
9492
Self {
9593
inbound_protocols,
9694
codec,
97-
keep_alive: KeepAlive::Yes,
9895
substream_timeout,
9996
outbound: VecDeque::new(),
10097
inbound: FuturesUnordered::new(),
@@ -274,14 +271,9 @@ where
274271
}
275272

276273
fn on_behaviour_event(&mut self, request: Self::FromBehaviour) {
277-
self.keep_alive = KeepAlive::Yes;
278274
self.outbound.push_back(request);
279275
}
280276

281-
fn connection_keep_alive(&self) -> KeepAlive {
282-
self.keep_alive
283-
}
284-
285277
fn poll(
286278
&mut self,
287279
cx: &mut Context<'_>,
@@ -300,7 +292,6 @@ where
300292
match result {
301293
Ok(((id, rq), rs_sender)) => {
302294
// We received an inbound request.
303-
self.keep_alive = KeepAlive::Yes;
304295
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Request {
305296
request_id: id,
306297
request: rq,
@@ -330,13 +321,6 @@ where
330321
self.outbound.shrink_to_fit();
331322
}
332323

333-
if self.inbound.is_empty() && self.keep_alive.is_yes() {
334-
// No new inbound or outbound requests. We already check
335-
// there is no active streams exist in swarm connection,
336-
// so we can set keep-alive to no directly.
337-
self.keep_alive = KeepAlive::No;
338-
}
339-
340324
Poll::Pending
341325
}
342326

swarm/src/behaviour/toggle.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ use crate::behaviour::FromSwarm;
2222
use crate::connection::ConnectionId;
2323
use crate::handler::{
2424
AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
25-
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError,
26-
SubstreamProtocol,
25+
FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, SubstreamProtocol,
2726
};
2827
use crate::upgrade::SendWrapper;
2928
use crate::{
@@ -291,11 +290,11 @@ where
291290
.on_behaviour_event(event)
292291
}
293292

294-
fn connection_keep_alive(&self) -> KeepAlive {
293+
fn connection_keep_alive(&self) -> bool {
295294
self.inner
296295
.as_ref()
297296
.map(|h| h.connection_keep_alive())
298-
.unwrap_or(KeepAlive::No)
297+
.unwrap_or(false)
299298
}
300299

301300
fn poll(

0 commit comments

Comments
 (0)