Skip to content

Commit 062c8bb

Browse files
authored
Add Done and Err methods to Conn (#348)
* Add Done and Err methods to Conn Signals when a connection has been closed and any associated error. * fix up changelog (bad merge) * add some more comments and an example * refine comments * fix type-o in changelog
1 parent fff23c3 commit 062c8bb

File tree

5 files changed

+163
-9
lines changed

5 files changed

+163
-9
lines changed

CHANGELOG.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Release History
22

3+
## 1.3.0 (unreleased)
4+
5+
### Features Added
6+
7+
* Added methods `Done` and `Err` to `Conn`
8+
* `Done` returns a channel that's closed when `Conn` has closed.
9+
* `Err` explains why `Conn` was closed.
10+
311
## 1.2.0 (2024-09-30)
412

513
### Features Added
@@ -9,7 +17,7 @@
917

1018
### Bugs Fixed
1119

12-
* Fixed a rare race in `Conn.start` that could cause goroutines to be leaked if the provided context was canceld/expired.
20+
* Fixed a rare race in `Conn.start` that could cause goroutines to be leaked if the provided context was canceled/expired.
1321

1422
### Other Changes
1523

conn.go

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,13 @@ func (c *Conn) startImpl(ctx context.Context) error {
406406
}
407407

408408
// Close closes the connection.
409+
//
410+
// Returns nil if there were no errors during shutdown,
411+
// or a *ConnError. This error is not actionable and is
412+
// purely for diagnostic purposes.
413+
//
414+
// The error returned by subsequent calls to Close is
415+
// idempotent, so the same value will always be returned.
409416
func (c *Conn) Close() error {
410417
c.close()
411418

@@ -415,15 +422,32 @@ func (c *Conn) Close() error {
415422
<-c.txDone
416423
<-c.rxDone
417424

418-
var connErr *ConnError
419-
if errors.As(c.doneErr, &connErr) && connErr.RemoteErr == nil && connErr.inner == nil {
420-
// an empty ConnectionError means the connection was closed by the caller
425+
return c.closedErr()
426+
}
427+
428+
// Done returns a channel that's closed when Conn is closed.
429+
func (c *Conn) Done() <-chan struct{} {
430+
return c.done
431+
}
432+
433+
// If Done is not yet closed, Err returns nil.
434+
// If Done is closed, Err returns nil or a *ConnError explaining why.
435+
// A nil error indicates that [Close] was called and there
436+
// were no errors during shutdown.
437+
//
438+
// A *ConnError indicates one of three things
439+
// - there was an error during shutdown from a client-side call to [Close]. the
440+
// error is not actionable and is purely for diagnostic purposes.
441+
// - a fatal error was encountered that caused [Conn] to close
442+
// - the peer closed the connection. [ConnError.RemoteErr] MAY contain an error
443+
// from the peer indicating why it closed the connection
444+
func (c *Conn) Err() error {
445+
select {
446+
case <-c.done:
447+
return c.closedErr()
448+
default:
421449
return nil
422450
}
423-
424-
// there was an error during shut-down or connReader/connWriter
425-
// experienced a terminal error
426-
return c.doneErr
427451
}
428452

429453
// close is called once, either from Close() or when connReader/connWriter exits
@@ -471,6 +495,20 @@ func (c *Conn) closeDuringStart() {
471495
})
472496
}
473497

498+
// returns the error indicating why Conn has closed
499+
// NOTE: only call this AFTER Conn.done has been closed!
500+
func (c *Conn) closedErr() error {
501+
// an empty ConnError means the connection was closed by the caller
502+
var connErr *ConnError
503+
if errors.As(c.doneErr, &connErr) && connErr.RemoteErr == nil && connErr.inner == nil {
504+
return nil
505+
}
506+
507+
// there was an error during shut-down or connReader/connWriter
508+
// experienced a terminal error
509+
return c.doneErr
510+
}
511+
474512
// NewSession starts a new session on the connection.
475513
// - ctx controls waiting for the peer to acknowledge the session
476514
// - opts contains optional values, pass nil to accept the defaults

conn_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,13 @@ func TestClose(t *testing.T) {
302302
cancel()
303303
require.Nil(t, conn.Properties())
304304
require.NoError(t, conn.Close())
305+
select {
306+
case <-conn.Done():
307+
require.NoError(t, conn.Err())
308+
default:
309+
t.Fatal("expected conn.Done() to be signaled")
310+
}
311+
305312
// with Close error
306313
netConn = fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
307314
conn, err = newConn(netConn, nil)
@@ -315,6 +322,50 @@ func TestClose(t *testing.T) {
315322
// wait a bit for connReader to read from the mock
316323
time.Sleep(100 * time.Millisecond)
317324
require.Error(t, conn.Close())
325+
select {
326+
case <-conn.Done():
327+
require.Error(t, conn.Err())
328+
default:
329+
t.Fatal("expected conn.Done() to be signaled")
330+
}
331+
}
332+
333+
func TestCloseAsync(t *testing.T) {
334+
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
335+
conn, err := newConn(netConn, nil)
336+
require.NoError(t, err)
337+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
338+
require.NoError(t, conn.start(ctx))
339+
cancel()
340+
go func() {
341+
require.NoError(t, conn.Close())
342+
}()
343+
select {
344+
case <-conn.Done():
345+
require.NoError(t, conn.Err())
346+
case <-time.After(1 * time.Second):
347+
t.Fatal("expected conn.Done() to be signaled")
348+
}
349+
350+
// with Close error
351+
netConn = fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
352+
conn, err = newConn(netConn, nil)
353+
require.NoError(t, err)
354+
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
355+
require.NoError(t, conn.start(ctx))
356+
cancel()
357+
netConn.OnClose = func() error {
358+
return errors.New("mock close failed")
359+
}
360+
go func() {
361+
require.Error(t, conn.Close())
362+
}()
363+
select {
364+
case <-conn.Done():
365+
require.Error(t, conn.Err())
366+
case <-time.After(1 * time.Second):
367+
t.Fatal("expected conn.Done() to be signaled")
368+
}
318369
}
319370

320371
func TestServerSideClose(t *testing.T) {
@@ -344,6 +395,12 @@ func TestServerSideClose(t *testing.T) {
344395
<-closeReceived
345396
err = conn.Close()
346397
require.NoError(t, err)
398+
select {
399+
case <-conn.Done():
400+
require.NoError(t, conn.Err())
401+
default:
402+
t.Fatal("expected conn.Done() to be signaled")
403+
}
347404

348405
// with error
349406
closeReceived = make(chan struct{})
@@ -361,6 +418,16 @@ func TestServerSideClose(t *testing.T) {
361418
var connErr *ConnError
362419
require.ErrorAs(t, err, &connErr)
363420
require.Equal(t, "*Error{Condition: Close, Description: mock server error, Info: map[]}", connErr.Error())
421+
select {
422+
case <-conn.Done():
423+
connErr = nil
424+
require.ErrorAs(t, conn.Err(), &connErr)
425+
require.Equal(t, "*Error{Condition: Close, Description: mock server error, Info: map[]}", connErr.Error())
426+
default:
427+
t.Fatal("expected conn.Done() to be signaled")
428+
}
429+
require.ErrorAs(t, conn.Err(), &connErr)
430+
require.Equal(t, "*Error{Condition: Close, Description: mock server error, Info: map[]}", connErr.Error())
364431
}
365432

366433
func TestKeepAlives(t *testing.T) {
@@ -592,7 +659,9 @@ func TestClientClose(t *testing.T) {
592659
require.NoError(t, err)
593660
require.NotNil(t, client)
594661
require.NoError(t, client.Close())
662+
require.NoError(t, client.Err())
595663
require.NoError(t, client.Close())
664+
require.NoError(t, client.Err())
596665
}
597666

598667
func TestSessionOptions(t *testing.T) {

errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ type ConnError struct {
8383
inner error
8484
}
8585

86-
// Error implements the error interface for ConnectionError.
86+
// Error implements the error interface for ConnError.
8787
func (e *ConnError) Error() string {
8888
if e.RemoteErr == nil && e.inner == nil {
8989
return "amqp: connection closed"

example_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,42 @@ func ExampleLinkError() {
218218
log.Fatalf("unexpected error type %T", err)
219219
}
220220
}
221+
222+
func ExampleConn_Done() {
223+
ctx := context.TODO()
224+
225+
// create connection
226+
conn, err := amqp.Dial(ctx, "amqps://my-namespace.servicebus.windows.net", &amqp.ConnOptions{
227+
SASLType: amqp.SASLTypePlain("access-key-name", "access-key"),
228+
})
229+
if err != nil {
230+
log.Fatal("Dialing AMQP server:", err)
231+
}
232+
233+
// when the channel returned by Done is closed, conn has been closed
234+
<-conn.Done()
235+
236+
// Err indicates why the connection was closed. a nil error indicates
237+
// a client-side call to Close and there were no errors during shutdown.
238+
closedErr := conn.Err()
239+
240+
// when Err returns a non-nil error, it means that either a client-side
241+
// call to Close encountered an error during shutdown, a fatal error was
242+
// encountered that caused the connection to close, or that the peer
243+
// closed the connection.
244+
if closedErr != nil {
245+
// the error returned by Err is always a *ConnError
246+
var connErr *amqp.ConnError
247+
errors.As(closedErr, &connErr)
248+
249+
if connErr.RemoteErr != nil {
250+
// the peer closed the connection and provided an error explaining why.
251+
// note that the peer MAY send an error when closing the connection but
252+
// is not required to.
253+
} else {
254+
// the connection encountered a fatal error or there was
255+
// an error during client-side shutdown. this is for
256+
// diagnostics, the connection has been closed.
257+
}
258+
}
259+
}

0 commit comments

Comments
 (0)