Skip to content

Commit 89c4dd7

Browse files
authored
Refactor the API interfaces (#21)
* Refactor the API * Use the interfaces to define the targets for queues, exchanges and bindings * Implement the producer message target-based --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent bfc8b02 commit 89c4dd7

18 files changed

+907
-279
lines changed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,10 @@ This library is in early stages of development. It is meant to be used with Rabb
1010

1111
## Getting Started
1212

13-
You can find an example in: `examples/getting_started`
13+
You can find an example in: `docs/examples/getting_started`
14+
15+
## Examples
16+
17+
You can find more examples in: `docs/examples`
18+
19+

examples/getting_started/main.go renamed to docs/examples/getting_started/main.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func main() {
3737
// Create the management interface for the connection
3838
// so we can declare exchanges, queues, and bindings
3939
management := amqpConnection.Management()
40-
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{
40+
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.TopicExchangeSpecification{
4141
Name: exchangeName,
4242
})
4343
if err != nil {
@@ -46,9 +46,8 @@ func main() {
4646
}
4747

4848
// Declare a Quorum queue
49-
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{
50-
Name: queueName,
51-
QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum},
49+
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{
50+
Name: queueName,
5251
})
5352

5453
if err != nil {
@@ -57,7 +56,7 @@ func main() {
5756
}
5857

5958
// Bind the queue to the exchange
60-
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{
59+
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.ExchangeToQueueBindingSpecification{
6160
SourceExchange: exchangeName,
6261
DestinationQueue: queueName,
6362
BindingKey: routingKey,
@@ -70,8 +69,10 @@ func main() {
7069

7170
// Create a consumer to receive messages from the queue
7271
// you need to build the address of the queue, but you can use the helper function
73-
addrQueue, _ := rabbitmq_amqp.QueueAddress(&queueName)
74-
consumer, err := amqpConnection.Consumer(context.Background(), addrQueue, "getting-started-consumer")
72+
73+
consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
74+
Queue: queueName,
75+
}, "getting-started-consumer")
7576
if err != nil {
7677
rabbitmq_amqp.Error("Error creating consumer", err)
7778
return
@@ -105,8 +106,10 @@ func main() {
105106
}
106107
}(consumerContext)
107108

108-
addr, _ := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
109-
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
109+
publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.ExchangeAddress{
110+
Exchange: exchangeName,
111+
Key: routingKey,
112+
}, "getting-started-publisher")
110113
if err != nil {
111114
rabbitmq_amqp.Error("Error creating publisher", err)
112115
return
@@ -151,10 +154,12 @@ func main() {
151154
err = consumer.Close(context.Background())
152155
if err != nil {
153156
rabbitmq_amqp.Error("[Consumer]", err)
157+
return
154158
}
155159
// Close the publisher
156160
err = publisher.Close(context.Background())
157161
if err != nil {
162+
rabbitmq_amqp.Error("[Publisher]", err)
158163
return
159164
}
160165

go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ go 1.22.0
55
require (
66
github.com/Azure/go-amqp v1.4.0-beta.1
77
github.com/google/uuid v1.6.0
8-
github.com/onsi/ginkgo/v2 v2.20.2
9-
github.com/onsi/gomega v1.34.2
8+
github.com/onsi/ginkgo/v2 v2.22.1
9+
github.com/onsi/gomega v1.36.2
1010
)
1111

1212
require (
1313
github.com/go-logr/logr v1.4.2 // indirect
1414
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
1515
github.com/google/go-cmp v0.6.0 // indirect
16-
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
16+
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
1717
golang.org/x/net v0.33.0 // indirect
1818
golang.org/x/sys v0.28.0 // indirect
1919
golang.org/x/text v0.21.0 // indirect
20-
golang.org/x/tools v0.25.0 // indirect
20+
golang.org/x/tools v0.28.0 // indirect
2121
gopkg.in/yaml.v3 v3.0.1 // indirect
2222
)

go.sum

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
1010
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
1111
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
1212
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
13-
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ=
14-
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
13+
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
14+
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
1515
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
1616
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
17-
github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4=
18-
github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag=
19-
github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8=
20-
github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc=
17+
github.com/onsi/ginkgo/v2 v2.22.1 h1:QW7tbJAUDyVDVOM5dFa7qaybo+CRfR7bemlQUN6Z8aM=
18+
github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPlnj4a1xM=
19+
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
20+
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
2121
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2222
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2323
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
@@ -28,10 +28,10 @@ golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
2828
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
2929
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
3030
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
31-
golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
32-
golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
33-
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
34-
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
31+
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
32+
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
33+
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
34+
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
3535
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
3636
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3737
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

rabbitmq_amqp/address_helper.go renamed to rabbitmq_amqp/address.go

Lines changed: 77 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,71 @@ package rabbitmq_amqp
33
import (
44
"errors"
55
"fmt"
6+
"github.com/Azure/go-amqp"
67
"strings"
78
)
89

9-
// Address Creates the address for the exchange or queue following the RabbitMQ conventions.
10+
// TargetAddress is an interface that represents an address that can be used to send messages to.
11+
// It can be either a Queue or an Exchange with a routing key.
12+
type TargetAddress interface {
13+
toAddress() (string, error)
14+
}
15+
16+
// QueueAddress represents the address of a queue.
17+
type QueueAddress struct {
18+
Queue string // The name of the queue
19+
Parameters string // Additional parameters not related to the queue. Most of the time it is empty
20+
}
21+
22+
func (qas *QueueAddress) toAddress() (string, error) {
23+
q := &qas.Queue
24+
if isStringNilOrEmpty(&qas.Queue) {
25+
q = nil
26+
}
27+
return queueAddress(q)
28+
}
29+
30+
// ExchangeAddress represents the address of an exchange with a routing key.
31+
type ExchangeAddress struct {
32+
Exchange string // The name of the exchange
33+
Key string // The routing key. Can be empty
34+
}
35+
36+
func (eas *ExchangeAddress) toAddress() (string, error) {
37+
ex := &eas.Exchange
38+
if isStringNilOrEmpty(&eas.Exchange) {
39+
ex = nil
40+
}
41+
k := &eas.Key
42+
if isStringNilOrEmpty(&eas.Key) {
43+
k = nil
44+
}
45+
return exchangeAddress(ex, k)
46+
}
47+
48+
// MessageToAddressHelper sets the To property of the message to the address of the target.
49+
// The target must be a QueueAddress or an ExchangeAddress.
50+
// Note: The field To will be overwritten if it is already set.
51+
func MessageToAddressHelper(msgRef *amqp.Message, target TargetAddress) error {
52+
if target == nil {
53+
return errors.New("target cannot be nil")
54+
}
55+
56+
address, err := target.toAddress()
57+
if err != nil {
58+
return err
59+
}
60+
61+
if msgRef.Properties == nil {
62+
msgRef.Properties = &amqp.MessageProperties{}
63+
}
64+
msgRef.Properties.To = &address
65+
return nil
66+
}
67+
68+
// address Creates the address for the exchange or queue following the RabbitMQ conventions.
1069
// see: https://www.rabbitmq.com/docs/next/amqp#address-v2
11-
func Address(exchange, key, queue *string, urlParameters *string) (string, error) {
70+
func address(exchange, key, queue *string, urlParameters *string) (string, error) {
1271
if exchange == nil && queue == nil {
1372
return "", errors.New("exchange or queue must be set")
1473
}
@@ -39,23 +98,23 @@ func Address(exchange, key, queue *string, urlParameters *string) (string, error
3998
return "/" + queues + "/" + encodePathSegments(*queue) + urlAppend, nil
4099
}
41100

42-
// ExchangeAddress Creates the address for the exchange
43-
// See Address for more information
44-
func ExchangeAddress(exchange, key *string) (string, error) {
45-
return Address(exchange, key, nil, nil)
101+
// exchangeAddress Creates the address for the exchange
102+
// See address for more information
103+
func exchangeAddress(exchange, key *string) (string, error) {
104+
return address(exchange, key, nil, nil)
46105
}
47106

48-
// QueueAddress Creates the address for the queue.
49-
// See Address for more information
50-
func QueueAddress(queue *string) (string, error) {
51-
return Address(nil, nil, queue, nil)
107+
// queueAddress Creates the address for the queue.
108+
// See address for more information
109+
func queueAddress(queue *string) (string, error) {
110+
return address(nil, nil, queue, nil)
52111
}
53112

54113
// PurgeQueueAddress Creates the address for purging the queue.
55-
// See Address for more information
56-
func PurgeQueueAddress(queue *string) (string, error) {
114+
// See address for more information
115+
func purgeQueueAddress(queue *string) (string, error) {
57116
parameter := "/messages"
58-
return Address(nil, nil, queue, &parameter)
117+
return address(nil, nil, queue, &parameter)
59118
}
60119

61120
// encodePathSegments takes a string and returns its percent-encoded representation.
@@ -112,6 +171,9 @@ func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName,
112171
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
113172
}
114173

115-
func validateAddress(address string) bool {
116-
return strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues))
174+
func validateAddress(address string) error {
175+
if strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues)) {
176+
return nil
177+
}
178+
return fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
117179
}

rabbitmq_amqp/address_helper_test.go renamed to rabbitmq_amqp/address_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ import (
55
. "github.com/onsi/gomega"
66
)
77

8-
var _ = Describe("Address builder test ", func() {
8+
var _ = Describe("address builder test ", func() {
99
It("With exchange, queue and key should raise and error", func() {
1010
queue := "my_queue"
1111
exchange := "my_exchange"
1212

13-
_, err := Address(&exchange, nil, &queue, nil)
13+
_, err := address(&exchange, nil, &queue, nil)
1414
Expect(err).NotTo(BeNil())
1515
Expect(err.Error()).To(Equal("exchange and queue cannot be set together"))
1616
})
1717

1818
It("Without exchange and queue should raise and error", func() {
19-
_, err := Address(nil, nil, nil, nil)
19+
_, err := address(nil, nil, nil, nil)
2020
Expect(err).NotTo(BeNil())
2121
Expect(err.Error()).To(Equal("exchange or queue must be set"))
2222
})
@@ -25,14 +25,14 @@ var _ = Describe("Address builder test ", func() {
2525
exchange := "my_exchange"
2626
key := "my_key"
2727

28-
address, err := Address(&exchange, &key, nil, nil)
28+
address, err := address(&exchange, &key, nil, nil)
2929
Expect(err).To(BeNil())
3030
Expect(address).To(Equal("/exchanges/my_exchange/my_key"))
3131
})
3232

3333
It("With exchange should return address", func() {
3434
exchange := "my_exchange"
35-
address, err := Address(&exchange, nil, nil, nil)
35+
address, err := address(&exchange, nil, nil, nil)
3636
Expect(err).To(BeNil())
3737
Expect(address).To(Equal("/exchanges/my_exchange"))
3838
})
@@ -42,21 +42,21 @@ var _ = Describe("Address builder test ", func() {
4242
exchange := "my_ exchange/()"
4343
key := "my_key "
4444

45-
address, err := Address(&exchange, &key, nil, nil)
45+
address, err := address(&exchange, &key, nil, nil)
4646
Expect(err).To(BeNil())
4747
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20"))
4848
})
4949

5050
It("With queue should return address", func() {
5151
queue := "my_queue>"
52-
address, err := Address(nil, nil, &queue, nil)
52+
address, err := address(nil, nil, &queue, nil)
5353
Expect(err).To(BeNil())
5454
Expect(address).To(Equal("/queues/my_queue%3E"))
5555
})
5656

5757
It("With queue and urlParameters should return address", func() {
5858
queue := "my_queue"
59-
address, err := PurgeQueueAddress(&queue)
59+
address, err := purgeQueueAddress(&queue)
6060
Expect(err).To(BeNil())
6161
Expect(address).To(Equal("/queues/my_queue/messages"))
6262
})

rabbitmq_amqp/amqp_binding.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rabbitmq_amqp
22

33
import (
44
"context"
5+
"errors"
56
"github.com/Azure/go-amqp"
67
)
78

@@ -31,30 +32,30 @@ func (b *AMQPBinding) SourceExchange(sourceName string) {
3132
}
3233
}
3334

34-
func (b *AMQPBinding) DestinationExchange(destinationName string) {
35-
if len(destinationName) > 0 {
36-
b.destinationName = destinationName
37-
b.toQueue = false
38-
}
39-
}
40-
41-
func (b *AMQPBinding) DestinationQueue(queueName string) {
42-
if len(queueName) > 0 {
43-
b.destinationName = queueName
44-
b.toQueue = true
45-
}
35+
func (b *AMQPBinding) Destination(name string, isQueue bool) {
36+
b.destinationName = name
37+
b.toQueue = isQueue
4638
}
4739

4840
// Bind creates a binding between an exchange and a queue or exchange
4941
// with the specified binding key.
5042
// Returns the binding path that can be used to unbind the binding.
5143
// Given a virtual host, the binding path is unique.
5244
func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
45+
destination := "destination_queue"
46+
if !b.toQueue {
47+
destination = "destination_exchange"
48+
}
49+
50+
if len(b.sourceName) == 0 || len(b.destinationName) == 0 {
51+
return "", errors.New("source and destination names are required")
52+
}
53+
5354
path := bindingPath()
5455
kv := make(map[string]any)
5556
kv["binding_key"] = b.bindingKey
5657
kv["source"] = b.sourceName
57-
kv["destination_queue"] = b.destinationName
58+
kv[destination] = b.destinationName
5859
kv["arguments"] = make(map[string]any)
5960
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
6061
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)

0 commit comments

Comments
 (0)