Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 57 additions & 8 deletions examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,51 @@ package main
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
"time"
)

func main() {

exchangeName := "getting-started-exchange"
queueName := "getting-started-queue"
routingKey := "routing-key"

fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n")
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)

/// Create a channel to receive status change notifications
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)
go func(ch chan *rabbitmq_amqp.StatusChanged) {
for statusChanged := range ch {
fmt.Printf("%s\n", statusChanged)
}
}(chStatusChanged)

amqpConnection := rabbitmq_amqp.NewAmqpConnectionNotifyStatusChanged(chStatusChanged)
err := amqpConnection.Open(context.Background(), rabbitmq_amqp.NewConnectionSettings())
// Open a connection to the AMQP 1.0 server
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), "amqp://", nil)
if err != nil {
fmt.Printf("Error opening connection: %v\n", err)
return
}
// Register the channel to receive status change notifications
amqpConnection.NotifyStatusChange(chStatusChanged)

fmt.Printf("AMQP Connection opened.\n")
// Create the management interface for the connection
// so we can declare exchanges, queues, and bindings
management := amqpConnection.Management()
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{
Name: "getting-started-exchange",
Name: exchangeName,
})
if err != nil {
fmt.Printf("Error declaring exchange: %v\n", err)
return
}

// Declare a Quorum queue
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{
Name: "getting-started-queue",
Name: queueName,
QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum},
})

Expand All @@ -43,17 +56,45 @@ func main() {
return
}

// Bind the queue to the exchange
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{
SourceExchange: exchangeInfo.Name(),
DestinationQueue: queueInfo.Name(),
BindingKey: "routing-key",
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: routingKey,
})

if err != nil {
fmt.Printf("Error binding: %v\n", err)
return
}

addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)

publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
if err != nil {
fmt.Printf("Error creating publisher: %v\n", err)
return
}

// Publish a message to the exchange
err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
if err != nil {
fmt.Printf("Error publishing message: %v\n", err)
return
}

println("press any key to close the connection")

var input string
_, _ = fmt.Scanln(&input)

// Close the publisher
err = publisher.Close(context.Background())
if err != nil {
return
}
// Unbind the queue from the exchange

err = management.Unbind(context.TODO(), bindingPath)

if err != nil {
Expand All @@ -67,6 +108,14 @@ func main() {
return
}

// Purge the queue
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error purging queue: %v\n", err)
return
}
fmt.Printf("Purged %d messages from the queue.\n", purged)

err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error deleting queue: %v\n", err)
Expand Down
78 changes: 0 additions & 78 deletions rabbitmq_amqp/address_builder_test.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,66 +7,56 @@ import (
"strings"
)

type AddressBuilder struct {
queue *string
exchange *string
key *string
append *string
}

func NewAddressBuilder() *AddressBuilder {
return &AddressBuilder{}
}

func (a *AddressBuilder) Queue(queue string) *AddressBuilder {
a.queue = &queue
return a
}

func (a *AddressBuilder) Exchange(exchange string) *AddressBuilder {
a.exchange = &exchange
return a
}

func (a *AddressBuilder) Key(key string) *AddressBuilder {
a.key = &key
return a
}

func (a *AddressBuilder) Append(append string) *AddressBuilder {
a.append = &append
return a
}

func (a *AddressBuilder) Address() (string, error) {
if a.exchange == nil && a.queue == nil {
// Address Creates the address for the exchange or queue following the RabbitMQ conventions.
// see: https://www.rabbitmq.com/docs/next/amqp#address-v2
func Address(exchange, key, queue *string, urlParameters *string) (string, error) {
if exchange == nil && queue == nil {
return "", errors.New("exchange or queue must be set")
}

urlAppend := ""
if !isStringNilOrEmpty(a.append) {
urlAppend = *a.append
if !isStringNilOrEmpty(urlParameters) {
urlAppend = *urlParameters
}
if !isStringNilOrEmpty(a.exchange) && !isStringNilOrEmpty(a.queue) {
if !isStringNilOrEmpty(exchange) && !isStringNilOrEmpty(queue) {
return "", errors.New("exchange and queue cannot be set together")
}

if !isStringNilOrEmpty(a.exchange) {
if !isStringNilOrEmpty(a.key) {
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + "/" + encodePathSegments(*a.key) + urlAppend, nil
if !isStringNilOrEmpty(exchange) {
if !isStringNilOrEmpty(key) {
return "/" + exchanges + "/" + encodePathSegments(*exchange) + "/" + encodePathSegments(*key) + urlAppend, nil
}
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + urlAppend, nil
return "/" + exchanges + "/" + encodePathSegments(*exchange) + urlAppend, nil
}

if a.queue == nil {
if queue == nil {
return "", nil
}

if isStringNilOrEmpty(a.queue) {
if isStringNilOrEmpty(queue) {
return "", errors.New("queue must be set")
}

return "/" + queues + "/" + encodePathSegments(*a.queue) + urlAppend, nil
return "/" + queues + "/" + encodePathSegments(*queue) + urlAppend, nil
}

// ExchangeAddress Creates the address for the exchange
// See Address for more information
func ExchangeAddress(exchange, key *string) (string, error) {
return Address(exchange, key, nil, nil)
}

// QueueAddress Creates the address for the queue.
// See Address for more information
func QueueAddress(queue *string) (string, error) {
return Address(nil, nil, queue, nil)
}

// PurgeQueueAddress Creates the address for purging the queue.
// See Address for more information
func PurgeQueueAddress(queue *string) (string, error) {
parameter := "/messages"
return Address(nil, nil, queue, &parameter)
}

// encodePathSegments takes a string and returns its percent-encoded representation.
Expand Down
64 changes: 64 additions & 0 deletions rabbitmq_amqp/address_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package rabbitmq_amqp

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Address builder test ", func() {
It("With exchange, queue and key should raise and error", func() {
queue := "my_queue"
exchange := "my_exchange"

_, err := Address(&exchange, nil, &queue, nil)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange and queue cannot be set together"))
})

It("Without exchange and queue should raise and error", func() {
_, err := Address(nil, nil, nil, nil)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange or queue must be set"))
})

It("With exchange and key should return address", func() {
exchange := "my_exchange"
key := "my_key"

address, err := Address(&exchange, &key, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange/my_key"))
})

It("With exchange should return address", func() {
exchange := "my_exchange"
address, err := Address(&exchange, nil, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange"))
})

It("With exchange and key with names to encode should return the encoded address", func() {

exchange := "my_ exchange/()"
key := "my_key "

address, err := Address(&exchange, &key, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20"))
})

It("With queue should return address", func() {
queue := "my_queue>"
address, err := Address(nil, nil, &queue, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue%3E"))
})

It("With queue and urlParameters should return address", func() {
queue := "my_queue"
address, err := PurgeQueueAddress(&queue)
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue/messages"))
})

})
9 changes: 2 additions & 7 deletions rabbitmq_amqp/amqp_binding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@ var _ = Describe("AMQP Bindings test ", func() {
var connection IConnection
var management IManagement
BeforeEach(func() {
connection = NewAmqpConnection()
Expect(connection).NotTo(BeNil())
Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := connection.Open(context.TODO(), connectionSettings)
conn, err := Dial(context.TODO(), "amqp://", nil)
Expect(err).To(BeNil())
connection = conn
management = connection.Management()
})

Expand Down
Loading