Skip to content

Commit e05133a

Browse files
authored
Cancel consumer before closing channel (#70)
* Cancel consumer before closing channel * consumer cancel: fix tests * consumer cancel: fix linter
1 parent 5826978 commit e05133a

File tree

3 files changed

+109
-2
lines changed

3 files changed

+109
-2
lines changed

consumer/consumer.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ package consumer
33
import (
44
"context"
55
"fmt"
6+
"os"
7+
"strconv"
68
"strings"
79
"sync"
10+
"sync/atomic"
811
"time"
912

1013
"github.com/makasim/amqpextra/logger"
@@ -52,6 +55,7 @@ type AMQPChannel interface {
5255
NotifyCancel(c chan string) chan string
5356
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
5457
QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
58+
Cancel(consumer string, noWait bool) error
5559
Close() error
5660
}
5761

@@ -149,6 +153,10 @@ func New(
149153
return nil, fmt.Errorf("handler must be not nil")
150154
}
151155

156+
if c.consumer == `` {
157+
c.consumer = uniqueConsumerTag()
158+
}
159+
152160
if c.queue == "" && c.exchange == "" && !c.queueDeclare {
153161
return nil, fmt.Errorf("WithQueue or WithExchange or WithDeclareQueue or WithTmpQueue options must be set")
154162
}
@@ -522,8 +530,33 @@ func (c *Consumer) notifyUnready(err error) State {
522530

523531
func (c *Consumer) close(ch AMQPChannel) {
524532
if ch != nil {
533+
if err := ch.Cancel(c.consumer, false); err != nil {
534+
c.logger.Printf("[WARN] channel cancel: %s", err)
535+
}
525536
if err := ch.Close(); err != nil && !strings.Contains(err.Error(), "channel/connection is not open") {
526537
c.logger.Printf("[WARN] channel close: %s", err)
527538
}
528539
}
529540
}
541+
542+
// COPY AND PASTE from amqp091 library
543+
544+
var consumerSeq uint64
545+
546+
const consumerTagLengthMax = 0xFF // see writeShortstr
547+
548+
func uniqueConsumerTag() string {
549+
return commandNameBasedUniqueConsumerTag(os.Args[0])
550+
}
551+
552+
func commandNameBasedUniqueConsumerTag(commandName string) string {
553+
tagPrefix := "ctag-"
554+
tagInfix := commandName
555+
tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10)
556+
557+
if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax {
558+
tagInfix = "streadway/amqp"
559+
}
560+
561+
return tagPrefix + tagInfix + tagSuffix
562+
}

consumer/consumer_test.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func TestNotify(main *testing.T) {
104104
ch.EXPECT().
105105
NotifyClose(any()).
106106
AnyTimes()
107+
ch.EXPECT().Cancel(any(), false).AnyTimes()
107108
ch.EXPECT().Close().AnyTimes()
108109

109110
connCh := make(chan *consumer.Connection, 1)
@@ -162,6 +163,7 @@ func TestNotify(main *testing.T) {
162163
ch.EXPECT().
163164
NotifyClose(any()).
164165
AnyTimes()
166+
ch.EXPECT().Cancel(any(), false).AnyTimes()
165167
ch.EXPECT().Close().AnyTimes()
166168

167169
connCh := make(chan *consumer.Connection, 1)
@@ -211,7 +213,7 @@ func TestNotify(main *testing.T) {
211213
DeclareArgs: amqp.Table{
212214
"foo": "fooVal",
213215
},
214-
216+
215217
Consumer: "theConsumer",
216218
AutoAck: true,
217219
Exclusive: true,
@@ -683,6 +685,7 @@ func TestConsume(main *testing.T) {
683685
Return(chCloseCh).Times(1)
684686
ch.EXPECT().NotifyCancel(any()).
685687
Return(cancelCh).Times(1)
688+
ch.EXPECT().Cancel(any(), false).Times(1)
686689
ch.EXPECT().Close().Times(1)
687690
ch.EXPECT().Qos(any(), any(), any()).
688691
Times(1)
@@ -731,12 +734,13 @@ func TestConsume(main *testing.T) {
731734
msgCh := make(chan amqp.Delivery)
732735

733736
ch := mock_consumer.NewMockAMQPChannel(ctrl)
734-
ch.EXPECT().Consume("theQueue", "", false, false, false, false, amqp.Table(nil)).
737+
ch.EXPECT().Consume("theQueue", &consumerTagMatcher{t: t}, false, false, false, false, amqp.Table(nil)).
735738
Return(msgCh, nil).Times(1)
736739
ch.EXPECT().NotifyClose(any()).
737740
Return(chCloseCh).Times(1)
738741
ch.EXPECT().NotifyCancel(any()).
739742
Return(cancelCh).Times(1)
743+
ch.EXPECT().Cancel(any(), false).Times(1)
740744
ch.EXPECT().Close().Times(1)
741745
ch.EXPECT().Qos(any(), any(), any()).
742746
Times(1)
@@ -792,6 +796,7 @@ func TestConsume(main *testing.T) {
792796
Return(chCloseCh).Times(1)
793797
ch.EXPECT().NotifyCancel(any()).
794798
Return(cancelCh).Times(1)
799+
ch.EXPECT().Cancel(`theConsumer`, false).Times(1)
795800
ch.EXPECT().Close().Times(1)
796801
ch.EXPECT().Qos(any(), any(), any()).
797802
Times(1)
@@ -847,6 +852,7 @@ func TestConsume(main *testing.T) {
847852
Return(chCloseCh).Times(1)
848853
ch.EXPECT().NotifyCancel(any()).
849854
Return(cancelCh).Times(1)
855+
ch.EXPECT().Cancel(any(), false).Times(1)
850856
ch.EXPECT().Close().Times(1)
851857
ch.EXPECT().Qos(any(), any(), any()).
852858
Times(1)
@@ -905,6 +911,7 @@ func TestConsume(main *testing.T) {
905911
Return(chCloseCh).Times(1)
906912
ch.EXPECT().NotifyCancel(any()).
907913
Return(cancelCh).Times(1)
914+
ch.EXPECT().Cancel(any(), false).Times(1)
908915
ch.EXPECT().Close().Return(nil).Times(1)
909916

910917
newChCloseCh := make(chan *amqp.Error)
@@ -920,6 +927,7 @@ func TestConsume(main *testing.T) {
920927
Return(newChCloseCh).Times(1)
921928
newCh.EXPECT().NotifyCancel(any()).
922929
Return(newCancelCh).Times(1)
930+
newCh.EXPECT().Cancel(any(), false).Times(1)
923931
newCh.EXPECT().Close().Times(1)
924932

925933
conn := mock_consumer.NewMockAMQPConnection(ctrl)
@@ -984,6 +992,7 @@ func TestConsume(main *testing.T) {
984992
Return(chCloseCh).Times(1)
985993
ch.EXPECT().NotifyCancel(any()).
986994
Return(cancelCh).Times(1)
995+
ch.EXPECT().Cancel(any(), false).Times(1)
987996
ch.EXPECT().Close().Return(fmt.Errorf("the error")).Times(1)
988997
ch.EXPECT().Qos(any(), any(), any()).
989998
Times(1)
@@ -1037,6 +1046,7 @@ func TestConsume(main *testing.T) {
10371046
Return(chCloseCh).Times(1)
10381047
ch.EXPECT().NotifyCancel(any()).
10391048
Return(cancelCh).Times(1)
1049+
ch.EXPECT().Cancel(any(), false).Times(1)
10401050
ch.EXPECT().Close().Return(amqp.ErrClosed).Times(1)
10411051
ch.EXPECT().Qos(any(), any(), any()).
10421052
Times(1)
@@ -1092,6 +1102,7 @@ func TestConsume(main *testing.T) {
10921102
Return(chCloseCh).Times(1)
10931103
ch.EXPECT().NotifyCancel(any()).
10941104
Return(cancelCh).Times(1)
1105+
ch.EXPECT().Cancel(any(), false).Times(1)
10951106
ch.EXPECT().Close().Return(nil).Times(1)
10961107

10971108
conn := mock_consumer.NewMockAMQPConnection(ctrl)
@@ -1113,6 +1124,7 @@ func TestConsume(main *testing.T) {
11131124
Return(newChCloseCh).Times(1)
11141125
newCh.EXPECT().NotifyCancel(any()).
11151126
Return(newCancelCh).Times(1)
1127+
newCh.EXPECT().Cancel(any(), false).Times(1)
11161128
newCh.EXPECT().Close().Times(1)
11171129

11181130
c, err := consumer.New(
@@ -1176,6 +1188,7 @@ func TestConsume(main *testing.T) {
11761188
Return(chCloseCh).Times(1)
11771189
ch.EXPECT().NotifyCancel(any()).
11781190
Return(cancelCh).Times(1)
1191+
ch.EXPECT().Cancel(any(), false).Times(1)
11791192
ch.EXPECT().Close().Return(nil).Times(1)
11801193

11811194
conn := mock_consumer.NewMockAMQPConnection(ctrl)
@@ -1241,6 +1254,7 @@ func TestConcurrency(main *testing.T) {
12411254
Return(chCloseCh).Times(1)
12421255
ch.EXPECT().NotifyCancel(any()).
12431256
Return(cancelCh).Times(1)
1257+
ch.EXPECT().Cancel(any(), false).Times(1)
12441258
ch.EXPECT().Close().Return(nil).Times(1)
12451259

12461260
amqpConn := mock_consumer.NewMockAMQPConnection(ctrl)
@@ -1256,6 +1270,7 @@ func TestConcurrency(main *testing.T) {
12561270
Return(newChCloseCh).Times(1)
12571271
newCh.EXPECT().NotifyCancel(any()).
12581272
Return(newCancelCh).Times(1)
1273+
newCh.EXPECT().Cancel(any(), false).Times(1)
12591274
newCh.EXPECT().Close().Return(nil).Times(1)
12601275

12611276
newConn := mock_consumer.NewMockAMQPConnection(ctrl)
@@ -1333,6 +1348,7 @@ func TestConcurrency(main *testing.T) {
13331348
Return(chCloseCh).Times(1)
13341349
ch.EXPECT().NotifyCancel(any()).
13351350
Return(cancelCh).Times(1)
1351+
ch.EXPECT().Cancel(any(), false).Times(1)
13361352
ch.EXPECT().Close().Return(amqp.ErrClosed).Times(1)
13371353

13381354
conn := mock_consumer.NewMockAMQPConnection(ctrl)
@@ -1409,6 +1425,7 @@ func TestConcurrency(main *testing.T) {
14091425
Return(chCloseCh).Times(1)
14101426
ch.EXPECT().NotifyCancel(any()).
14111427
Return(cancelCh).Times(1)
1428+
ch.EXPECT().Cancel(any(), false).Times(1)
14121429
ch.EXPECT().Close().Return(nil).Times(1)
14131430

14141431
newChCloseCh := make(chan *amqp.Error)
@@ -1422,6 +1439,7 @@ func TestConcurrency(main *testing.T) {
14221439
Return(newChCloseCh).Times(1)
14231440
newCh.EXPECT().NotifyCancel(any()).
14241441
Return(newCancelCh).Times(1)
1442+
newCh.EXPECT().Cancel(any(), false).Times(1)
14251443
newCh.EXPECT().Close().Return(nil).Times(1)
14261444

14271445
conn := mock_consumer.NewMockAMQPConnection(ctrl)
@@ -1501,6 +1519,9 @@ func TestOptions(main *testing.T) {
15011519
ch.EXPECT().
15021520
NotifyCancel(any()).
15031521
AnyTimes()
1522+
ch.EXPECT().
1523+
Cancel(any(), false).
1524+
AnyTimes()
15041525
ch.EXPECT().
15051526
Close().
15061527
AnyTimes()
@@ -1565,6 +1586,9 @@ func TestOptions(main *testing.T) {
15651586
ch.EXPECT().
15661587
NotifyCancel(any()).
15671588
AnyTimes()
1589+
ch.EXPECT().
1590+
Cancel(any(), false).
1591+
AnyTimes()
15681592
ch.EXPECT().
15691593
Close().
15701594
AnyTimes()
@@ -1624,6 +1648,9 @@ func TestOptions(main *testing.T) {
16241648
ch.EXPECT().
16251649
NotifyCancel(any()).
16261650
AnyTimes()
1651+
ch.EXPECT().
1652+
Cancel(any(), false).
1653+
AnyTimes()
16271654
ch.EXPECT().
16281655
Close().
16291656
AnyTimes()
@@ -1682,6 +1709,9 @@ func TestOptions(main *testing.T) {
16821709
ch.EXPECT().
16831710
NotifyCancel(any()).
16841711
AnyTimes()
1712+
ch.EXPECT().
1713+
Cancel(any(), false).
1714+
AnyTimes()
16851715
ch.EXPECT().
16861716
Close().
16871717
AnyTimes()
@@ -1740,6 +1770,9 @@ func TestOptions(main *testing.T) {
17401770
ch.EXPECT().
17411771
NotifyCancel(any()).
17421772
AnyTimes()
1773+
ch.EXPECT().
1774+
Cancel(any(), false).
1775+
AnyTimes()
17431776
ch.EXPECT().
17441777
Close().
17451778
AnyTimes()
@@ -1840,6 +1873,9 @@ func TestOptions(main *testing.T) {
18401873
ch.EXPECT().
18411874
NotifyClose(any()).
18421875
AnyTimes()
1876+
ch.EXPECT().
1877+
Cancel(any(), false).
1878+
AnyTimes()
18431879
ch.EXPECT().
18441880
Close().
18451881
AnyTimes()
@@ -1891,6 +1927,9 @@ func TestOptions(main *testing.T) {
18911927
ch.EXPECT().
18921928
NotifyClose(any()).
18931929
AnyTimes()
1930+
ch.EXPECT().
1931+
Cancel(any(), false).
1932+
AnyTimes()
18941933
ch.EXPECT().
18951934
Close().
18961935
AnyTimes()
@@ -1950,6 +1989,9 @@ func TestOptions(main *testing.T) {
19501989
ch.EXPECT().
19511990
NotifyClose(any()).
19521991
AnyTimes()
1992+
ch.EXPECT().
1993+
Cancel(any(), false).
1994+
AnyTimes()
19531995
ch.EXPECT().
19541996
Close().
19551997
AnyTimes()
@@ -2001,6 +2043,9 @@ func TestOptions(main *testing.T) {
20012043
ch.EXPECT().
20022044
NotifyClose(any()).
20032045
AnyTimes()
2046+
ch.EXPECT().
2047+
Cancel(any(), false).
2048+
AnyTimes()
20042049
ch.EXPECT().
20052050
Close().
20062051
AnyTimes()
@@ -2056,6 +2101,9 @@ func TestOptions(main *testing.T) {
20562101
ch.EXPECT().
20572102
NotifyClose(any()).
20582103
AnyTimes()
2104+
ch.EXPECT().
2105+
Cancel(any(), false).
2106+
AnyTimes()
20592107
ch.EXPECT().
20602108
Close().
20612109
AnyTimes()
@@ -2165,3 +2213,15 @@ func initFuncStub(chs ...consumer.AMQPChannel) func(consumer.AMQPConnection) (co
21652213
return currCh, nil
21662214
}
21672215
}
2216+
2217+
type consumerTagMatcher struct {
2218+
t *testing.T
2219+
}
2220+
2221+
func (m *consumerTagMatcher) Matches(x interface{}) bool {
2222+
return assert.IsType(m.t, `string`, x) && assert.NotEmpty(m.t, x)
2223+
}
2224+
2225+
func (*consumerTagMatcher) String() string {
2226+
return `consumer tag must be a non-empty string`
2227+
}

consumer/mock_consumer/mocks.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)