Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ struct ProtocolStatus {
enum InboundSubstreamState {
/// Waiting for a request from the remote.
WaitingMessage {
/// Whether it is the first message to be awaited on this stream.
first: bool,
/// How long before we give up on waiting for the first request on this stream.
/// `None` if there has already been a request on this stream, or it has timed out and can
/// be re-used.
first_request_timeout: Option<Delay>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why only the first request ?
Maybe I'm missing context here but ideally we timeout for any pending request and move to next one no ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeouts are redundant on any requests after the first request on a substream. See commit 8053eb9, which removes the redundant timeout on the second and later requests.

Background

In the upstream code, substreams are already available for re-use as soon as the first request has finished. But there's no timeout upstream, so if the first request never arrives, the substream can never be re-used.

When the remote peer opens a substream, we know it will send at least one request, so this fix makes the inbound side wait 10 seconds for that first request. After the timeout, the substream can be re-used immediately. This matches the timeout and re-use behaviour on the outbound side.

Peer Behaviour

After the first request, there are two possible behaviours in the protocol that we need to handle. The peer can send another request on the same substream ID, or it can use a new substream ID.

To handle the case where the peer re-uses the same substream ID, we leave the substream ID slot available, unless we reach the substream limit.

To handle the case where the peer sends a new substream ID, we accept new substreams up to a limit (32).
When that limit is reached, we drop any timed out or used substream IDs immediately, and re-use that substream slot with the new ID.

This is why there is no timeout after the first request - we don't need one, because the substream can be re-used immediately. We'd just be adding timers and load for nothing.

connection_id: UniqueConnecId,
substream: KadInStreamSink<Stream>,
/// How long before we give up on waiting for a request.
request_timeout: Delay,
},
/// Waiting for the behaviour to send a [`HandlerIn`] event containing the response.
WaitingBehaviour(
Expand Down Expand Up @@ -191,10 +191,9 @@ impl InboundSubstreamState {
},
) {
InboundSubstreamState::WaitingMessage {
first_request_timeout: _,
substream,
first: _,
connection_id: _,
request_timeout: _,
}
| InboundSubstreamState::WaitingBehaviour(_, substream, _, _)
| InboundSubstreamState::PendingSend(_, substream, _, _)
Expand Down Expand Up @@ -559,7 +558,10 @@ impl Handler {
matches!(
s,
// An inbound substream waiting to be reused.
InboundSubstreamState::WaitingMessage { first: false, .. }
InboundSubstreamState::WaitingMessage {
first_request_timeout: None,
..
}
)
}) {
*s = InboundSubstreamState::Cancelled;
Expand All @@ -582,10 +584,9 @@ impl Handler {
self.next_connec_unique_id.0 += 1;
self.inbound_substreams
.push(InboundSubstreamState::WaitingMessage {
first: true,
first_request_timeout: Some(Delay::new(SUBSTREAM_TIMEOUT)),
connection_id: connec_unique_id,
substream: protocol,
request_timeout: Delay::new(SUBSTREAM_TIMEOUT),
});
}

Expand Down Expand Up @@ -921,13 +922,12 @@ impl futures::Stream for InboundSubstreamState {
},
) {
InboundSubstreamState::WaitingMessage {
first,
mut first_request_timeout,
connection_id,
mut substream,
mut request_timeout,
} => match (
substream.poll_next_unpin(cx),
request_timeout.poll_unpin(cx),
first_request_timeout.as_mut().map(|t| t.poll_unpin(cx)),
) {
// Prefer ready requests over ready timeouts
(Poll::Ready(Some(Ok(KadRequestMsg::Ping))), _) => {
Expand Down Expand Up @@ -971,13 +971,11 @@ impl futures::Stream for InboundSubstreamState {
)));
}
(Poll::Ready(Some(Ok(KadRequestMsg::AddProvider { key, provider }))), _) => {
// The request has finished, so renew the request timeout
let request_timeout = Delay::new(SUBSTREAM_TIMEOUT);
// This request type requires no response
*this = InboundSubstreamState::WaitingMessage {
first: false,
first_request_timeout: None,
connection_id,
substream,
request_timeout,
};
return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::AddProvider { key, provider },
Expand Down Expand Up @@ -1017,30 +1015,28 @@ impl futures::Stream for InboundSubstreamState {
},
)));
}
(Poll::Pending, Poll::Pending) => {
(Poll::Pending, Some(Poll::Pending)) | (Poll::Pending, None) => {
// Keep the original request timeout
*this = InboundSubstreamState::WaitingMessage {
first,
first_request_timeout,
connection_id,
substream,
request_timeout,
};
return Poll::Pending;
}
(Poll::Pending, Poll::Ready(())) => {
(Poll::Pending, Some(Poll::Ready(()))) => {
tracing::debug!(
?first,
first = ?first_request_timeout.is_some(),
?connection_id,
"Inbound substream timed out waiting for request",
);

// Renew the request timeout, but mark this substream as available for re-use
let request_timeout = Delay::new(SUBSTREAM_TIMEOUT);
// Drop the first request timeout, and mark this substream as available for
// re-use
*this = InboundSubstreamState::WaitingMessage {
first: false,
first_request_timeout: None,
connection_id,
substream,
request_timeout,
};
return Poll::Pending;
}
Expand Down Expand Up @@ -1086,7 +1082,12 @@ impl futures::Stream for InboundSubstreamState {
return Poll::Pending;
}
},
InboundSubstreamState::PendingSend(id, mut substream, msg, mut response_timeout) => {
InboundSubstreamState::PendingSend(
id,
mut substream,
msg,
mut response_timeout,
) => {
match (
substream.poll_ready_unpin(cx),
response_timeout.poll_unpin(cx),
Expand Down Expand Up @@ -1130,10 +1131,9 @@ impl futures::Stream for InboundSubstreamState {
) {
(Poll::Ready(Ok(())), _) => {
*this = InboundSubstreamState::WaitingMessage {
first: false,
first_request_timeout: None,
connection_id: id,
substream,
request_timeout: Delay::new(SUBSTREAM_TIMEOUT),
};
}
(Poll::Pending, Poll::Pending) => {
Expand Down
Loading