Skip to content

Commit 4e73e82

Browse files
authored
Refactor and test fixes (#66)
* Refactor and remove QueueType It is just a wrapper around TQueueType * Fix test assertions running outside subject nodes Assertions should be made only inside subject nodes i.e. It(), otherwise, a failed assertion panics and the test code does not recover correctly. * Refactor exchange type It had a wrapper struct. It's simpler to use the exchange type directly * Fix assertions running outside subject node Because it can cause panics on failed assertions, and it does not respect the focus marker i.e. it always used to run.
1 parent c8d3639 commit 4e73e82

File tree

5 files changed

+147
-160
lines changed

5 files changed

+147
-160
lines changed

pkg/rabbitmqamqp/amqp_connection_test.go

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import (
55
"crypto/tls"
66
"crypto/x509"
77
"fmt"
8+
"os"
9+
"time"
10+
811
"github.com/Azure/go-amqp"
912
. "github.com/onsi/ginkgo/v2"
1013
. "github.com/onsi/gomega"
11-
"os"
12-
"sync"
13-
"time"
1414
)
1515

1616
var _ = Describe("AMQP connection Test", func() {
@@ -117,12 +117,10 @@ var _ = Describe("AMQP connection Test", func() {
117117
})
118118

119119
Describe("AMQP TLS connection should succeed with in different vhosts with Anonymous and External.", func() {
120-
wg := &sync.WaitGroup{}
121-
wg.Add(4)
122120
DescribeTable("TLS connection should success in different vhosts ", func(virtualHost string, sasl amqp.SASLType) {
123121
// Load CA cert
124122
caCert, err := os.ReadFile("../../.ci/certs/ca_certificate.pem")
125-
Expect(err).To(BeNil())
123+
Expect(err).ToNot(HaveOccurred())
126124

127125
// Create a CA certificate pool and add the CA certificate to it
128126
caCertPool := x509.NewCertPool()
@@ -131,7 +129,7 @@ var _ = Describe("AMQP connection Test", func() {
131129
// Load client cert
132130
clientCert, err := tls.LoadX509KeyPair("../../.ci/certs/client_localhost_certificate.pem",
133131
"../../.ci/certs/client_localhost_key.pem")
134-
Expect(err).To(BeNil())
132+
Expect(err).ToNot(HaveOccurred())
135133

136134
// Create a TLS configuration
137135
tlsConfig := &tls.Config{
@@ -146,34 +144,32 @@ var _ = Describe("AMQP connection Test", func() {
146144
SASLType: sasl,
147145
TLSConfig: tlsConfig,
148146
})
149-
Expect(err).To(BeNil())
147+
Expect(err).ToNot(HaveOccurred())
150148
Expect(connection).NotTo(BeNil())
151149

152150
// Close the connection
153151
err = connection.Close(context.Background())
154-
Expect(err).To(BeNil())
155-
wg.Done()
152+
Expect(err).ToNot(HaveOccurred())
156153
},
157-
Entry("with virtual host. External", "%2F", amqp.SASLTypeExternal("")),
158-
Entry("with a not default virtual host. External", "tls", amqp.SASLTypeExternal("")),
159-
Entry("with virtual host. Anonymous", "%2F", amqp.SASLTypeAnonymous()),
160-
Entry("with a not default virtual host. Anonymous", "tls", amqp.SASLTypeAnonymous()),
154+
Entry("default virtual host + External", "%2F", amqp.SASLTypeExternal("")),
155+
Entry("non-default virtual host + External", "tls", amqp.SASLTypeExternal("")),
156+
Entry("default virtual host + Anonymous", "%2F", amqp.SASLTypeAnonymous()),
157+
Entry("non-default virtual host + Anonymous", "tls", amqp.SASLTypeAnonymous()),
161158
)
162-
go func() {
163-
wg.Wait()
164-
}()
165159
})
166160

167-
Describe("AMQP TLS connection should fail with error.", func() {
168-
tlsConfig := &tls.Config{}
161+
Describe("AMQP TLS connection", func() {
162+
It("should fail with error", func() {
163+
tlsConfig := &tls.Config{}
169164

170-
// Dial the AMQP server with TLS configuration
171-
connection, err := Dial(context.Background(), "amqps://does_not_exist:5671", &AmqpConnOptions{
172-
TLSConfig: tlsConfig,
165+
// Dial the AMQP server with TLS configuration
166+
connection, err := Dial(context.Background(), "amqps://does_not_exist:5671", &AmqpConnOptions{
167+
TLSConfig: tlsConfig,
168+
})
169+
Expect(connection).To(BeNil())
170+
Expect(err).To(HaveOccurred())
171+
Expect(err.Error()).To(ContainSubstring("failed to open TLS connection"))
173172
})
174-
Expect(connection).To(BeNil())
175-
Expect(err).To(HaveOccurred())
176-
Expect(err.Error()).To(ContainSubstring("failed to open TLS connection"))
177173
})
178174

179175
})

pkg/rabbitmqamqp/amqp_consumer_stream_test.go

Lines changed: 92 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package rabbitmqamqp
33
import (
44
"context"
55
"fmt"
6-
"sync"
76
"time"
87

98
"github.com/Azure/go-amqp"
@@ -320,33 +319,43 @@ var _ = Describe("Consumer stream test", func() {
320319
})
321320

322321
Describe("consumer should filter messages based on application properties", func() {
323-
qName := generateName("consumer should filter messages based on application properties")
324-
connection, err := Dial(context.Background(), "amqp://", nil)
325-
Expect(err).To(BeNil())
326-
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
327-
Name: qName,
328-
})
329-
Expect(err).To(BeNil())
330-
Expect(queueInfo).NotTo(BeNil())
322+
var (
323+
qName string
324+
connection *AmqpConnection
325+
)
326+
BeforeEach(func() {
327+
qName = generateName("consumer should filter messages based on application properties")
328+
var err error
329+
connection, err = Dial(context.Background(), "amqp://", nil)
330+
Expect(err).ToNot(HaveOccurred())
331+
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
332+
Name: qName,
333+
})
334+
Expect(err).ToNot(HaveOccurred())
335+
Expect(queueInfo).NotTo(BeNil())
331336

332-
publishMessagesWithMessageLogic(qName, "ignoredKey", 7, func(msg *amqp.Message) {
333-
msg.ApplicationProperties = map[string]interface{}{"ignoredKey": "ignoredValue"}
334-
})
337+
publishMessagesWithMessageLogic(qName, "ignoredKey", 7, func(msg *amqp.Message) {
338+
msg.ApplicationProperties = map[string]interface{}{"ignoredKey": "ignoredValue"}
339+
})
335340

336-
publishMessagesWithMessageLogic(qName, "key1", 10, func(msg *amqp.Message) {
337-
msg.ApplicationProperties = map[string]interface{}{"key1": "value1", "constFilterKey": "constFilterValue"}
338-
})
341+
publishMessagesWithMessageLogic(qName, "key1", 10, func(msg *amqp.Message) {
342+
msg.ApplicationProperties = map[string]interface{}{"key1": "value1", "constFilterKey": "constFilterValue"}
343+
})
344+
345+
publishMessagesWithMessageLogic(qName, "key2", 10, func(msg *amqp.Message) {
346+
msg.ApplicationProperties = map[string]interface{}{"key2": "value2", "constFilterKey": "constFilterValue"}
347+
})
339348

340-
publishMessagesWithMessageLogic(qName, "key2", 10, func(msg *amqp.Message) {
341-
msg.ApplicationProperties = map[string]interface{}{"key2": "value2", "constFilterKey": "constFilterValue"}
349+
publishMessagesWithMessageLogic(qName, "key3", 10, func(msg *amqp.Message) {
350+
msg.ApplicationProperties = map[string]interface{}{"key3": "value3", "constFilterKey": "constFilterValue"}
351+
})
342352
})
343353

344-
publishMessagesWithMessageLogic(qName, "key3", 10, func(msg *amqp.Message) {
345-
msg.ApplicationProperties = map[string]interface{}{"key3": "value3", "constFilterKey": "constFilterValue"}
354+
AfterEach(func() {
355+
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(Succeed())
356+
Expect(connection.Close(context.Background())).To(Succeed())
346357
})
347358

348-
var wg sync.WaitGroup
349-
wg.Add(3)
350359
DescribeTable("consumer should filter messages based on application properties", func(key string, value any, label string) {
351360

352361
consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
@@ -375,93 +384,96 @@ var _ = Describe("Consumer stream test", func() {
375384
Expect(dc.Accept(context.Background())).To(BeNil())
376385
}
377386
Expect(consumer.Close(context.Background())).To(BeNil())
378-
wg.Done()
379387
},
380388
Entry("key1 value1", "key1", "value1", "key1"),
381389
Entry("key2 value2", "key2", "value2", "key2"),
382390
Entry("key3 value3", "key3", "value3", "key3"),
383391
)
384-
go func() {
385-
wg.Wait()
386-
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
387-
Expect(connection.Close(context.Background())).To(BeNil())
388-
}()
389-
390392
})
391393

392394
Describe("consumer should filter messages based on properties", func() {
393395
/*
394396
Test the consumer should filter messages based on properties
395397
*/
396-
// TODO: defer cleanup to delete the stream queue
397-
qName := generateName("consumer should filter messages based on properties")
398-
qName += time.Now().String()
399-
connection, err := Dial(context.Background(), "amqp://", nil)
400-
Expect(err).To(BeNil())
401-
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
402-
Name: qName,
403-
})
404-
Expect(err).To(BeNil())
405-
Expect(queueInfo).NotTo(BeNil())
398+
var (
399+
qName string
400+
connection *AmqpConnection
401+
)
406402

407-
publishMessagesWithMessageLogic(qName, "MessageID", 10, func(msg *amqp.Message) {
408-
msg.Properties = &amqp.MessageProperties{MessageID: "MessageID"}
409-
})
403+
BeforeEach(func() {
404+
qName = generateName("consumer should filter messages based on properties")
405+
qName += time.Now().String()
406+
var err error
407+
connection, err = Dial(context.Background(), "amqp://", nil)
408+
Expect(err).ToNot(HaveOccurred())
409+
queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{
410+
Name: qName,
411+
})
412+
Expect(err).ToNot(HaveOccurred())
413+
Expect(queueInfo).NotTo(BeNil())
410414

411-
publishMessagesWithMessageLogic(qName, "Subject", 10, func(msg *amqp.Message) {
412-
msg.Properties = &amqp.MessageProperties{Subject: ptr("Subject")}
413-
})
415+
publishMessagesWithMessageLogic(qName, "MessageID", 10, func(msg *amqp.Message) {
416+
msg.Properties = &amqp.MessageProperties{MessageID: "MessageID"}
417+
})
414418

415-
publishMessagesWithMessageLogic(qName, "ReplyTo", 10, func(msg *amqp.Message) {
416-
msg.Properties = &amqp.MessageProperties{ReplyTo: ptr("ReplyTo")}
417-
})
419+
publishMessagesWithMessageLogic(qName, "Subject", 10, func(msg *amqp.Message) {
420+
msg.Properties = &amqp.MessageProperties{Subject: ptr("Subject")}
421+
})
418422

419-
publishMessagesWithMessageLogic(qName, "ContentType", 10, func(msg *amqp.Message) {
420-
msg.Properties = &amqp.MessageProperties{ContentType: ptr("ContentType")}
421-
})
423+
publishMessagesWithMessageLogic(qName, "ReplyTo", 10, func(msg *amqp.Message) {
424+
msg.Properties = &amqp.MessageProperties{ReplyTo: ptr("ReplyTo")}
425+
})
422426

423-
publishMessagesWithMessageLogic(qName, "ContentEncoding", 10, func(msg *amqp.Message) {
424-
msg.Properties = &amqp.MessageProperties{ContentEncoding: ptr("ContentEncoding")}
425-
})
427+
publishMessagesWithMessageLogic(qName, "ContentType", 10, func(msg *amqp.Message) {
428+
msg.Properties = &amqp.MessageProperties{ContentType: ptr("ContentType")}
429+
})
426430

427-
publishMessagesWithMessageLogic(qName, "GroupID", 10, func(msg *amqp.Message) {
428-
msg.Properties = &amqp.MessageProperties{GroupID: ptr("GroupID")}
429-
})
431+
publishMessagesWithMessageLogic(qName, "ContentEncoding", 10, func(msg *amqp.Message) {
432+
msg.Properties = &amqp.MessageProperties{ContentEncoding: ptr("ContentEncoding")}
433+
})
430434

431-
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
432-
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: ptr("ReplyToGroupID")}
433-
})
435+
publishMessagesWithMessageLogic(qName, "GroupID", 10, func(msg *amqp.Message) {
436+
msg.Properties = &amqp.MessageProperties{GroupID: ptr("GroupID")}
437+
})
434438

435-
// GroupSequence
436-
publishMessagesWithMessageLogic(qName, "GroupSequence", 10, func(msg *amqp.Message) {
437-
msg.Properties = &amqp.MessageProperties{GroupSequence: ptr(uint32(137))}
438-
})
439+
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
440+
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: ptr("ReplyToGroupID")}
441+
})
439442

440-
// ReplyToGroupID
441-
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
442-
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: ptr("ReplyToGroupID")}
443-
})
443+
// GroupSequence
444+
publishMessagesWithMessageLogic(qName, "GroupSequence", 10, func(msg *amqp.Message) {
445+
msg.Properties = &amqp.MessageProperties{GroupSequence: ptr(uint32(137))}
446+
})
444447

445-
// CreationTime
448+
// ReplyToGroupID
449+
publishMessagesWithMessageLogic(qName, "ReplyToGroupID", 10, func(msg *amqp.Message) {
450+
msg.Properties = &amqp.MessageProperties{ReplyToGroupID: ptr("ReplyToGroupID")}
451+
})
446452

447-
publishMessagesWithMessageLogic(qName, "CreationTime", 10, func(msg *amqp.Message) {
448-
msg.Properties = &amqp.MessageProperties{CreationTime: ptr(createDateTime())}
449-
})
453+
// CreationTime
450454

451-
// AbsoluteExpiryTime
455+
publishMessagesWithMessageLogic(qName, "CreationTime", 10, func(msg *amqp.Message) {
456+
msg.Properties = &amqp.MessageProperties{CreationTime: ptr(createDateTime())}
457+
})
452458

453-
publishMessagesWithMessageLogic(qName, "AbsoluteExpiryTime", 10, func(msg *amqp.Message) {
454-
msg.Properties = &amqp.MessageProperties{AbsoluteExpiryTime: ptr(createDateTime())}
455-
})
459+
// AbsoluteExpiryTime
456460

457-
// CorrelationID
461+
publishMessagesWithMessageLogic(qName, "AbsoluteExpiryTime", 10, func(msg *amqp.Message) {
462+
msg.Properties = &amqp.MessageProperties{AbsoluteExpiryTime: ptr(createDateTime())}
463+
})
464+
465+
// CorrelationID
458466

459-
publishMessagesWithMessageLogic(qName, "CorrelationID", 10, func(msg *amqp.Message) {
460-
msg.Properties = &amqp.MessageProperties{CorrelationID: "CorrelationID"}
467+
publishMessagesWithMessageLogic(qName, "CorrelationID", 10, func(msg *amqp.Message) {
468+
msg.Properties = &amqp.MessageProperties{CorrelationID: "CorrelationID"}
469+
})
470+
})
471+
472+
AfterEach(func() {
473+
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
474+
Expect(connection.Close(context.Background())).To(BeNil())
461475
})
462476

463-
var wg sync.WaitGroup
464-
wg.Add(12)
465477
DescribeTable("consumer should filter messages based on properties", func(properties *amqp.MessageProperties, label string) {
466478

467479
consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{
@@ -533,7 +545,6 @@ var _ = Describe("Consumer stream test", func() {
533545
Expect(dc.Accept(context.Background())).To(BeNil())
534546
}
535547
Expect(consumer.Close(context.Background())).To(BeNil())
536-
wg.Done()
537548
},
538549
Entry("MessageID", &amqp.MessageProperties{MessageID: "MessageID"}, "MessageID"),
539550
Entry("Subject", &amqp.MessageProperties{Subject: ptr("Subject")}, "Subject"),
@@ -548,11 +559,6 @@ var _ = Describe("Consumer stream test", func() {
548559
Entry("AbsoluteExpiryTime", &amqp.MessageProperties{AbsoluteExpiryTime: ptr(createDateTime())}, "AbsoluteExpiryTime"),
549560
Entry("CorrelationID", &amqp.MessageProperties{CorrelationID: "CorrelationID"}, "CorrelationID"),
550561
)
551-
go func() {
552-
wg.Wait()
553-
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
554-
Expect(connection.Close(context.Background())).To(BeNil())
555-
}()
556562
})
557563

558564
It("SQL filter consumer", func() {

pkg/rabbitmqamqp/amqp_exchange.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package rabbitmqamqp
33
import (
44
"context"
55
"errors"
6+
67
"github.com/Azure/go-amqp"
78
)
89

@@ -23,14 +24,14 @@ type AmqpExchange struct {
2324
management *AmqpManagement
2425
arguments map[string]any
2526
isAutoDelete bool
26-
exchangeType ExchangeType
27+
exchangeType TExchangeType
2728
}
2829

2930
func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
3031
return &AmqpExchange{management: management,
3132
name: name,
3233
arguments: make(map[string]any),
33-
exchangeType: ExchangeType{Type: Direct},
34+
exchangeType: Direct,
3435
}
3536
}
3637

@@ -46,7 +47,7 @@ func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error) {
4647
kv := make(map[string]any)
4748
kv["auto_delete"] = e.isAutoDelete
4849
kv["durable"] = true
49-
kv["type"] = e.exchangeType.String()
50+
kv["type"] = string(e.exchangeType)
5051
if e.arguments != nil {
5152
kv["arguments"] = e.arguments
5253
}
@@ -78,14 +79,14 @@ func (e *AmqpExchange) Delete(ctx context.Context) error {
7879
return err
7980
}
8081

81-
func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType) {
82-
if len(exchangeType.Type) > 0 {
82+
func (e *AmqpExchange) ExchangeType(exchangeType TExchangeType) {
83+
if len(exchangeType) > 0 {
8384
e.exchangeType = exchangeType
8485
}
8586
}
8687

8788
func (e *AmqpExchange) GetExchangeType() TExchangeType {
88-
return e.exchangeType.Type
89+
return e.exchangeType
8990
}
9091

9192
func (e *AmqpExchange) Name() string {

0 commit comments

Comments
 (0)