Skip to content

Commit 22f1e23

Browse files
authored
feat(gossipsub): process send-queue before inbound stream
Process outbound stream before inbound stream in `gossipsub::EnabledHandler::poll(..)`. Pull-Request: #4778.
1 parent 69e62cb commit 22f1e23

File tree

2 files changed

+67
-67
lines changed

2 files changed

+67
-67
lines changed

protocols/gossipsub/CHANGELOG.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
- Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`.
1111
See [PR 4642](https://github.com/libp2p/rust-libp2p/pull/4642).
1212
- Return typed error from config builder.
13-
See [PR 4445].
14-
15-
[PR 4445]: https://github.com/libp2p/rust-libp2p/pull/4445
13+
See [PR 4445](https://github.com/libp2p/rust-libp2p/pull/4445).
14+
- Process outbound stream before inbound stream in `EnabledHandler::poll(..)`.
15+
See [PR 4778](https://github.com/libp2p/rust-libp2p/pull/4778).
1616

1717
## 0.45.2
1818

protocols/gossipsub/src/handler.rs

Lines changed: 64 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -241,70 +241,6 @@ impl EnabledHandler {
241241
});
242242
}
243243

244-
loop {
245-
match std::mem::replace(
246-
&mut self.inbound_substream,
247-
Some(InboundSubstreamState::Poisoned),
248-
) {
249-
// inbound idle state
250-
Some(InboundSubstreamState::WaitingInput(mut substream)) => {
251-
match substream.poll_next_unpin(cx) {
252-
Poll::Ready(Some(Ok(message))) => {
253-
self.last_io_activity = Instant::now();
254-
self.inbound_substream =
255-
Some(InboundSubstreamState::WaitingInput(substream));
256-
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message));
257-
}
258-
Poll::Ready(Some(Err(error))) => {
259-
tracing::debug!("Failed to read from inbound stream: {error}");
260-
// Close this side of the stream. If the
261-
// peer is still around, they will re-establish their
262-
// outbound stream i.e. our inbound stream.
263-
self.inbound_substream =
264-
Some(InboundSubstreamState::Closing(substream));
265-
}
266-
// peer closed the stream
267-
Poll::Ready(None) => {
268-
tracing::debug!("Inbound stream closed by remote");
269-
self.inbound_substream =
270-
Some(InboundSubstreamState::Closing(substream));
271-
}
272-
Poll::Pending => {
273-
self.inbound_substream =
274-
Some(InboundSubstreamState::WaitingInput(substream));
275-
break;
276-
}
277-
}
278-
}
279-
Some(InboundSubstreamState::Closing(mut substream)) => {
280-
match Sink::poll_close(Pin::new(&mut substream), cx) {
281-
Poll::Ready(res) => {
282-
if let Err(e) = res {
283-
// Don't close the connection but just drop the inbound substream.
284-
// In case the remote has more to send, they will open up a new
285-
// substream.
286-
tracing::debug!("Inbound substream error while closing: {e}");
287-
}
288-
self.inbound_substream = None;
289-
break;
290-
}
291-
Poll::Pending => {
292-
self.inbound_substream =
293-
Some(InboundSubstreamState::Closing(substream));
294-
break;
295-
}
296-
}
297-
}
298-
None => {
299-
self.inbound_substream = None;
300-
break;
301-
}
302-
Some(InboundSubstreamState::Poisoned) => {
303-
unreachable!("Error occurred during inbound stream processing")
304-
}
305-
}
306-
}
307-
308244
// process outbound stream
309245
loop {
310246
match std::mem::replace(
@@ -382,6 +318,70 @@ impl EnabledHandler {
382318
}
383319
}
384320

321+
loop {
322+
match std::mem::replace(
323+
&mut self.inbound_substream,
324+
Some(InboundSubstreamState::Poisoned),
325+
) {
326+
// inbound idle state
327+
Some(InboundSubstreamState::WaitingInput(mut substream)) => {
328+
match substream.poll_next_unpin(cx) {
329+
Poll::Ready(Some(Ok(message))) => {
330+
self.last_io_activity = Instant::now();
331+
self.inbound_substream =
332+
Some(InboundSubstreamState::WaitingInput(substream));
333+
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(message));
334+
}
335+
Poll::Ready(Some(Err(error))) => {
336+
tracing::debug!("Failed to read from inbound stream: {error}");
337+
// Close this side of the stream. If the
338+
// peer is still around, they will re-establish their
339+
// outbound stream i.e. our inbound stream.
340+
self.inbound_substream =
341+
Some(InboundSubstreamState::Closing(substream));
342+
}
343+
// peer closed the stream
344+
Poll::Ready(None) => {
345+
tracing::debug!("Inbound stream closed by remote");
346+
self.inbound_substream =
347+
Some(InboundSubstreamState::Closing(substream));
348+
}
349+
Poll::Pending => {
350+
self.inbound_substream =
351+
Some(InboundSubstreamState::WaitingInput(substream));
352+
break;
353+
}
354+
}
355+
}
356+
Some(InboundSubstreamState::Closing(mut substream)) => {
357+
match Sink::poll_close(Pin::new(&mut substream), cx) {
358+
Poll::Ready(res) => {
359+
if let Err(e) = res {
360+
// Don't close the connection but just drop the inbound substream.
361+
// In case the remote has more to send, they will open up a new
362+
// substream.
363+
tracing::debug!("Inbound substream error while closing: {e}");
364+
}
365+
self.inbound_substream = None;
366+
break;
367+
}
368+
Poll::Pending => {
369+
self.inbound_substream =
370+
Some(InboundSubstreamState::Closing(substream));
371+
break;
372+
}
373+
}
374+
}
375+
None => {
376+
self.inbound_substream = None;
377+
break;
378+
}
379+
Some(InboundSubstreamState::Poisoned) => {
380+
unreachable!("Error occurred during inbound stream processing")
381+
}
382+
}
383+
}
384+
385385
Poll::Pending
386386
}
387387
}

0 commit comments

Comments
 (0)