
Commit 11c9180

mvdan authored and p4u committed
multirpc/subpub: fix potential goroutine deadlocks
When the connection to a peer is lost, broadcastHandler errors in its SendMessage call, and the entire goroutine stops. No goroutine will continue receiving on the write channel, and sooner or later, sends to the write channel will start blocking.

This starts causing deadlocks further up in IPFSsync. SubPub.Subscribe and SubPub.PeerStreamWrite can now block forever, and further up the chain in IPFSsync, that can mean some goroutines hold onto mutexes forever.

On one hand, this chain of events can hang IPFSsync, stopping it from doing anything useful until a restart. On the other hand, it causes goroutine leaks: when more calls to IPFSsync.Handle come through, using new goroutines via the router, those try to grab the deadlocked mutexes and hang forever.

First, fix the root cause: peerSub now has a "closed" channel, which gets closed by peersManager when the peer is dropped. Its goroutines, both for reading and writing messages, keep running until that happens.

Second, make the symptom of the deadlock less severe: prevent blocking on channel sends forever. Any send on the "write" channel now stops on "closed". And the send on BroadcastWriter, which could also block forever, now has a fallback timeout of five minutes.

Updates #243. Perhaps not a total fix, as there might be other leaks.
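The failure mode described above is a general Go pattern worth seeing in isolation: once the only receiver of a channel returns, every later blocking send parks its sender forever. Below is a minimal standalone sketch of the bug's shape and of the close-channel fix this commit applies; the writer and send helpers are illustrative names, not code from the patch.

package main

import "fmt"

// writer drains ch until closed is closed, mirroring the fixed
// broadcastHandler: it only stops when told to, so senders can't
// be left stranded by an early return.
func writer(ch <-chan []byte, closed <-chan bool) {
	for {
		select {
		case <-closed:
			return // peer dropped: stop, but only once signaled
		case msg := <-ch:
			fmt.Printf("wrote %d bytes\n", len(msg))
		}
	}
}

// send mirrors the fixed PeerStreamWrite: a send that can always make
// progress, because receiving from a closed channel never blocks.
func send(ch chan<- []byte, closed <-chan bool, msg []byte) {
	select {
	case ch <- msg:
	case <-closed:
		// peer is gone; drop the message instead of blocking forever
	}
}

func main() {
	ch := make(chan []byte)
	closed := make(chan bool)
	go writer(ch, closed)
	send(ch, closed, []byte("hello"))
	close(closed)                    // what peersManager does on peer drop
	send(ch, closed, []byte("late")) // returns immediately, no deadlock
}

Because a receive from a closed channel always succeeds immediately, the second send returns right away instead of deadlocking, which is exactly what the new peerClosed cases buy SubPub.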
1 parent d8c83c6 commit 11c9180

4 files changed: 52 additions, 12 deletions

multirpc/subpub/discovery.go

Lines changed: 2 additions & 0 deletions
@@ -58,13 +58,15 @@ func (ps *SubPub) Subscribe(ctx context.Context) {
 		case <-ps.close:
 			return
 		case msg := <-ps.BroadcastWriter:
+
 			ps.PeersMu.Lock()
 			for _, peer := range ps.Peers {
 				if peer.write == nil {
 					continue
 				}
 				select {
 				case peer.write <- msg:
+				case <-peer.peerClosed:
 				default:
 					log.Infof("dropping broadcast message for peer %s", peer.id)
 				}
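A note on the new select in Subscribe: default only fires when neither the send nor the peerClosed receive can proceed, so a dropped peer unblocks the loop via peerClosed, while a merely slow peer costs one dropped broadcast message rather than a stuck loop. A small self-contained demonstration of those semantics, with a buffered channel standing in for peer.write:

package main

import "fmt"

func main() {
	write := make(chan []byte, 1) // stand-in for peer.write
	peerClosed := make(chan bool)

	trySend := func(msg []byte) {
		select {
		case write <- msg:
			fmt.Println("sent")
		case <-peerClosed:
			fmt.Println("peer closed, message discarded")
		default:
			fmt.Println("peer busy, message dropped")
		}
	}

	trySend([]byte("a")) // buffer empty: "sent"
	trySend([]byte("b")) // buffer full, peer alive: "peer busy"
	close(peerClosed)
	trySend([]byte("c")) // buffer still full, channel closed: "peer closed"
}

When several cases are ready at once, Go picks one pseudo-randomly; here each call has exactly one ready case, so the output is deterministic.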

multirpc/subpub/peers.go

Lines changed: 17 additions & 3 deletions
@@ -13,8 +13,14 @@ import (
 )
 
 type peerSub struct {
-	id    libpeer.ID
+	id libpeer.ID
+
 	write chan []byte
+
+	// peerClosed signals that we've lost the connection with the peer, or
+	// it has been removed by peersManager.
+	// When closed, its goroutines stop.
+	peerClosed chan bool
 }
 
 // PeerStreamWrite looks for an existing connection with peerID and calls the callback function with the writer channel as parameter
@@ -31,8 +37,14 @@ func (ps *SubPub) PeerStreamWrite(peerID string, msg []byte) error {
 	if peerIdx < 0 {
 		return fmt.Errorf("no connection with peer %s, cannot open stream", peerID)
 	}
-	ps.Peers[peerIdx].write <- msg
-	return nil
+
+	peer := ps.Peers[peerIdx]
+	select {
+	case peer.write <- msg:
+		return nil
+	case <-peer.peerClosed:
+		return nil
+	}
 }
 
 // FindTopic opens one or multiple new streams with the peers announcing the namespace.
@@ -101,6 +113,8 @@ func (ps *SubPub) peersManager() {
 		if len(ps.Host.Network().ConnsToPeer(peer.id)) > 0 {
 			continue
 		}
+		close(peer.peerClosed)
+
 		// Remove peer if no active connection
 		ps.Peers[i] = ps.Peers[len(ps.Peers)-1]
 		ps.Peers = ps.Peers[:len(ps.Peers)-1]
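The single close(peer.peerClosed) in peersManager works as a broadcast: a receive from a closed channel never blocks, for any number of receivers, present or future, which is why nothing is ever sent on the channel (a chan struct{} would express the same intent). A quick sketch of that property:

package main

import (
	"fmt"
	"sync"
)

func main() {
	peerClosed := make(chan bool)
	var wg sync.WaitGroup

	// Both of a peer's goroutines can wait on the same channel;
	// one close() releases all of them at once.
	for _, name := range []string{"readHandler", "broadcastHandler"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			<-peerClosed // blocks until the channel is closed
			fmt.Println(name, "stopped")
		}(name)
	}

	close(peerClosed) // what peersManager does when dropping the peer
	wg.Wait()

	<-peerClosed // a later receive also returns immediately
	fmt.Println("late receiver not blocked either")
}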

multirpc/subpub/stream.go

Lines changed: 21 additions & 8 deletions
@@ -10,9 +10,11 @@ import (
 )
 
 func (ps *SubPub) handleStream(stream network.Stream) {
+	peerClosed := make(chan bool)
+
 	// First, ensure that any messages read from the stream are sent to the
 	// SubPub.Reader channel.
-	go ps.readHandler(stream)
+	go ps.readHandler(peerClosed, stream)
 
 	// Second, ensure that, from now on, any broadcast message is sent to
 	// this stream as well.
@@ -27,38 +29,50 @@ func (ps *SubPub) handleStream(stream network.Stream) {
 	pid := stream.Conn().RemotePeer()
 	ps.PeersMu.Lock()
 	defer ps.PeersMu.Unlock()
-	ps.Peers = append(ps.Peers, peerSub{pid, write}) // TO-DO this should be a map
+	ps.Peers = append(ps.Peers, peerSub{ // TO-DO this should be a map
+		id:         pid,
+		peerClosed: peerClosed,
+		write:      write,
+	})
 	if fn := ps.onPeerAdd; fn != nil {
 		fn(pid)
 	}
 	log.Infof("connected to peer %s: %+v", pid, stream.Conn().RemoteMultiaddr())
-	go ps.broadcastHandler(write, bufio.NewWriter(stream))
+	go ps.broadcastHandler(peerClosed, write, bufio.NewWriter(stream))
 }
 
-func (ps *SubPub) broadcastHandler(write <-chan []byte, w *bufio.Writer) {
+func (ps *SubPub) broadcastHandler(peerClosed <-chan bool, write <-chan []byte, w *bufio.Writer) {
 	for {
 		select {
		case <-ps.close:
 			return
+		case <-peerClosed:
+			return
 		case msg := <-write:
 			if err := ps.SendMessage(w, msg); err != nil {
 				log.Debugf("error writing to buffer: (%s)", err)
-				return
+				continue
 			}
 			if err := w.Flush(); err != nil {
 				log.Debugf("error flushing write buffer: (%s)", err)
-				return
+				continue
 			}
 		}
 	}
 }
 
-func (ps *SubPub) readHandler(stream network.Stream) {
+func (ps *SubPub) readHandler(peerClosed <-chan bool, stream network.Stream) {
 	r := bufio.NewReader(stream)
+
+	// Ensure that we always close the stream.
+	defer stream.Close()
+
 	for {
 		select {
 		case <-ps.close:
 			return
+		case <-peerClosed:
+			return
 		default:
 			// continues below
 		}
@@ -67,7 +81,6 @@ func (ps *SubPub) readHandler(stream network.Stream) {
 		bare.MaxUnmarshalBytes(bareMaxUnmarshalBytes)
 		if err := bare.UnmarshalReader(io.Reader(r), message); err != nil {
 			log.Debugf("error reading stream buffer %s: %v", stream.Conn().RemotePeer().Pretty(), err)
-			stream.Close()
 			return
 		} else if len(message.Data) == 0 {
 			log.Debugf("no data could be read from stream: %s (%+v)", stream.Conn().RemotePeer().Pretty(), stream.Stat())

multirpc/transports/subpubtransport/subpub.go

Lines changed: 12 additions & 1 deletion
@@ -86,7 +86,18 @@ func (s *SubPubHandle) ConnectionType() string {
 
 func (s *SubPubHandle) Send(msg transports.Message) error {
 	log.Debugf("sending %d bytes to broadcast channel", len(msg.Data))
-	s.SubPub.BroadcastWriter <- msg.Data
+
+	// Use a fallback timeout of five minutes, to prevent blocking forever
+	// or leaking goroutines.
+	// TODO(mvdan): turn this fallback timeout into a ctx parameter
+	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
+	defer cancel()
+
+	select {
+	case s.SubPub.BroadcastWriter <- msg.Data:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
 	return nil
 }
 
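The bounded send generalizes to any channel. Here is a self-contained sketch of the same select-plus-context.WithTimeout shape, shrunk to a 50ms deadline so it visibly fires; sendWithTimeout is an illustrative name, not from the patch:

package main

import (
	"context"
	"fmt"
	"time"
)

// sendWithTimeout blocks on ch <- data until either the send succeeds
// or the deadline fires, mirroring the new Send above.
func sendWithTimeout(ctx context.Context, ch chan<- []byte, data []byte, d time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, d)
	defer cancel() // release the timer as soon as we're done

	select {
	case ch <- data:
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded after d
	}
}

func main() {
	ch := make(chan []byte) // unbuffered, and nobody is receiving
	err := sendWithTimeout(context.TODO(), ch, []byte("msg"), 50*time.Millisecond)
	fmt.Println(err) // context deadline exceeded
}

The deferred cancel releases the timer promptly on the success path, which is the same reason the patch pairs WithTimeout with defer cancel().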
