Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,11 @@ func (s *streamWrapper) Write(b []byte) (int, error) {
}

func (s *streamWrapper) Close() error {
// Set a read deadline to prevent Close() from blocking indefinitely
// waiting for the multistream-select handshake to complete.
// This can happen when the remote peer is slow or unresponsive.
// See: https://github.com/multiformats/go-multistream/issues/47
_ = s.Stream.SetReadDeadline(time.Now().Add(DefaultNegotiationTimeout))
return s.rw.Close()
}

Expand Down
81 changes: 81 additions & 0 deletions p2p/host/basic/basic_host_synctest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//go:build go1.25

package basichost_test

import (
"testing"
"testing/synctest"
"time"

"github.com/libp2p/go-libp2p/core/network"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/x/simlibp2p"

"github.com/stretchr/testify/require"
)

// TestStreamCloseDoesNotHangOnUnresponsivePeer verifies that stream.Close()
// returns within DefaultNegotiationTimeout even when the remote peer never
// completes the multistream handshake. Without the read deadline fix in
// streamWrapper.Close(), this would hang indefinitely.
func TestStreamCloseDoesNotHangOnUnresponsivePeer_synctest(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ctx := t.Context()

h1, h2 := simlibp2p.GetBasicHostPair(t)
defer h1.Close()
defer h2.Close()

const testProto = "/test/hang"

// Manually add protocol to peerstore so h1 thinks h2 supports it.
// This makes NewStream use lazy multistream (skipping negotiation until Close).
h1.Peerstore().AddProtocols(h2.ID(), testProto)

// h2 accepts streams at the network level but never responds to
// multistream protocol negotiation, simulating an unresponsive peer.
h2.Network().SetStreamHandler(func(s network.Stream) {
// Read incoming data but never write back - simulates unresponsive peer
buf := make([]byte, 1024)
for {
_, err := s.Read(buf)
if err != nil {
return
}
}
})

// Open stream to h2 - uses lazy multistream because protocol is "known"
s, err := h1.NewStream(ctx, h2.ID(), testProto)
require.NoError(t, err)

// Trigger the lazy handshake by writing data.
// The write succeeds (buffered), but the read handshake will block
// because h2 never sends a response.
_, err = s.Write([]byte("trigger handshake"))
require.NoError(t, err)

// Close() should return within DefaultNegotiationTimeout because the fix
// sets a read deadline before calling the underlying Close().
// Without the fix, this would hang indefinitely.
elapsedCh := make(chan time.Duration)
go func() {
start := time.Now()
_ = s.Close()
elapsedCh <- time.Since(start)
}()

maxExpected := basichost.DefaultNegotiationTimeout
var elapsed time.Duration
select {
case elapsed = <-elapsedCh:
case <-time.After(maxExpected + time.Second):
t.Fatal("timeout waiting for Close()")
}

require.Equal(t, elapsed, maxExpected,
"Close() took %v, expected < %v (DefaultNegotiationTimeout + margin)", elapsed, maxExpected)

t.Logf("Close() returned in %v (limit: %v)", elapsed, maxExpected)
})
}
Loading