Skip to content

Commit 8dcbdca

Browse files
committed
p2p: track write errors and prevent writes during shutdown
As of this commit, we no longer rely on the protocol handler to report write errors in a timely fashion. When a write fails, shutdown is initiated immediately and no new writes can start. This will also prevent new writes from starting after Server.Stop has been called.
1 parent 6f5c615 commit 8dcbdca

File tree

1 file changed

+57
-25
lines changed

1 file changed

+57
-25
lines changed

p2p/peer.go

Lines changed: 57 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -115,37 +115,54 @@ func newPeer(conn *conn, protocols []Protocol) *Peer {
115115
}
116116

117117
func (p *Peer) run() DiscReason {
118-
readErr := make(chan error, 1)
118+
var (
119+
writeStart = make(chan struct{}, 1)
120+
writeErr = make(chan error, 1)
121+
readErr = make(chan error, 1)
122+
reason DiscReason
123+
requested bool
124+
)
119125
p.wg.Add(2)
120126
go p.readLoop(readErr)
121127
go p.pingLoop()
122128

123-
p.startProtocols()
129+
// Start all protocol handlers.
130+
writeStart <- struct{}{}
131+
p.startProtocols(writeStart, writeErr)
124132

125133
// Wait for an error or disconnect.
126-
var (
127-
reason DiscReason
128-
requested bool
129-
)
130-
select {
131-
case err := <-readErr:
132-
if r, ok := err.(DiscReason); ok {
133-
reason = r
134-
} else {
135-
// Note: We rely on protocols to abort if there is a write
136-
// error. It might be more robust to handle them here as well.
137-
glog.V(logger.Detail).Infof("%v: Read error: %v\n", p, err)
138-
reason = DiscNetworkError
134+
loop:
135+
for {
136+
select {
137+
case err := <-writeErr:
138+
// A write finished. Allow the next write to start if
139+
// there was no error.
140+
if err != nil {
141+
glog.V(logger.Detail).Infof("%v: Write error: %v\n", p, err)
142+
reason = DiscNetworkError
143+
break loop
144+
}
145+
writeStart <- struct{}{}
146+
case err := <-readErr:
147+
if r, ok := err.(DiscReason); ok {
148+
reason = r
149+
} else {
150+
glog.V(logger.Detail).Infof("%v: Read error: %v\n", p, err)
151+
reason = DiscNetworkError
152+
}
153+
break loop
154+
case err := <-p.protoErr:
155+
reason = discReasonForError(err)
156+
break loop
157+
case reason = <-p.disc:
158+
requested = true
159+
break loop
139160
}
140-
case err := <-p.protoErr:
141-
reason = discReasonForError(err)
142-
case reason = <-p.disc:
143-
requested = true
144161
}
162+
145163
close(p.closed)
146164
p.rw.close(reason)
147165
p.wg.Wait()
148-
149166
if requested {
150167
reason = DiscRequested
151168
}
@@ -247,11 +264,13 @@ outer:
247264
return result
248265
}
249266

250-
func (p *Peer) startProtocols() {
267+
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
251268
p.wg.Add(len(p.running))
252269
for _, proto := range p.running {
253270
proto := proto
254271
proto.closed = p.closed
272+
proto.wstart = writeStart
273+
proto.werr = writeErr
255274
glog.V(logger.Detail).Infof("%v: Starting protocol %s/%d\n", p, proto.Name, proto.Version)
256275
go func() {
257276
err := proto.Run(p, proto)
@@ -280,18 +299,31 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) {
280299

281300
type protoRW struct {
282301
Protocol
283-
in chan Msg
284-
closed <-chan struct{}
302+
in chan Msg // receices read messages
303+
closed <-chan struct{} // receives when peer is shutting down
304+
wstart <-chan struct{} // receives when write may start
305+
werr chan<- error // for write results
285306
offset uint64
286307
w MsgWriter
287308
}
288309

289-
func (rw *protoRW) WriteMsg(msg Msg) error {
310+
func (rw *protoRW) WriteMsg(msg Msg) (err error) {
290311
if msg.Code >= rw.Length {
291312
return newPeerError(errInvalidMsgCode, "not handled")
292313
}
293314
msg.Code += rw.offset
294-
return rw.w.WriteMsg(msg)
315+
select {
316+
case <-rw.wstart:
317+
err = rw.w.WriteMsg(msg)
318+
// Report write status back to Peer.run. It will initiate
319+
// shutdown if the error is non-nil and unblock the next write
320+
// otherwise. The calling protocol code should exit for errors
321+
// as well but we don't want to rely on that.
322+
rw.werr <- err
323+
case <-rw.closed:
324+
err = fmt.Errorf("shutting down")
325+
}
326+
return err
295327
}
296328

297329
func (rw *protoRW) ReadMsg() (Msg, error) {

0 commit comments

Comments
 (0)