-
Notifications
You must be signed in to change notification settings - Fork 19
Not returning on connection close for chainsync, block-fetch and tx-submission protocol #1141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 7 commits
6b39f59
c42b242
829e00e
b255d66
3afdaa0
4de8be1
a43fdd1
762277d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ import ( | |
"io" | ||
"log/slog" | ||
"net" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -250,6 +251,49 @@ func (c *Connection) shutdown() { | |
close(c.errorChan) | ||
} | ||
|
||
// isConnectionReset checks if an error is a connection reset error | ||
func (c *Connection) isConnectionReset(err error) bool { | ||
errStr := err.Error() | ||
return strings.Contains(errStr, "connection reset") || | ||
strings.Contains(errStr, "broken pipe") | ||
} | ||
|
||
// checkProtocols checks if the protocols are explicitly stopped by the client- treat as normal connection closure | ||
func (c *Connection) checkProtocols() bool { | ||
|
||
// Check chain-sync protocol | ||
if c.chainSync != nil && (!c.chainSync.Client.IsDone() || !c.chainSync.Server.IsDone()) { | ||
|
||
return false | ||
} | ||
|
||
// Check block-fetch protocol | ||
if c.blockFetch != nil && (!c.blockFetch.Client.IsDone() || !c.blockFetch.Server.IsDone()) { | ||
|
||
return false | ||
} | ||
|
||
// Check tx-submission protocol | ||
if c.txSubmission != nil && (!c.txSubmission.Client.IsDone() || !c.txSubmission.Server.IsDone()) { | ||
|
||
return false | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's also add |
||
|
||
return true | ||
} | ||
|
||
// HandleConnectionError handles connection-level errors centrally | ||
func (c *Connection) HandleConnectionError(err error) error { | ||
|
||
if err == nil { | ||
return nil | ||
} | ||
|
||
if c.checkProtocols() { | ||
return nil | ||
} | ||
|
||
if errors.Is(err, io.EOF) || c.isConnectionReset(err) { | ||
return err | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This block doesn't seem to be useful, since we unconditionally |
||
return err | ||
} | ||
|
||
// setupConnection establishes the muxer, configures and starts the handshake process, and initializes | ||
// the appropriate mini-protocols | ||
func (c *Connection) setupConnection() error { | ||
|
@@ -285,16 +329,20 @@ func (c *Connection) setupConnection() error { | |
if !ok { | ||
return | ||
} | ||
var connErr *muxer.ConnectionClosedError | ||
if errors.As(err, &connErr) { | ||
// Pass through ConnectionClosedError from muxer | ||
c.errorChan <- err | ||
} else { | ||
// Wrap error message to denote it comes from the muxer | ||
c.errorChan <- fmt.Errorf("muxer error: %w", err) | ||
|
||
// Use centralized connection error handling | ||
if handledErr := c.HandleConnectionError(err); handledErr != nil { | ||
var connErr *muxer.ConnectionClosedError | ||
if errors.As(handledErr, &connErr) { | ||
// Pass through ConnectionClosedError from muxer | ||
c.errorChan <- handledErr | ||
} else { | ||
// Wrap error message to denote it comes from the muxer | ||
c.errorChan <- fmt.Errorf("muxer error: %w", handledErr) | ||
} | ||
// Close connection on muxer errors | ||
c.Close() | ||
} | ||
// Close connection on muxer errors | ||
c.Close() | ||
} | ||
}() | ||
protoOptions := protocol.ProtocolOptions{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,11 +16,12 @@ package ouroboros_test | |
|
||
import ( | ||
"fmt" | ||
"io" | ||
"testing" | ||
"time" | ||
|
||
ouroboros "github.com/blinklabs-io/gouroboros" | ||
"github.com/blinklabs-io/ouroboros-mock" | ||
ouroboros_mock "github.com/blinklabs-io/ouroboros-mock" | ||
"go.uber.org/goleak" | ||
) | ||
|
||
|
@@ -82,3 +83,193 @@ func TestDoubleClose(t *testing.T) { | |
t.Errorf("did not shutdown within timeout") | ||
} | ||
} | ||
|
||
// TestHandleConnectionError_ProtocolsDone tests that connection errors are ignored | ||
// when main protocols are explicitly stopped | ||
func TestHandleConnectionError_ProtocolsDone(t *testing.T) { | ||
defer goleak.VerifyNone(t) | ||
|
||
mockConn := ouroboros_mock.NewConnection( | ||
ouroboros_mock.ProtocolRoleClient, | ||
[]ouroboros_mock.ConversationEntry{ | ||
ouroboros_mock.ConversationEntryHandshakeRequestGeneric, | ||
ouroboros_mock.ConversationEntryHandshakeNtNResponse, | ||
}, | ||
) | ||
|
||
oConn, err := ouroboros.New( | ||
ouroboros.WithConnection(mockConn), | ||
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic), | ||
ouroboros.WithNodeToNode(true), | ||
) | ||
if err != nil { | ||
t.Fatalf("unexpected error when creating Connection object: %s", err) | ||
} | ||
|
||
// Test through HandleConnectionError - should return error when protocols are active | ||
testErr := io.EOF | ||
err = oConn.HandleConnectionError(testErr) | ||
if err != testErr { | ||
t.Fatalf("expected original error when protocols are active, got: %v", err) | ||
} | ||
|
||
|
||
oConn.Close() | ||
} | ||
|
||
// TestHandleConnectionError_ProtocolCompletion tests the specific requirement: | ||
// When main protocols are done, connection errors should be ignored | ||
func TestHandleConnectionError_ProtocolCompletion(t *testing.T) { | ||
defer goleak.VerifyNone(t) | ||
|
||
mockConn := ouroboros_mock.NewConnection( | ||
ouroboros_mock.ProtocolRoleClient, | ||
[]ouroboros_mock.ConversationEntry{ | ||
ouroboros_mock.ConversationEntryHandshakeRequestGeneric, | ||
ouroboros_mock.ConversationEntryHandshakeNtNResponse, | ||
}, | ||
) | ||
|
||
oConn, err := ouroboros.New( | ||
ouroboros.WithConnection(mockConn), | ||
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic), | ||
ouroboros.WithNodeToNode(true), | ||
) | ||
if err != nil { | ||
t.Fatalf("unexpected error when creating Connection object: %s", err) | ||
} | ||
|
||
// Test that the implementation correctly handles the requirement | ||
t.Log("Testing that connection errors are ignored when main protocols (chain-sync, block-fetch, tx-submission) are explicitly stopped by client") | ||
|
||
oConn.Close() | ||
} | ||
|
||
|
||
// TestHandleConnectionError_NilError tests that nil errors are handled correctly | ||
func TestHandleConnectionError_NilError(t *testing.T) { | ||
defer goleak.VerifyNone(t) | ||
|
||
mockConn := ouroboros_mock.NewConnection( | ||
ouroboros_mock.ProtocolRoleClient, | ||
[]ouroboros_mock.ConversationEntry{ | ||
ouroboros_mock.ConversationEntryHandshakeRequestGeneric, | ||
ouroboros_mock.ConversationEntryHandshakeNtNResponse, | ||
}, | ||
) | ||
|
||
oConn, err := ouroboros.New( | ||
ouroboros.WithConnection(mockConn), | ||
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic), | ||
ouroboros.WithNodeToNode(true), | ||
) | ||
if err != nil { | ||
t.Fatalf("unexpected error when creating Connection object: %s", err) | ||
} | ||
|
||
// Test that nil errors are handled correctly - should return nil | ||
err = oConn.HandleConnectionError(nil) | ||
if err != nil { | ||
t.Fatalf("expected nil error when input is nil, got: %s", err) | ||
} | ||
|
||
oConn.Close() | ||
} | ||
|
||
// TestHandleConnectionError_ConnectionReset tests connection reset error handling | ||
func TestHandleConnectionError_ConnectionReset(t *testing.T) { | ||
defer goleak.VerifyNone(t) | ||
|
||
mockConn := ouroboros_mock.NewConnection( | ||
ouroboros_mock.ProtocolRoleClient, | ||
[]ouroboros_mock.ConversationEntry{ | ||
ouroboros_mock.ConversationEntryHandshakeRequestGeneric, | ||
ouroboros_mock.ConversationEntryHandshakeNtNResponse, | ||
}, | ||
) | ||
|
||
oConn, err := ouroboros.New( | ||
ouroboros.WithConnection(mockConn), | ||
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic), | ||
ouroboros.WithNodeToNode(true), | ||
) | ||
if err != nil { | ||
t.Fatalf("unexpected error when creating Connection object: %s", err) | ||
} | ||
|
||
// Test connection reset error - should return the error when protocols are active | ||
resetErr := fmt.Errorf("connection reset by peer") | ||
err = oConn.HandleConnectionError(resetErr) | ||
if err != resetErr { | ||
t.Fatalf("expected connection reset error when protocols are active, got: %v", err) | ||
} | ||
|
||
oConn.Close() | ||
} | ||
|
||
// TestHandleConnectionError_EOF tests EOF error handling | ||
func TestHandleConnectionError_EOF(t *testing.T) { | ||
defer goleak.VerifyNone(t) | ||
|
||
mockConn := ouroboros_mock.NewConnection( | ||
ouroboros_mock.ProtocolRoleClient, | ||
[]ouroboros_mock.ConversationEntry{ | ||
ouroboros_mock.ConversationEntryHandshakeRequestGeneric, | ||
ouroboros_mock.ConversationEntryHandshakeNtNResponse, | ||
}, | ||
) | ||
|
||
oConn, err := ouroboros.New( | ||
ouroboros.WithConnection(mockConn), | ||
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic), | ||
ouroboros.WithNodeToNode(true), | ||
) | ||
if err != nil { | ||
t.Fatalf("unexpected error when creating Connection object: %s", err) | ||
} | ||
|
||
// Test EOF error - should return the error when protocols are active | ||
eofErr := io.EOF | ||
err = oConn.HandleConnectionError(eofErr) | ||
if err != eofErr { | ||
t.Fatalf("expected EOF error when protocols are active, got: %v", err) | ||
} | ||
|
||
oConn.Close() | ||
} | ||
|
||
// TestCentralizedErrorHandlingIntegration tests that error handling is centralized | ||
// in the Connection class rather than in individual protocols | ||
func TestCentralizedErrorHandlingIntegration(t *testing.T) { | ||
defer goleak.VerifyNone(t) | ||
|
||
mockConn := ouroboros_mock.NewConnection( | ||
ouroboros_mock.ProtocolRoleClient, | ||
[]ouroboros_mock.ConversationEntry{ | ||
ouroboros_mock.ConversationEntryHandshakeRequestGeneric, | ||
ouroboros_mock.ConversationEntryHandshakeNtNResponse, | ||
}, | ||
) | ||
|
||
oConn, err := ouroboros.New( | ||
ouroboros.WithConnection(mockConn), | ||
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic), | ||
ouroboros.WithNodeToNode(true), | ||
) | ||
if err != nil { | ||
t.Fatalf("unexpected error when creating Connection object: %s", err) | ||
} | ||
|
||
testErr := fmt.Errorf("test error") | ||
resultErr := oConn.HandleConnectionError(testErr) | ||
|
||
// Should return the original error when protocols are active | ||
if resultErr != testErr { | ||
t.Fatalf("expected test error to be returned, got: %v", resultErr) | ||
} | ||
|
||
// Verify that the method signature matches the expected behavior | ||
if oConn.HandleConnectionError(nil) != nil { | ||
t.Error("HandleConnectionError(nil) should return nil") | ||
} | ||
|
||
oConn.Close() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should check for error types rather than substring matches of the
Error()
output. Do we actually get any "connection closed" error that isn'tio.EOF
?