Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 110 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
"io"
"log/slog"
"net"
"os"
"strings"
"sync"
"syscall"
"time"

"github.com/blinklabs-io/gouroboros/connection"
Expand Down Expand Up @@ -250,6 +253,100 @@
close(c.errorChan)
}

// isConnectionReset checks if an error is a connection reset error using proper error type checking
func (c *Connection) isConnectionReset(err error) bool {
if errors.Is(err, io.EOF) {
return true
}

// Check for connection reset errors using proper error type checking
var opErr *net.OpError
if errors.As(err, &opErr) {
if syscallErr, ok := opErr.Err.(*os.SyscallError); ok {

Check failure on line 265 in connection.go

View workflow job for this annotation

GitHub Actions / lint

type assertion on error will fail on wrapped errors. Use errors.As to check for specific errors (errorlint)
if errno, ok := syscallErr.Err.(syscall.Errno); ok {

Check failure on line 266 in connection.go

View workflow job for this annotation

GitHub Actions / lint

type assertion on error will fail on wrapped errors. Use errors.As to check for specific errors (errorlint)
// Check for connection reset (ECONNRESET) or broken pipe (EPIPE)
return errno == syscall.ECONNRESET || errno == syscall.EPIPE
}
}
// Also check for string-based errors as fallback for edge cases
errStr := opErr.Err.Error()
return strings.Contains(errStr, "connection reset") ||
strings.Contains(errStr, "broken pipe")
}

return false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check for error types rather than substring matches of the Error() output. Do we actually get any "connection closed" error that isn't io.EOF?


// checkProtocolsDone checks if the protocols are explicitly stopped by the client - treat as normal connection closure
func (c *Connection) checkProtocolsDone() bool {
// Check chain-sync protocol
if c.chainSync != nil {
if (c.chainSync.Client != nil && !c.chainSync.Client.IsDone()) ||
(c.chainSync.Server != nil && !c.chainSync.Server.IsDone()) {
return false
}
}

// Check block-fetch protocol
if c.blockFetch != nil {
if (c.blockFetch.Client != nil && !c.blockFetch.Client.IsDone()) ||
(c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone()) {
return false
}
}

// Check tx-submission protocol
if c.txSubmission != nil {
if (c.txSubmission.Client != nil && !c.txSubmission.Client.IsDone()) ||
(c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone()) {
return false
}
}

// Check local-state-query protocol
if c.localStateQuery != nil {
if (c.localStateQuery.Client != nil && !c.localStateQuery.Client.IsDone()) ||
(c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone()) {
return false
}
}

// Check local-tx-monitor protocol
if c.localTxMonitor != nil {
if (c.localTxMonitor.Client != nil && !c.localTxMonitor.Client.IsDone()) ||
(c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone()) {
return false
}
}

// Check local-tx-submission protocol
if c.localTxSubmission != nil {
if (c.localTxSubmission.Client != nil && !c.localTxSubmission.Client.IsDone()) ||
(c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone()) {
return false
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add LocalStateQuery, LocalTxMonitor, and LocalTxSubmission to this. It should be safe to do the NtN and NtC protocols all together with the nil checks.


return true
}

// handleConnectionError handles connection-level errors centrally
func (c *Connection) handleConnectionError(err error) error {
if err == nil {
return nil
}

if c.checkProtocolsDone() {
return nil
}

if errors.Is(err, io.EOF) || c.isConnectionReset(err) {
return err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block doesn't seem to be useful, since we unconditionally return err right below it


return err
}

// setupConnection establishes the muxer, configures and starts the handshake process, and initializes
// the appropriate mini-protocols
func (c *Connection) setupConnection() error {
Expand Down Expand Up @@ -285,16 +382,20 @@
if !ok {
return
}
var connErr *muxer.ConnectionClosedError
if errors.As(err, &connErr) {
// Pass through ConnectionClosedError from muxer
c.errorChan <- err
} else {
// Wrap error message to denote it comes from the muxer
c.errorChan <- fmt.Errorf("muxer error: %w", err)

// Use centralized connection error handling
if handledErr := c.handleConnectionError(err); handledErr != nil {
var connErr *muxer.ConnectionClosedError
if errors.As(handledErr, &connErr) {
// Pass through ConnectionClosedError from muxer
c.errorChan <- handledErr
} else {
// Wrap error message to denote it comes from the muxer
c.errorChan <- fmt.Errorf("muxer error: %w", handledErr)
}
// Close connection on muxer errors
c.Close()
}
// Close connection on muxer errors
c.Close()
}
}()
protoOptions := protocol.ProtocolOptions{
Expand Down
Loading
Loading