Skip to content

Commit 70f3084

Browse files
Consume from exchange e2e (#47)
* add e2e test for consume from exchange logic
1 parent 2d12a8e commit 70f3084

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

e2e_test/consumer_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,90 @@ waitOpened:
103103
time.Sleep(time.Millisecond * 100)
104104
}
105105

106+
func TestConsumerWithExchange(t *testing.T) {
107+
amqpConn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/amqpextra")
108+
require.NoError(t, err)
109+
defer amqpConn.Close()
110+
111+
ch, err := amqpConn.Channel()
112+
require.NoError(t, err)
113+
defer ch.Close()
114+
115+
rnum, err := rand.Int(rand.Reader, big.NewInt(10000000))
116+
require.NoError(t, err)
117+
exchangeName := fmt.Sprintf("exchange%d%d", time.Now().UnixNano(), rnum)
118+
119+
dialerReadyCh := make(chan struct{}, 1)
120+
dialerUnreadyCh := make(chan error, 1)
121+
122+
dialer, err := amqpextra.NewDialer(
123+
amqpextra.WithNotify(dialerReadyCh, dialerUnreadyCh),
124+
amqpextra.WithURL("amqp://guest:guest@rabbitmq:5672/amqpextra"),
125+
)
126+
require.NoError(t, err)
127+
defer dialer.Close()
128+
129+
assertConsumerReady(t, dialerReadyCh)
130+
131+
gotMsg := make(chan struct{})
132+
h := consumer.HandlerFunc(func(ctx context.Context, msg amqp.Delivery) interface{} {
133+
close(gotMsg)
134+
return nil
135+
})
136+
137+
err = ch.ExchangeDeclare(
138+
exchangeName,
139+
amqp.ExchangeFanout,
140+
false,
141+
true,
142+
false,
143+
false,
144+
nil,
145+
)
146+
require.NoError(t, err)
147+
readyCh := make(chan consumer.Ready, 1)
148+
unreadyCh := make(chan error, 1)
149+
c, err := dialer.Consumer(
150+
consumer.WithNotify(readyCh, unreadyCh),
151+
consumer.WithExchange(exchangeName, ""),
152+
consumer.WithHandler(h),
153+
)
154+
require.NoError(t, err)
155+
defer c.Close()
156+
157+
assertConsumerReadyQueue(t, readyCh)
158+
159+
err = ch.Publish(exchangeName,
160+
"",
161+
false,
162+
false,
163+
amqp.Publishing{Body: []byte("aMessage")},
164+
)
165+
require.NoError(t, err)
166+
timer := time.NewTimer(time.Second)
167+
168+
select {
169+
case <-timer.C:
170+
t.Fatal("message must be received")
171+
case <-gotMsg:
172+
}
173+
174+
c.Close()
175+
<-c.NotifyClosed()
176+
dialer.Close()
177+
}
178+
179+
func assertConsumerReadyQueue(t *testing.T, readyCh chan consumer.Ready) {
180+
timer := time.NewTimer(time.Millisecond * 2000)
181+
defer timer.Stop()
182+
183+
select {
184+
case <-readyCh:
185+
case <-timer.C:
186+
t.Fatal("consumer must be ready")
187+
}
188+
}
189+
106190
func assertConsumerReady(t *testing.T, readyCh chan struct{}) {
107191
timer := time.NewTimer(time.Millisecond * 2000)
108192
defer timer.Stop()

0 commit comments

Comments
 (0)