Skip to content

Commit b56b47a

Browse files
committed
Improve logs for substream limits
1 parent 04c2e64 commit b56b47a

File tree

10 files changed

+93
-12
lines changed

10 files changed

+93
-12
lines changed

core/src/muxing.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use std::future::Future;
5656
use std::pin::Pin;
5757

5858
pub use self::boxed::StreamMuxerBox;
59-
pub use self::boxed::SubstreamBox;
59+
pub use self::boxed::{AsyncReadWrite, SubstreamBox};
6060

6161
mod boxed;
6262

core/src/muxing/boxed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ impl fmt::Debug for SubstreamBox {
152152
}
153153

154154
/// Workaround because Rust does not allow `Box<dyn AsyncRead + AsyncWrite>`.
155-
trait AsyncReadWrite: AsyncRead + AsyncWrite {
155+
pub trait AsyncReadWrite: AsyncRead + AsyncWrite {
156156
/// Helper function to capture the erased inner type.
157157
///
158158
/// Used to make the [`Debug`] implementation of [`SubstreamBox`] more useful.

misc/multistream-select/src/length_delimited.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ const DEFAULT_BUFFER_SIZE: usize = 64;
4242
pub(crate) struct LengthDelimited<R> {
4343
/// The inner I/O resource.
4444
#[pin]
45-
inner: R,
45+
pub(crate) inner: R,
4646
/// Read buffer for a single incoming unsigned-varint length-delimited frame.
4747
read_buffer: BytesMut,
4848
/// Write buffer for outgoing unsigned-varint length-delimited frames.
@@ -298,7 +298,7 @@ where
298298
#[derive(Debug)]
299299
pub(crate) struct LengthDelimitedReader<R> {
300300
#[pin]
301-
inner: LengthDelimited<R>,
301+
pub(crate) inner: LengthDelimited<R>,
302302
}
303303

304304
impl<R> LengthDelimitedReader<R> {

misc/multistream-select/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ mod protocol;
9595

9696
pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture};
9797
pub use self::listener_select::{listener_select_proto, ListenerSelectFuture};
98-
pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError};
98+
pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError, State};
9999
pub use self::protocol::ProtocolError;
100100

101101
/// Supported multistream-select versions.

misc/multistream-select/src/negotiated.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use std::{
4848
#[derive(Debug)]
4949
pub struct Negotiated<TInner> {
5050
#[pin]
51-
state: State<TInner>,
51+
pub state: State<TInner>,
5252
}
5353

5454
/// A `Future` that waits on the completion of protocol negotiation.
@@ -197,7 +197,8 @@ impl<TInner> Negotiated<TInner> {
197197
/// The states of a `Negotiated` I/O stream.
198198
#[pin_project(project = StateProj)]
199199
#[derive(Debug)]
200-
enum State<R> {
200+
#[allow(private_interfaces)]
201+
pub enum State<R> {
201202
/// In this state, a `Negotiated` is still expecting to
202203
/// receive confirmation of the protocol it has optimistically
203204
/// settled on.
@@ -224,6 +225,26 @@ enum State<R> {
224225
Invalid,
225226
}
226227

228+
impl<R> State<R> {
229+
/// Returns the underlying protocol, if negotiation is in progress.
230+
pub fn protocol(&self) -> Option<&str> {
231+
match self {
232+
State::Expecting { protocol, .. } => Some(protocol.as_ref()),
233+
State::Completed { .. } => None,
234+
State::Invalid => None,
235+
}
236+
}
237+
238+
/// Returns the underlying stream, if it is available.
239+
pub fn inner(&self) -> Option<&R> {
240+
match self {
241+
State::Expecting { io, .. } => Some(&io.inner.inner.inner),
242+
State::Completed { io } => Some(io),
243+
State::Invalid => None,
244+
}
245+
}
246+
}
247+
227248
impl<TInner> AsyncRead for Negotiated<TInner>
228249
where
229250
TInner: AsyncRead + AsyncWrite + Unpin,

misc/multistream-select/src/protocol.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ where
321321
#[derive(Debug)]
322322
pub(crate) struct MessageReader<R> {
323323
#[pin]
324-
inner: LengthDelimitedReader<R>,
324+
pub(crate) inner: LengthDelimitedReader<R>,
325325
}
326326

327327
impl<R> MessageReader<R> {

misc/quick-protobuf-codec/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub use generated::test as proto;
1515
///
1616
/// to prefix messages with their length and uses [`quick_protobuf`] and a provided
1717
/// `struct` implementing [`MessageRead`] and [`MessageWrite`] to do the encoding.
18+
#[derive(Debug)]
1819
pub struct Codec<In, Out = In> {
1920
max_message_len_bytes: usize,
2021
phantom: PhantomData<(In, Out)>,

protocols/kad/src/handler.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use libp2p_swarm::{
3535
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamUpgradeError, SubstreamProtocol,
3636
SupportedProtocols,
3737
};
38-
use std::collections::VecDeque;
38+
use std::collections::{HashMap, VecDeque};
3939
use std::task::Waker;
4040
use std::time::Duration;
4141
use std::{error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll};
@@ -98,6 +98,7 @@ struct ProtocolStatus {
9898
}
9999

100100
/// State of an active inbound substream.
101+
#[derive(Debug)]
101102
enum InboundSubstreamState {
102103
/// Waiting for a request from the remote.
103104
WaitingMessage {
@@ -173,6 +174,23 @@ impl InboundSubstreamState {
173174
InboundSubstreamState::Poisoned { .. } => unreachable!(),
174175
}
175176
}
177+
178+
/// Returns the `KadInStreamSink` associated with this state, or `None` if the state has no substream.
179+
fn sink(&self) -> Option<&KadInStreamSink<Stream>> {
180+
match self {
181+
InboundSubstreamState::WaitingMessage { substream, .. }
182+
| InboundSubstreamState::WaitingBehaviour(_, substream, _)
183+
| InboundSubstreamState::PendingSend(_, substream, _)
184+
| InboundSubstreamState::PendingFlush(_, substream)
185+
| InboundSubstreamState::Closing(substream) => Some(substream),
186+
InboundSubstreamState::Cancelled | InboundSubstreamState::Poisoned { .. } => None,
187+
}
188+
}
189+
190+
/// Returns the `Stream` associated with this state, or `None` if the state has no substream.
191+
fn stream(&self) -> Option<&Stream> {
192+
self.sink().map(|v| &**v)
193+
}
176194
}
177195

178196
/// Event produced by the Kademlia handler.
@@ -524,15 +542,23 @@ impl Handler {
524542
InboundSubstreamState::WaitingMessage { first: false, .. }
525543
)
526544
}) {
545+
let prev_state = format!("{s:?}");
546+
527547
*s = InboundSubstreamState::Cancelled;
548+
528549
tracing::debug!(
529550
peer=?self.remote_peer_id,
551+
?prev_state,
552+
?MAX_NUM_STREAMS,
553+
active_streams=?self.debug_inbound_substreams(),
530554
"New inbound substream to peer exceeds inbound substream limit. \
531555
Removed older substream waiting to be reused."
532-
)
556+
);
533557
} else {
534558
tracing::warn!(
535559
peer=?self.remote_peer_id,
560+
?MAX_NUM_STREAMS,
561+
active_streams=?self.debug_inbound_substreams(),
536562
"New inbound substream to peer exceeds inbound substream limit. \
537563
No older substream waiting to be reused. Dropping new substream."
538564
);
@@ -550,6 +576,30 @@ impl Handler {
550576
});
551577
}
552578

579+
/// Returns a summary of the protocols of the inbound substreams.
580+
fn debug_inbound_substreams(&self) -> String {
581+
use libp2p_core::muxing::AsyncReadWrite;
582+
583+
let mut protocols = HashMap::<_, usize>::new();
584+
let mut stream_types = HashMap::<_, usize>::new();
585+
586+
for substream in &self.inbound_substreams {
587+
let state = substream.stream().map(|s| &s.stream.state);
588+
let protocol = state.and_then(|s| s.protocol());
589+
let stream_type = state.and_then(|s| s.inner()).map(|i| i.type_name());
590+
591+
if let Some(protocol) = protocol {
592+
*protocols.entry(protocol).or_default() += 1;
593+
}
594+
595+
if let Some(stream_type) = stream_type {
596+
*stream_types.entry(stream_type).or_default() += 1;
597+
}
598+
}
599+
600+
format!("protocols: {protocols:?}, stream types: {stream_types:?}")
601+
}
602+
553603
/// Takes the given [`KadRequestMsg`] and composes it into an outbound request-response protocol handshake using a [`oneshot::channel`].
554604
fn queue_new_stream(&mut self, id: QueryId, msg: KadRequestMsg) {
555605
let (sender, receiver) = oneshot::channel();

protocols/kad/src/protocol.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ impl UpgradeInfo for ProtocolConfig {
199199
}
200200

201201
/// Codec for Kademlia inbound and outbound message framing.
202+
#[derive(Debug)]
202203
pub struct Codec<A, B> {
203204
codec: quick_protobuf_codec::Codec<proto::Message>,
204205
__phantom: PhantomData<(A, B)>,

swarm/src/stream.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,17 @@ use std::{
99
};
1010

1111
/// Counter for the number of active streams on a connection.
12-
#[derive(Debug, Clone)]
12+
#[derive(Clone)]
1313
pub(crate) struct ActiveStreamCounter(Arc<()>);
1414

15+
impl std::fmt::Debug for ActiveStreamCounter {
16+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
17+
f.debug_tuple("ActiveStreamCounter")
18+
.field(&self.num_alive_streams())
19+
.finish()
20+
}
21+
}
22+
1523
impl ActiveStreamCounter {
1624
pub(crate) fn default() -> Self {
1725
Self(Arc::new(()))
@@ -28,7 +36,7 @@ impl ActiveStreamCounter {
2836

2937
#[derive(Debug)]
3038
pub struct Stream {
31-
stream: Negotiated<SubstreamBox>,
39+
pub stream: Negotiated<SubstreamBox>,
3240
counter: Option<ActiveStreamCounter>,
3341
}
3442

0 commit comments

Comments
 (0)