Skip to content

Commit 9937acb

Browse files
committed
fix(bitswap/network): set read deadline before stream Close to prevent blocking
SendMessage() can block indefinitely when the remote peer is slow or unresponsive during the multistream-select handshake completion. The fix sets a read deadline (using the calculated send timeout) before calling stream.Close(), ensuring the operation will time out rather than block indefinitely. See: multiformats/go-multistream#47 See: ipshipyard/waterworks-infra#860
1 parent de0b141 commit 9937acb

File tree

3 files changed

+106
-9
lines changed

3 files changed

+106
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ The following emojis are used to highlight certain changes:
2828

2929
### Fixed
3030

31+
- `bitswap/network`: Fixed goroutine leak that could cause bitswap to stop serving blocks after extended uptime. The root cause is `stream.Close()` blocking indefinitely when remote peers are unresponsive during multistream handshake ([go-libp2p#3448](https://github.com/libp2p/go-libp2p/pull/3448)). This PR ([#1083](https://github.com/ipfs/boxo/pull/1083)) adds a localized fix specific to bitswap's `SendMessage` by setting a read deadline before closing streams.
32+
3133
### Security
3234

3335

bitswap/network/bsnet/ipfs_impl.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,14 @@ func (bsnet *impl) SendMessage(
390390
return err
391391
}
392392

393+
// Set a read deadline to prevent Close() from blocking indefinitely
394+
// when the remote peer is slow or unresponsive during multistream
395+
// handshake completion.
396+
// See: https://github.com/multiformats/go-multistream/issues/47
397+
// See: https://github.com/ipshipyard/waterworks-infra/issues/860
398+
if err := s.SetReadDeadline(time.Now().Add(timeout)); err != nil {
399+
log.Debugf("error setting read deadline: %s", err)
400+
}
393401
return s.Close()
394402
}
395403

bitswap/network/bsnet/ipfs_impl_test.go

Lines changed: 96 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,22 @@ var errMockNetErr = errors.New("network err")
7373

7474
type ErrStream struct {
7575
p2pnet.Stream
76-
lk sync.Mutex
77-
err error
78-
timingOut bool
79-
closed bool
76+
lk sync.Mutex
77+
err error
78+
timingOut bool
79+
closed bool
80+
blockOnClose bool // if true, Close() will block until deadline
81+
readDeadlineSet bool // tracks if SetReadDeadline was called
82+
readDeadline time.Time // the deadline that was set
8083
}
8184

8285
type ErrHost struct {
8386
host.Host
84-
lk sync.Mutex
85-
err error
86-
timingOut bool
87-
streams []*ErrStream
87+
lk sync.Mutex
88+
err error
89+
timingOut bool
90+
blockOnClose bool
91+
streams []*ErrStream
8892
}
8993

9094
func (es *ErrStream) Write(b []byte) (int, error) {
@@ -100,11 +104,36 @@ func (es *ErrStream) Write(b []byte) (int, error) {
100104
return es.Stream.Write(b)
101105
}
102106

107+
func (es *ErrStream) SetReadDeadline(t time.Time) error {
108+
es.lk.Lock()
109+
defer es.lk.Unlock()
110+
es.readDeadlineSet = true
111+
es.readDeadline = t
112+
return es.Stream.SetReadDeadline(t)
113+
}
114+
103115
func (es *ErrStream) Close() error {
104116
es.lk.Lock()
117+
blockOnClose := es.blockOnClose
118+
readDeadlineSet := es.readDeadlineSet
119+
readDeadline := es.readDeadline
105120
es.closed = true
106121
es.lk.Unlock()
107122

123+
if blockOnClose {
124+
if readDeadlineSet && !readDeadline.IsZero() {
125+
// Simulate blocking until deadline (the fix sets a deadline, so this will timeout)
126+
waitTime := time.Until(readDeadline)
127+
if waitTime > 0 {
128+
time.Sleep(waitTime)
129+
}
130+
} else {
131+
// No deadline set - would block forever (demonstrates the bug without fix)
132+
// In test, we use a channel to avoid actually blocking forever
133+
select {}
134+
}
135+
}
136+
108137
return es.Stream.Close()
109138
}
110139

@@ -140,7 +169,7 @@ func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID
140169
return nil, context.DeadlineExceeded
141170
}
142171
stream, err := eh.Host.NewStream(ctx, p, pids...)
143-
estrm := &ErrStream{Stream: stream, err: eh.err, timingOut: eh.timingOut}
172+
estrm := &ErrStream{Stream: stream, err: eh.err, timingOut: eh.timingOut, blockOnClose: eh.blockOnClose}
144173

145174
eh.streams = append(eh.streams, estrm)
146175
return estrm, err
@@ -170,6 +199,18 @@ func (eh *ErrHost) setTimeoutState(timingOut bool) {
170199
}
171200
}
172201

202+
func (eh *ErrHost) setBlockOnClose(block bool) {
203+
eh.lk.Lock()
204+
defer eh.lk.Unlock()
205+
206+
eh.blockOnClose = block
207+
for _, s := range eh.streams {
208+
s.lk.Lock()
209+
s.blockOnClose = block
210+
s.lk.Unlock()
211+
}
212+
}
213+
173214
func TestMessageSendAndReceive(t *testing.T) {
174215
// create network
175216
ctx := context.Background()
@@ -671,3 +712,49 @@ func TestNetworkCounters(t *testing.T) {
671712
testNetworkCounters(t, 10-n, n)
672713
}
673714
}
715+
716+
// TestSendMessageCloseDoesNotHang verifies that SendMessage calls SetReadDeadline
717+
// before Close(), preventing indefinite blocking when the remote peer is
718+
// unresponsive during multistream handshake completion.
719+
//
720+
// This test uses ErrStream to simulate a blocking Close() that only unblocks
721+
// when SetReadDeadline has been called. This proves the fix works without
722+
// relying on real network timeouts.
723+
func TestSendMessageCloseDoesNotHang(t *testing.T) {
724+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
725+
defer cancel()
726+
727+
p1 := tnet.RandIdentityOrFatal(t)
728+
r1 := newReceiver()
729+
p2 := tnet.RandIdentityOrFatal(t)
730+
r2 := newReceiver()
731+
732+
// Use prepareNetwork but we'll configure blocking after
733+
eh1, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)
734+
735+
// Configure h1's streams to block on Close() - this simulates the scenario
736+
// where multistream handshake read would block indefinitely.
737+
// With the fix, SetReadDeadline is called before Close(), so the simulated
738+
// blocking will respect the deadline and unblock.
739+
eh1.setBlockOnClose(true)
740+
741+
// SendMessage should complete because the fix sets a read deadline before
742+
// calling Close(). The ErrStream.Close() will block until the deadline,
743+
// simulating the real-world scenario where Close() would hang without
744+
// a deadline.
745+
start := time.Now()
746+
err := bsnet1.SendMessage(ctx, p2.ID(), msg)
747+
elapsed := time.Since(start)
748+
749+
// The sendTimeout for a small message is minSendTimeout (10s).
750+
// With the fix, Close() should return after waiting until the deadline.
751+
// Without the fix, it would hang forever (ErrStream.Close blocks indefinitely
752+
// when blockOnClose=true and no deadline is set).
753+
maxExpected := 15 * time.Second // minSendTimeout + margin
754+
if elapsed > maxExpected {
755+
t.Fatalf("SendMessage took %v, expected < %v (should timeout via SetReadDeadline)", elapsed, maxExpected)
756+
}
757+
758+
// Error is expected because the simulated blocking causes the deadline to be reached
759+
t.Logf("SendMessage returned in %v with error: %v", elapsed, err)
760+
}

0 commit comments

Comments
 (0)