diff --git a/connection.go b/connection.go index 32689da0..387a4ba5 100644 --- a/connection.go +++ b/connection.go @@ -285,9 +285,10 @@ func (c *Connection) setupConnection() error { if !ok { return } - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { - // Return a bare io.EOF error if error is EOF/ErrUnexpectedEOF - c.errorChan <- io.EOF + var connErr *muxer.ConnectionClosedError + if errors.As(err, &connErr) { + // Pass through ConnectionClosedError from muxer + c.errorChan <- err } else { // Wrap error message to denote it comes from the muxer c.errorChan <- fmt.Errorf("muxer error: %w", err) @@ -354,7 +355,7 @@ func (c *Connection) setupConnection() error { select { case <-c.doneChan: // Return an error if we're shutting down - return io.EOF + return fmt.Errorf("connection shutdown initiated: %w", io.EOF) case err := <-c.protoErrorChan: // Shutdown the connection and return the error c.Close() diff --git a/muxer/muxer.go b/muxer/muxer.go index f8998ec1..244cf85b 100644 --- a/muxer/muxer.go +++ b/muxer/muxer.go @@ -69,6 +69,19 @@ type Muxer struct { onceStop sync.Once } +type ConnectionClosedError struct { + Context string + Err error +} + +func (e *ConnectionClosedError) Error() string { + return fmt.Sprintf("peer closed the connection while %s: %v", e.Context, e.Err) +} + +func (e *ConnectionClosedError) Unwrap() error { + return e.Err +} + // New creates a new Muxer object and starts the read loop func New(conn net.Conn) *Muxer { m := &Muxer{ @@ -287,7 +300,11 @@ func (m *Muxer) readLoop() { if errors.Is(err, io.ErrClosedPipe) { err = io.EOF } - m.sendError(err) + if errors.Is(err, io.EOF) { + m.sendError(&ConnectionClosedError{Context: "reading header", Err: err}) + } else { + m.sendError(err) + } return } msg := &Segment{ @@ -300,7 +317,11 @@ func (m *Muxer) readLoop() { if errors.Is(err, io.ErrClosedPipe) { err = io.EOF } - m.sendError(err) + if errors.Is(err, io.EOF) { + m.sendError(&ConnectionClosedError{Context: "reading payload", Err: err}) + } else { + m.sendError(err) + } return } // Check for message from initiator when we're not configured as a responder