Skip to content

Commit c2adc7a

Browse files
committed
fix(basic_host): set read deadline before multistream Close to prevent blocking
streamWrapper.Close() can block indefinitely when the remote peer is slow or unresponsive during the multistream-select handshake completion. The lazy multistream protocol negotiation defers reading the handshake response until Close() is called. If the remote peer doesn't respond, the read blocks forever, causing goroutine leaks. This is particularly problematic for bitswap servers where taskWorkers can get stuck trying to close streams after sending blocks. The fix sets a read deadline (using DefaultNegotiationTimeout) before calling the multistream Close(), ensuring the operation will time out rather than block indefinitely. Related: multiformats/go-multistream#47 Related: multiformats/go-multistream#48
1 parent b0b2a18 commit c2adc7a

File tree

2 files changed

+60
-0
lines changed

2 files changed

+60
-0
lines changed

p2p/host/basic/basic_host.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,11 @@ func (s *streamWrapper) Write(b []byte) (int, error) {
683683
}
684684

685685
func (s *streamWrapper) Close() error {
686+
// Set a read deadline to prevent Close() from blocking indefinitely
687+
// waiting for the multistream-select handshake to complete.
688+
// This can happen when the remote peer is slow or unresponsive.
689+
// See: https://github.com/multiformats/go-multistream/issues/47
690+
_ = s.Stream.SetReadDeadline(time.Now().Add(DefaultNegotiationTimeout))
686691
return s.rw.Close()
687692
}
688693

p2p/host/basic/basic_host_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,61 @@ func TestNegotiationCancel(t *testing.T) {
785785
}
786786
}
787787

788+
// TestStreamCloseDoesNotHangOnUnresponsivePeer verifies that stream.Close()
789+
// returns within DefaultNegotiationTimeout even when the remote peer never
790+
// completes the multistream handshake. Without the read deadline fix in
791+
// streamWrapper.Close(), this would hang indefinitely.
792+
func TestStreamCloseDoesNotHangOnUnresponsivePeer(t *testing.T) {
793+
ctx := t.Context()
794+
795+
h1, h2 := getHostPair(t)
796+
defer h1.Close()
797+
defer h2.Close()
798+
799+
const testProto = "/test/hang"
800+
801+
// Manually add protocol to peerstore so h1 thinks h2 supports it.
802+
// This makes NewStream use lazy multistream (skipping negotiation until Close).
803+
h1.Peerstore().AddProtocols(h2.ID(), testProto)
804+
805+
// h2 accepts streams at the network level but never responds to
806+
// multistream protocol negotiation, simulating an unresponsive peer.
807+
h2.Network().SetStreamHandler(func(s network.Stream) {
808+
// Read incoming data but never write back - simulates unresponsive peer
809+
buf := make([]byte, 1024)
810+
for {
811+
_, err := s.Read(buf)
812+
if err != nil {
813+
return
814+
}
815+
}
816+
})
817+
818+
// Open stream to h2 - uses lazy multistream because protocol is "known"
819+
s, err := h1.NewStream(ctx, h2.ID(), testProto)
820+
require.NoError(t, err)
821+
822+
// Trigger the lazy handshake by writing data.
823+
// The write succeeds (buffered), but the read handshake will block
824+
// because h2 never sends a response.
825+
_, err = s.Write([]byte("trigger handshake"))
826+
require.NoError(t, err)
827+
828+
// Close() should return within DefaultNegotiationTimeout because the fix
829+
// sets a read deadline before calling the underlying Close().
830+
// Without the fix, this would hang indefinitely.
831+
start := time.Now()
832+
_ = s.Close()
833+
elapsed := time.Since(start)
834+
835+
// Allow some margin for test execution overhead
836+
maxExpected := DefaultNegotiationTimeout + 2*time.Second
837+
require.Less(t, elapsed, maxExpected,
838+
"Close() took %v, expected < %v (DefaultNegotiationTimeout + margin)", elapsed, maxExpected)
839+
840+
t.Logf("Close() returned in %v (limit: %v)", elapsed, maxExpected)
841+
}
842+
788843
func waitForAddrChangeEvent(ctx context.Context, sub event.Subscription, t *testing.T) event.EvtLocalAddressesUpdated {
789844
t.Helper()
790845
for {

0 commit comments

Comments
 (0)