Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,10 @@ func (c *Connection) setupConnection() error {
if !ok {
return
}
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
// Return a bare io.EOF error if error is EOF/ErrUnexpectedEOF
c.errorChan <- io.EOF
var connErr *muxer.ConnectionClosedError
if errors.As(err, &connErr) {
// Pass through ConnectionClosedError from muxer
c.errorChan <- err
} else {
// Wrap error message to denote it comes from the muxer
c.errorChan <- fmt.Errorf("muxer error: %w", err)
Expand Down Expand Up @@ -354,7 +355,7 @@ func (c *Connection) setupConnection() error {
select {
case <-c.doneChan:
// Return an error if we're shutting down
return io.EOF
return fmt.Errorf("connection shutdown initiated: %w", io.EOF)
case err := <-c.protoErrorChan:
// Shutdown the connection and return the error
c.Close()
Expand Down
25 changes: 23 additions & 2 deletions muxer/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ type Muxer struct {
onceStop sync.Once
}

type ConnectionClosedError struct {
Context string
Err error
}

func (e *ConnectionClosedError) Error() string {
return fmt.Sprintf("peer closed the connection while %s: %v", e.Context, e.Err)
}

func (e *ConnectionClosedError) Unwrap() error {
return e.Err
}

// New creates a new Muxer object and starts the read loop
func New(conn net.Conn) *Muxer {
m := &Muxer{
Expand Down Expand Up @@ -287,7 +300,11 @@ func (m *Muxer) readLoop() {
if errors.Is(err, io.ErrClosedPipe) {
err = io.EOF
}
m.sendError(err)
if errors.Is(err, io.EOF) {
m.sendError(&ConnectionClosedError{Context: "reading header", Err: err})
} else {
m.sendError(err)
}
return
}
msg := &Segment{
Expand All @@ -300,7 +317,11 @@ func (m *Muxer) readLoop() {
if errors.Is(err, io.ErrClosedPipe) {
err = io.EOF
}
m.sendError(err)
if errors.Is(err, io.EOF) {
m.sendError(&ConnectionClosedError{Context: "reading payload", Err: err})
} else {
m.sendError(err)
}
return
}
// Check for message from initiator when we're not configured as a responder
Expand Down
Loading