Skip to content

Commit 771249c

Browse files
committed
Fix test assertions running outside subject nodes
Assertions should be made only inside subject nodes i.e. It(), otherwise, a failed assertion panics and the test code does not recover correctly.
1 parent c82637d commit 771249c

File tree

1 file changed

+92
-86
lines changed

1 file changed

+92
-86
lines changed

pkg/rabbitmqamqp/amqp_consumer_stream_test.go

Lines changed: 92 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package rabbitmqamqp
33
import (
44
"context"
55
"fmt"
6-
"sync"
76
"time"
87

98
"github.com/Azure/go-amqp"
@@ -320,33 +319,43 @@ var _ = Describe("Consumer stream test", func() {
320319
})
321320

322321
Describe("consumer should filter messages based on application properties", func() {
323-
qName := generateName("consumer should filter messages based on application properties")
324-
connection, err := Dial(context.Background(), "amqp://", nil)
325-
Expect(err).To(BeNil())
326-
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
327-
Name: qName,
328-
})
329-
Expect(err).To(BeNil())
330-
Expect(queueInfo).NotTo(BeNil())
322+
var (
323+
qName string
324+
connection *AmqpConnection
325+
)
326+
BeforeEach(func() {
327+
qName = generateName("consumer should filter messages based on application properties")
328+
var err error
329+
connection, err = Dial(context.Background(), "amqp://", nil)
330+
Expect(err).ToNot(HaveOccurred())
331+
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
332+
Name: qName,
333+
})
334+
Expect(err).ToNot(HaveOccurred())
335+
Expect(queueInfo).NotTo(BeNil())
331336

332-
publishMessagesWithMessageLogic(qName, "ignoredKey", 7, func(msg *amqp.Message) {
333-
msg.ApplicationProperties = map[string]interface{}{"ignoredKey": "ignoredValue"}
334-
})
337+
publishMessagesWithMessageLogic(qName, "ignoredKey", 7, func(msg *amqp.Message) {
338+
msg.ApplicationProperties = map[string]interface{}{"ignoredKey": "ignoredValue"}
339+
})
335340

336-
publishMessagesWithMessageLogic(qName, "key1", 10, func(msg *amqp.Message) {
337-
msg.ApplicationProperties = map[string]interface{}{"key1": "value1", "constFilterKey": "constFilterValue"}
338-
})
341+
publishMessagesWithMessageLogic(qName, "key1", 10, func(msg *amqp.Message) {
342+
msg.ApplicationProperties = map[string]interface{}{"key1": "value1", "constFilterKey": "constFilterValue"}
343+
})
344+
345+
publishMessagesWithMessageLogic(qName, "key2", 10, func(msg *amqp.Message) {
346+
msg.ApplicationProperties = map[string]interface{}{"key2": "value2", "constFilterKey": "constFilterValue"}
347+
})
339348

340-
publishMessagesWithMessageLogic(qName, "key2", 10, func(msg *amqp.Message) {
341-
msg.ApplicationProperties = map[string]interface{}{"key2": "value2", "constFilterKey": "constFilterValue"}
349+
publishMessagesWithMessageLogic(qName, "key3", 10, func(msg *amqp.Message) {
350+
msg.ApplicationProperties = map[string]interface{}{"key3": "value3", "constFilterKey": "constFilterValue"}
351+
})
342352
})
343353

344-
publishMessagesWithMessageLogic(qName, "key3", 10, func(msg *amqp.Message) {
345-
msg.ApplicationProperties = map[string]interface{}{"key3": "value3", "constFilterKey": "constFilterValue"}
354+
AfterEach(func() {
355+
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(Succeed())
356+
Expect(connection.Close(context.Background())).To(Succeed())
346357
})
347358

348-
var wg sync.WaitGroup
349-
wg.Add(3)
350359
DescribeTable("consumer should filter messages based on application properties", func(key string, value any, label string) {
351360

352361
consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
@@ -375,93 +384,96 @@ var _ = Describe("Consumer stream test", func() {
375384
Expect(dc.Accept(context.Background())).To(BeNil())
376385
}
377386
Expect(consumer.Close(context.Background())).To(BeNil())
378-
wg.Done()
379387
},
380388
Entry("key1 value1", "key1", "value1", "key1"),
381389
Entry("key2 value2", "key2", "value2", "key2"),
382390
Entry("key3 value3", "key3", "value3", "key3"),
383391
)
384-
go func() {
385-
wg.Wait()
386-
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
387-
Expect(connection.Close(context.Background())).To(BeNil())
388-
}()
389-
390392
})
391393

392394
Describe("consumer should filter messages based on properties", func() {
393395
/*
394396
Test the consumer should filter messages based on properties
395397
*/
396-
// TODO: defer cleanup to delete the stream queue
397-
qName := generateName("consumer should filter messages based on properties")
398-
qName += time.Now().String()
399-
connection, err := Dial(context.Background(), "amqp://", nil)
400-
Expect(err).To(BeNil())
401-
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
402-
Name: qName,
403-
})
404-
Expect(err).To(BeNil())
405-
Expect(queueInfo).NotTo(BeNil())
398+
var (
399+
qName string
400+
connection *AmqpConnection
401+
)
406402

407-
publishMessagesWithMessageLogic(qName, "MessageID", 10, func(msg *amqp.Message) {
408-
msg.Properties = &amqp.MessageProperties{MessageID: "MessageID"}
409-
})
403+
BeforeEach(func() {
404+
qName = generateName("consumer should filter messages based on properties")
405+
qName += time.Now().String()
406+
var err error
407+
connection, err = Dial(context.Background(), "amqp://", nil)
408+
Expect(err).ToNot(HaveOccurred())
409+
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
410+
Name: qName,
411+
})
412+
Expect(err).ToNot(HaveOccurred())
413+
Expect(queueInfo).NotTo(BeNil())
410414

411-
publishMessagesWithMessageLogic(qName, "Subject", 10, func(msg *amqp.Message) {
412-
msg.Properties = &amqp.MessageProperties{Subject: ptr("Subject")}
413-
})
415+
publishMessagesWithMessageLogic(qName, "MessageID", 10, func(msg *amqp.Message) {
416+
msg.Properties = &amqp.MessageProperties{MessageID: "MessageID"}
417+
})
414418

415-
publishMessagesWithMessageLogic(qName, "ReplyTo", 10, func(msg *amqp.Message) {
416-
msg.Properties = &amqp.MessageProperties{ReplyTo: ptr("ReplyTo")}
417-
})
419+
publishMessagesWithMessageLogic(qName, "Subject", 10, func(msg *amqp.Message) {
420+
msg.Properties = &amqp.MessageProperties{Subject: ptr("Subject")}
421+
})
418422

419-
publishMessagesWithMessageLogic(qName, "ContentType", 10, func(msg *amqp.Message) {
420-
msg.Properties = &amqp.MessageProperties{ContentType: ptr("ContentType")}
421-
})
423+
publishMessagesWithMessageLogic(qName, "ReplyTo", 10, func(msg *amqp.Message) {
424+
msg.Properties = &amqp.MessageProperties{ReplyTo: ptr("ReplyTo")}
425+
})
422426

423-
publishMessagesWithMessageLogic(qName, "ContentEncoding", 10, func(msg *amqp.Message) {
424-
msg.Properties = &amqp.MessageProperties{ContentEncoding: ptr("ContentEncoding")}
425-
})
427+
publishMessagesWithMessageLogic(qName, "ContentType", 10, func(msg *amqp.Message) {
428+
msg.Properties = &amqp.MessageProperties{ContentType: ptr("ContentType")}
429+
})
426430

427-
publishMessagesWithMessageLogic(qName, "GroupID", 10, func(msg *amqp.Message) {
428-
msg.Properties = &amqp.MessageProperties{GroupID: ptr("GroupID")}
429-
})
431+
publishMessagesWithMessageLogic(qName, "ContentEncoding", 10, func(msg *amqp.Message) {
432+
msg.Properties = &amqp.MessageProperties{ContentEncoding: ptr("ContentEncoding")}
433+
})
430434

431-
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
432-
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: ptr("ReplyToGroupID")}
433-
})
435+
publishMessagesWithMessageLogic(qName, "GroupID", 10, func(msg *amqp.Message) {
436+
msg.Properties = &amqp.MessageProperties{GroupID: ptr("GroupID")}
437+
})
434438

435-
// GroupSequence
436-
publishMessagesWithMessageLogic(qName, "GroupSequence", 10, func(msg *amqp.Message) {
437-
msg.Properties = &amqp.MessageProperties{GroupSequence: ptr(uint32(137))}
438-
})
439+
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
440+
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: ptr("ReplyToGroupID")}
441+
})
439442

440-
// ReplyToGroupID
441-
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
442-
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: ptr("ReplyToGroupID")}
443-
})
443+
// GroupSequence
444+
publishMessagesWithMessageLogic(qName, "GroupSequence", 10, func(msg *amqp.Message) {
445+
msg.Properties = &amqp.MessageProperties{GroupSequence: ptr(uint32(137))}
446+
})
444447

445-
// CreationTime
448+
// ReplyToGroupID
449+
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
450+
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: ptr("ReplyToGroupID")}
451+
})
446452

447-
publishMessagesWithMessageLogic(qName, "CreationTime", 10, func(msg *amqp.Message) {
448-
msg.Properties = &amqp.MessageProperties{CreationTime: ptr(createDateTime())}
449-
})
453+
// CreationTime
450454

451-
// AbsoluteExpiryTime
455+
publishMessagesWithMessageLogic(qName, "CreationTime", 10, func(msg *amqp.Message) {
456+
msg.Properties = &amqp.MessageProperties{CreationTime: ptr(createDateTime())}
457+
})
452458

453-
publishMessagesWithMessageLogic(qName, "AbsoluteExpiryTime", 10, func(msg *amqp.Message) {
454-
msg.Properties = &amqp.MessageProperties{AbsoluteExpiryTime: ptr(createDateTime())}
455-
})
459+
// AbsoluteExpiryTime
456460

457-
// CorrelationID
461+
publishMessagesWithMessageLogic(qName, "AbsoluteExpiryTime", 10, func(msg *amqp.Message) {
462+
msg.Properties = &amqp.MessageProperties{AbsoluteExpiryTime: ptr(createDateTime())}
463+
})
464+
465+
// CorrelationID
458466

459-
publishMessagesWithMessageLogic(qName, "CorrelationID", 10, func(msg *amqp.Message) {
460-
msg.Properties = &amqp.MessageProperties{CorrelationID: "CorrelationID"}
467+
publishMessagesWithMessageLogic(qName, "CorrelationID", 10, func(msg *amqp.Message) {
468+
msg.Properties = &amqp.MessageProperties{CorrelationID: "CorrelationID"}
469+
})
470+
})
471+
472+
AfterEach(func() {
473+
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
474+
Expect(connection.Close(context.Background())).To(BeNil())
461475
})
462476

463-
var wg sync.WaitGroup
464-
wg.Add(12)
465477
DescribeTable("consumer should filter messages based on properties", func(properties *amqp.MessageProperties, label string) {
466478

467479
consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
@@ -533,7 +545,6 @@ var _ = Describe("Consumer stream test", func() {
533545
Expect(dc.Accept(context.Background())).To(BeNil())
534546
}
535547
Expect(consumer.Close(context.Background())).To(BeNil())
536-
wg.Done()
537548
},
538549
Entry("MessageID", &amqp.MessageProperties{MessageID: "MessageID"}, "MessageID"),
539550
Entry("Subject", &amqp.MessageProperties{Subject: ptr("Subject")}, "Subject"),
@@ -548,11 +559,6 @@ var _ = Describe("Consumer stream test", func() {
548559
Entry("AbsoluteExpiryTime", &amqp.MessageProperties{AbsoluteExpiryTime: ptr(createDateTime())}, "AbsoluteExpiryTime"),
549560
Entry("CorrelationID", &amqp.MessageProperties{CorrelationID: "CorrelationID"}, "CorrelationID"),
550561
)
551-
go func() {
552-
wg.Wait()
553-
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
554-
Expect(connection.Close(context.Background())).To(BeNil())
555-
}()
556562
})
557563

558564
It("SQL filter consumer", func() {

0 commit comments

Comments
 (0)