Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 32 additions & 38 deletions examples/getting_started/main.go
Original file line number Diff line number Diff line change
@@ -1,93 +1,87 @@
package main

import (
"bufio"
"context"
"fmt"
mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
"os"
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
"time"
)

func main() {
fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n")
chStatusChanged := make(chan *mq.StatusChanged, 1)
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)

go func(ch chan *mq.StatusChanged) {
go func(ch chan *rabbitmq_amqp.StatusChanged) {
for statusChanged := range ch {
fmt.Printf("Status changed from %d to %d\n", statusChanged.From, statusChanged.To)
fmt.Printf("%s\n", statusChanged)
}
}(chStatusChanged)

amqpConnection := mq.NewAmqpConnection()
amqpConnection.NotifyStatusChange(chStatusChanged)
err := amqpConnection.Open(context.Background(), mq.NewConnectionSettings())
amqpConnection := rabbitmq_amqp.NewAmqpConnectionNotifyStatusChanged(chStatusChanged)
err := amqpConnection.Open(context.Background(), rabbitmq_amqp.NewConnectionSettings())
if err != nil {
fmt.Printf("Error opening connection: %v\n", err)
return
}

fmt.Printf("AMQP Connection opened.\n")
management := amqpConnection.Management()
queueSpec := management.Queue("getting_started_queue").
QueueType(mq.QueueType{Type: mq.Quorum}).
MaxLengthBytes(mq.CapacityGB(1))
exchangeSpec := management.Exchange("getting_started_exchange").
ExchangeType(mq.ExchangeType{Type: mq.Topic})

queueInfo, err := queueSpec.Declare(context.Background())
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{
Name: "getting-started-exchange",
})
if err != nil {
fmt.Printf("Error declaring queue %s\n", err)
fmt.Printf("Error declaring exchange: %v\n", err)
return
}
fmt.Printf("Queue %s created.\n", queueInfo.GetName())

exchangeInfo, err := exchangeSpec.Declare(context.Background())
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{
Name: "getting-started-queue",
QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum},
})

if err != nil {
fmt.Printf("Error declaring exchange %s\n", err)
fmt.Printf("Error declaring queue: %v\n", err)
return
}
fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName())

bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key")
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{
SourceExchange: exchangeInfo.Name(),
DestinationQueue: queueInfo.Name(),
BindingKey: "routing-key",
})

err = bindingSpec.Bind(context.Background())
if err != nil {
fmt.Printf("Error binding %s\n", err)
fmt.Printf("Error binding: %v\n", err)
return
}

fmt.Printf("Binding between %s and %s created.\n", exchangeInfo.GetName(), queueInfo.GetName())

fmt.Println("Press any key to cleanup and exit")
reader := bufio.NewReader(os.Stdin)
_, _ = reader.ReadString('\n')
err = management.Unbind(context.TODO(), bindingPath)

err = bindingSpec.Unbind(context.Background())
if err != nil {
fmt.Printf("Error unbinding %s\n", err)
fmt.Printf("Error unbinding: %v\n", err)
return
}

fmt.Printf("Binding between %s and %s deleted.\n", exchangeInfo.GetName(), queueInfo.GetName())

err = exchangeSpec.Delete(context.Background())
err = management.DeleteExchange(context.TODO(), exchangeInfo.Name())
if err != nil {
fmt.Printf("Error deleting exchange %s\n", err)
fmt.Printf("Error deleting exchange: %v\n", err)
return
}

err = queueSpec.Delete(context.Background())
err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error deleting queue: %v\n", err)
return
}
fmt.Printf("Queue %s deleted.\n", queueInfo.GetName())

err = amqpConnection.Close(context.Background())
if err != nil {
fmt.Printf("Error closing connection: %v\n", err)
return
}

fmt.Printf("AMQP Connection closed.\n")
// Wait for the status change to be printed
time.Sleep(500 * time.Millisecond)

close(chStatusChanged)
}
125 changes: 125 additions & 0 deletions rabbitmq_amqp/address_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package rabbitmq_amqp

import (
"errors"
"fmt"
"net/url"
"strings"
)

type AddressBuilder struct {
queue *string
exchange *string
key *string
append *string
}

func NewAddressBuilder() *AddressBuilder {
return &AddressBuilder{}
}

func (a *AddressBuilder) Queue(queue string) *AddressBuilder {
a.queue = &queue
return a
}

func (a *AddressBuilder) Exchange(exchange string) *AddressBuilder {
a.exchange = &exchange
return a
}

func (a *AddressBuilder) Key(key string) *AddressBuilder {
a.key = &key
return a
}

func (a *AddressBuilder) Append(append string) *AddressBuilder {
a.append = &append
return a
}

func (a *AddressBuilder) Address() (string, error) {
if a.exchange == nil && a.queue == nil {
return "", errors.New("exchange or queue must be set")
}

urlAppend := ""
if !isStringNilOrEmpty(a.append) {
urlAppend = *a.append
}
if !isStringNilOrEmpty(a.exchange) && !isStringNilOrEmpty(a.queue) {
return "", errors.New("exchange and queue cannot be set together")
}

if !isStringNilOrEmpty(a.exchange) {
if !isStringNilOrEmpty(a.key) {
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + "/" + encodePathSegments(*a.key) + urlAppend, nil
}
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + urlAppend, nil
}

if a.queue == nil {
return "", nil
}

if isStringNilOrEmpty(a.queue) {
return "", errors.New("queue must be set")
}

return "/" + queues + "/" + encodePathSegments(*a.queue) + urlAppend, nil
}

// encodePathSegments takes a string and returns its percent-encoded representation.
func encodePathSegments(input string) string {
var encoded strings.Builder

// Iterate over each character in the input string
for _, char := range input {
// Check if the character is an unreserved character (i.e., it doesn't need encoding)
if isUnreserved(char) {
encoded.WriteRune(char) // Append as is
} else {
// Encode character To %HH format
encoded.WriteString(fmt.Sprintf("%%%02X", char))
}
}

return encoded.String()
}

// Decode takes a percent-encoded string and returns its decoded representation.
func decode(input string) (string, error) {
// Use url.QueryUnescape which properly decodes percent-encoded strings
decoded, err := url.QueryUnescape(input)
if err != nil {
return "", err
}

return decoded, nil
}

// isUnreserved checks if a character is an unreserved character in percent encoding
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
func isUnreserved(char rune) bool {
return (char >= 'A' && char <= 'Z') ||
(char >= 'a' && char <= 'z') ||
(char >= '0' && char <= '9') ||
char == '-' || char == '.' || char == '_' || char == '~'
}

func bindingPath() string {
return "/" + bindings
}

func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, key string) string {
sourceNameEncoded := encodePathSegments(sourceName)
destinationNameEncoded := encodePathSegments(destinationName)
keyEncoded := encodePathSegments(key)
destinationType := "dste"
if toQueue {
destinationType = "dstq"
}
format := "/%s/src=%s;%s=%s;key=%s;args="
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)

}
78 changes: 78 additions & 0 deletions rabbitmq_amqp/address_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package rabbitmq_amqp

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Address builder test ", func() {
It("With exchange, queue and key should raise and error", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Queue("queue").Exchange("exchange").Key("key")
_, err := addressBuilder.Address()
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange and queue cannot be set together"))
})

It("Without exchange and queue should raise and error", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
_, err := addressBuilder.Address()
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange or queue must be set"))
})

It("With exchange and key should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Exchange("my_exchange").Key("my_key")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange/my_key"))
})

It("With exchange should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Exchange("my_exchange")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange"))
})

It("With exchange and key with names to encode should return the encoded address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Exchange("my_ exchange/()").Key("my_key ")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20"))
})

It("With queue should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Queue("my_queue>")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue%3E"))
})

It("With queue and append should return address", func() {
addressBuilder := NewAddressBuilder()
Expect(addressBuilder).NotTo(BeNil())
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{}))
addressBuilder.Queue("my_queue").Append("/messages")
address, err := addressBuilder.Address()
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue/messages"))
})

})
Loading