Skip to content

Commit 8582edf

Browse files
committed
Fix issue when channel/connection is closed before NotifyClose
This fixes an issue where the connection or channel would be closed before NotifyClose is setup. That can cause the monitorAndWait to block even when a connection or channel is actually closed. This happens because the consumers or publishers can close the channel due to an error inside their respective goroutines. This fix also avoids starting and leaking a bunch of goroutines inside monitorAndWait.
1 parent d5524a5 commit 8582edf

File tree

3 files changed

+52
-26
lines changed

3 files changed

+52
-26
lines changed

client.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,14 @@ func (c *Client) runOnce() error {
360360
onStarted(inputConn, outputConn, inputCh, outputCh)
361361
}
362362

363+
// Start listening on NotifyClose before we properly begin so that we avoid
364+
// race when the consumer or publisher starts work before we call
365+
// monitorAndWait. All have a buffer of 1 as recommended by amqp-go.
366+
notifyInputConnClose := inputConn.NotifyClose(make(chan *amqp.Error, 1))
367+
notifyOutputConnClose := outputConn.NotifyClose(make(chan *amqp.Error, 1))
368+
notifyInputChClose := inputCh.NotifyClose(make(chan *amqp.Error, 1))
369+
notifyOutputChClose := outputCh.NotifyClose(make(chan *amqp.Error, 1))
370+
363371
err = c.runRepliesConsumer(inputCh)
364372
if err != nil {
365373
return err
@@ -386,10 +394,10 @@ func (c *Client) runOnce() error {
386394
_, err = monitorAndWait(
387395
make(chan struct{}),
388396
c.stopChan,
389-
inputConn.NotifyClose(make(chan *amqp.Error)),
390-
outputConn.NotifyClose(make(chan *amqp.Error)),
391-
inputCh.NotifyClose(make(chan *amqp.Error)),
392-
outputCh.NotifyClose(make(chan *amqp.Error)),
397+
notifyInputConnClose,
398+
notifyOutputConnClose,
399+
notifyInputChClose,
400+
notifyOutputChClose,
393401
)
394402
if err != nil {
395403
return err

connection.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,39 @@ var ErrUnexpectedConnClosed = errors.New("unexpected connection close without sp
1919
// NotifyPublish etc.
2020
type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)
2121

22-
func monitorAndWait(restartChan, stopChan chan struct{}, amqpErrs ...chan *amqp.Error) (bool, error) {
23-
result := make(chan error, len(amqpErrs))
24-
25-
// Setup monitoring for connections and channels, can be several connections and several channels.
26-
// The first one closed will yield the error.
27-
for _, errCh := range amqpErrs {
28-
go func(c chan *amqp.Error) {
29-
err, ok := <-c
30-
if !ok {
31-
result <- ErrUnexpectedConnClosed
32-
return
33-
}
34-
result <- err
35-
}(errCh)
36-
}
37-
22+
func monitorAndWait(
23+
restartChan,
24+
stopChan chan struct{},
25+
inputConnClose,
26+
outputConnClose,
27+
inputChClose,
28+
outputChClose chan *amqp.Error,
29+
) (bool, error) {
3830
select {
39-
case err := <-result:
40-
return true, err
4131
case <-restartChan:
4232
return true, nil
4333
case <-stopChan:
4434
return false, nil
35+
case err, ok := <-inputConnClose:
36+
if !ok {
37+
return false, ErrUnexpectedConnClosed
38+
}
39+
return false, err
40+
case err, ok := <-outputConnClose:
41+
if !ok {
42+
return false, ErrUnexpectedConnClosed
43+
}
44+
return false, err
45+
case err, ok := <-inputChClose:
46+
if !ok {
47+
return false, ErrUnexpectedConnClosed
48+
}
49+
return false, err
50+
case err, ok := <-outputChClose:
51+
if !ok {
52+
return false, ErrUnexpectedConnClosed
53+
}
54+
return false, err
4555
}
4656
}
4757

server.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,14 @@ func (s *Server) listenAndServe() (bool, error) {
274274
// Notify everyone that the server has started.
275275
s.notifyStarted(inputConn, outputConn, inputCh, outputCh)
276276

277+
// Start listening on NotifyClose before we properly begin so that we avoid
278+
// race when the consumer or publisher starts work before we call
279+
// monitorAndWait. All have a buffer of 1 as recommended by amqp-go.
280+
notifyInputConnClose := inputConn.NotifyClose(make(chan *amqp.Error, 1))
281+
notifyOutputConnClose := outputConn.NotifyClose(make(chan *amqp.Error, 1))
282+
notifyInputChClose := inputCh.NotifyClose(make(chan *amqp.Error, 1))
283+
notifyOutputChClose := outputCh.NotifyClose(make(chan *amqp.Error, 1))
284+
277285
// Create any exchanges that must be declared on startup.
278286
err = createExchanges(inputCh, s.exchanges)
279287
if err != nil {
@@ -302,10 +310,10 @@ func (s *Server) listenAndServe() (bool, error) {
302310
shouldRestart, err := monitorAndWait(
303311
s.restartChan,
304312
s.stopChan,
305-
inputConn.NotifyClose(make(chan *amqp.Error)),
306-
outputConn.NotifyClose(make(chan *amqp.Error)),
307-
inputCh.NotifyClose(make(chan *amqp.Error)),
308-
outputCh.NotifyClose(make(chan *amqp.Error)),
313+
notifyInputConnClose,
314+
notifyOutputConnClose,
315+
notifyInputChClose,
316+
notifyOutputChClose,
309317
)
310318
if err != nil {
311319
return shouldRestart, err

0 commit comments

Comments
 (0)