Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,10 @@ This library is in early stages of development. It is meant to be used with Rabb

## Getting Started

You can find an example in: `examples/getting_started`
You can find an example in: `docs/examples/getting_started`

## Examples

You can find more examples in: `docs/examples`


Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func main() {
// Create the management interface for the connection
// so we can declare exchanges, queues, and bindings
management := amqpConnection.Management()
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.TopicExchangeSpecification{
Name: exchangeName,
})
if err != nil {
Expand All @@ -46,9 +46,8 @@ func main() {
}

// Declare a Quorum queue
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{
Name: queueName,
QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum},
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{
Name: queueName,
})

if err != nil {
Expand All @@ -57,7 +56,7 @@ func main() {
}

// Bind the queue to the exchange
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.ExchangeToQueueBindingSpecification{
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: routingKey,
Expand All @@ -70,8 +69,10 @@ func main() {

// Create a consumer to receive messages from the queue
// you need to build the address of the queue, but you can use the helper function
addrQueue, _ := rabbitmq_amqp.QueueAddress(&queueName)
consumer, err := amqpConnection.Consumer(context.Background(), addrQueue, "getting-started-consumer")

consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
Queue: queueName,
}, "getting-started-consumer")
if err != nil {
rabbitmq_amqp.Error("Error creating consumer", err)
return
Expand Down Expand Up @@ -105,8 +106,10 @@ func main() {
}
}(consumerContext)

addr, _ := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.ExchangeAddress{
Exchange: exchangeName,
Key: routingKey,
}, "getting-started-publisher")
if err != nil {
rabbitmq_amqp.Error("Error creating publisher", err)
return
Expand Down Expand Up @@ -151,10 +154,12 @@ func main() {
err = consumer.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[Consumer]", err)
return
}
// Close the publisher
err = publisher.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[Publisher]", err)
return
}

Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ go 1.22.0
require (
github.com/Azure/go-amqp v1.4.0-beta.1
github.com/google/uuid v1.6.0
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
github.com/onsi/ginkgo/v2 v2.22.1
github.com/onsi/gomega v1.36.2
)

require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.25.0 // indirect
golang.org/x/tools v0.28.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ=
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4=
github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag=
github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8=
github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc=
github.com/onsi/ginkgo/v2 v2.22.1 h1:QW7tbJAUDyVDVOM5dFa7qaybo+CRfR7bemlQUN6Z8aM=
github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPlnj4a1xM=
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand All @@ -28,10 +28,10 @@ golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
92 changes: 77 additions & 15 deletions rabbitmq_amqp/address_helper.go → rabbitmq_amqp/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,71 @@ package rabbitmq_amqp
import (
"errors"
"fmt"
"github.com/Azure/go-amqp"
"strings"
)

// Address Creates the address for the exchange or queue following the RabbitMQ conventions.
// TargetAddress is an interface that represents an address that can be used to send messages to.
// It can be either a Queue or an Exchange with a routing key.
type TargetAddress interface {
toAddress() (string, error)
}

// QueueAddress represents the address of a queue.
type QueueAddress struct {
Queue string // The name of the queue
Parameters string // Additional parameters not related to the queue. Most of the time it is empty
}

func (qas *QueueAddress) toAddress() (string, error) {
q := &qas.Queue
if isStringNilOrEmpty(&qas.Queue) {
q = nil
}
return queueAddress(q)
}

// ExchangeAddress represents the address of an exchange with a routing key.
type ExchangeAddress struct {
Exchange string // The name of the exchange
Key string // The routing key. Can be empty
}

func (eas *ExchangeAddress) toAddress() (string, error) {
ex := &eas.Exchange
if isStringNilOrEmpty(&eas.Exchange) {
ex = nil
}
k := &eas.Key
if isStringNilOrEmpty(&eas.Key) {
k = nil
}
return exchangeAddress(ex, k)
}

// MessageToAddressHelper sets the To property of the message to the address of the target.
// The target must be a QueueAddress or an ExchangeAddress.
// Note: The field To will be overwritten if it is already set.
func MessageToAddressHelper(msgRef *amqp.Message, target TargetAddress) error {
if target == nil {
return errors.New("target cannot be nil")
}

address, err := target.toAddress()
if err != nil {
return err
}

if msgRef.Properties == nil {
msgRef.Properties = &amqp.MessageProperties{}
}
msgRef.Properties.To = &address
return nil
}

// address Creates the address for the exchange or queue following the RabbitMQ conventions.
// see: https://www.rabbitmq.com/docs/next/amqp#address-v2
func Address(exchange, key, queue *string, urlParameters *string) (string, error) {
func address(exchange, key, queue *string, urlParameters *string) (string, error) {
if exchange == nil && queue == nil {
return "", errors.New("exchange or queue must be set")
}
Expand Down Expand Up @@ -39,23 +98,23 @@ func Address(exchange, key, queue *string, urlParameters *string) (string, error
return "/" + queues + "/" + encodePathSegments(*queue) + urlAppend, nil
}

// ExchangeAddress Creates the address for the exchange
// See Address for more information
func ExchangeAddress(exchange, key *string) (string, error) {
return Address(exchange, key, nil, nil)
// exchangeAddress Creates the address for the exchange
// See address for more information
func exchangeAddress(exchange, key *string) (string, error) {
return address(exchange, key, nil, nil)
}

// QueueAddress Creates the address for the queue.
// See Address for more information
func QueueAddress(queue *string) (string, error) {
return Address(nil, nil, queue, nil)
// queueAddress Creates the address for the queue.
// See address for more information
func queueAddress(queue *string) (string, error) {
return address(nil, nil, queue, nil)
}

// PurgeQueueAddress Creates the address for purging the queue.
// See Address for more information
func PurgeQueueAddress(queue *string) (string, error) {
// See address for more information
func purgeQueueAddress(queue *string) (string, error) {
parameter := "/messages"
return Address(nil, nil, queue, &parameter)
return address(nil, nil, queue, &parameter)
}

// encodePathSegments takes a string and returns its percent-encoded representation.
Expand Down Expand Up @@ -112,6 +171,9 @@ func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName,
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
}

func validateAddress(address string) bool {
return strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues))
func validateAddress(address string) error {
if strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues)) {
return nil
}
return fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
. "github.com/onsi/gomega"
)

var _ = Describe("Address builder test ", func() {
var _ = Describe("address builder test ", func() {
It("With exchange, queue and key should raise and error", func() {
queue := "my_queue"
exchange := "my_exchange"

_, err := Address(&exchange, nil, &queue, nil)
_, err := address(&exchange, nil, &queue, nil)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange and queue cannot be set together"))
})

It("Without exchange and queue should raise and error", func() {
_, err := Address(nil, nil, nil, nil)
_, err := address(nil, nil, nil, nil)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange or queue must be set"))
})
Expand All @@ -25,14 +25,14 @@ var _ = Describe("Address builder test ", func() {
exchange := "my_exchange"
key := "my_key"

address, err := Address(&exchange, &key, nil, nil)
address, err := address(&exchange, &key, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange/my_key"))
})

It("With exchange should return address", func() {
exchange := "my_exchange"
address, err := Address(&exchange, nil, nil, nil)
address, err := address(&exchange, nil, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange"))
})
Expand All @@ -42,21 +42,21 @@ var _ = Describe("Address builder test ", func() {
exchange := "my_ exchange/()"
key := "my_key "

address, err := Address(&exchange, &key, nil, nil)
address, err := address(&exchange, &key, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20"))
})

It("With queue should return address", func() {
queue := "my_queue>"
address, err := Address(nil, nil, &queue, nil)
address, err := address(nil, nil, &queue, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue%3E"))
})

It("With queue and urlParameters should return address", func() {
queue := "my_queue"
address, err := PurgeQueueAddress(&queue)
address, err := purgeQueueAddress(&queue)
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue/messages"))
})
Expand Down
27 changes: 14 additions & 13 deletions rabbitmq_amqp/amqp_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbitmq_amqp

import (
"context"
"errors"
"github.com/Azure/go-amqp"
)

Expand Down Expand Up @@ -31,30 +32,30 @@ func (b *AMQPBinding) SourceExchange(sourceName string) {
}
}

func (b *AMQPBinding) DestinationExchange(destinationName string) {
if len(destinationName) > 0 {
b.destinationName = destinationName
b.toQueue = false
}
}

func (b *AMQPBinding) DestinationQueue(queueName string) {
if len(queueName) > 0 {
b.destinationName = queueName
b.toQueue = true
}
func (b *AMQPBinding) Destination(name string, isQueue bool) {
b.destinationName = name
b.toQueue = isQueue
}

// Bind creates a binding between an exchange and a queue or exchange
// with the specified binding key.
// Returns the binding path that can be used to unbind the binding.
// Given a virtual host, the binding path is unique.
func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
destination := "destination_queue"
if !b.toQueue {
destination = "destination_exchange"
}

if len(b.sourceName) == 0 || len(b.destinationName) == 0 {
return "", errors.New("source and destination names are required")
}

path := bindingPath()
kv := make(map[string]any)
kv["binding_key"] = b.bindingKey
kv["source"] = b.sourceName
kv["destination_queue"] = b.destinationName
kv[destination] = b.destinationName
kv["arguments"] = make(map[string]any)
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
Expand Down
Loading