Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
"os"
"time"
)

func main() {
Expand All @@ -30,28 +31,64 @@ func main() {
management := amqpConnection.Management()
queueSpec := management.Queue("getting_started_queue").
QueueType(mq.QueueType{Type: mq.Quorum}).
MaxLengthBytes(mq.CapacityGB(1)).
DeadLetterExchange("dead-letter-exchange").
DeadLetterRoutingKey("dead-letter-routing-key")
MaxLengthBytes(mq.CapacityGB(1))
exchangeSpec := management.Exchange("getting_started_exchange").
ExchangeType(mq.ExchangeType{Type: mq.Topic})

queueInfo, err := queueSpec.Declare(context.Background())
if err != nil {
fmt.Printf("Error declaring queue %s\n", err)
return
}
fmt.Printf("Queue %s created.\n", queueInfo.GetName())
err = queueSpec.Delete(context.Background())

exchangeInfo, err := exchangeSpec.Declare(context.Background())
if err != nil {
fmt.Printf("Error declaring exchange %s\n", err)
return
}
fmt.Printf("Queue %s deleted.\n", queueInfo.GetName())
fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName())

fmt.Println("Press any key to stop ")
bindingSpec := management.Binding().SourceExchange(exchangeInfo.GetName()).DestinationQueue(queueInfo.GetName()).Key("routing-key")

err = bindingSpec.Bind(context.Background())
if err != nil {
fmt.Printf("Error binding %s\n", err)
return
}

fmt.Printf("Binding between %s and %s created.\n", exchangeInfo.GetName(), queueInfo.GetName())

fmt.Println("Press any key to cleanup and exit")
reader := bufio.NewReader(os.Stdin)
_, _ = reader.ReadString('\n')

err = bindingSpec.Unbind(context.Background())
if err != nil {
fmt.Printf("Error unbinding %s\n", err)
return
}

fmt.Printf("Binding between %s and %s deleted.\n", exchangeInfo.GetName(), queueInfo.GetName())

err = exchangeSpec.Delete(context.Background())
if err != nil {
fmt.Printf("Error deleting exchange %s\n", err)
return
}

err = queueSpec.Delete(context.Background())
if err != nil {
return
}
fmt.Printf("Queue %s deleted.\n", queueInfo.GetName())

err = amqpConnection.Close(context.Background())
if err != nil {
return
}
fmt.Printf("AMQP Connection closed.\n")
// Wait for the status change to be printed
time.Sleep(500 * time.Millisecond)
close(chStatusChanged)
}
51 changes: 51 additions & 0 deletions rabbitmq_amqp/amqp_binding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package rabbitmq_amqp

import "context"

type AMQPBindingInfo struct {
}

type AMQPBinding struct {
sourceExchangeName string
destinationQueue string
bindingKey string
management *AmqpManagement
}

func newAMQPBinding(management *AmqpManagement) *AMQPBinding {
return &AMQPBinding{management: management}
}

func (b *AMQPBinding) Key(bindingKey string) IBindingSpecification {
b.bindingKey = bindingKey
return b
}

func (b *AMQPBinding) SourceExchange(exchangeName string) IBindingSpecification {
b.sourceExchangeName = exchangeName
return b
}

func (b *AMQPBinding) DestinationQueue(queueName string) IBindingSpecification {
b.destinationQueue = queueName
return b
}

func (b *AMQPBinding) Bind(ctx context.Context) error {

path := bindingPath()
kv := make(map[string]any)
kv["binding_key"] = b.bindingKey
kv["source"] = b.sourceExchangeName
kv["destination_queue"] = b.destinationQueue
kv["arguments"] = make(map[string]any)
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
return err

}

func (b *AMQPBinding) Unbind(ctx context.Context) error {
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.sourceExchangeName, b.destinationQueue, b.bindingKey)
_, err := b.management.Request(ctx, nil, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204})
return err
}
58 changes: 58 additions & 0 deletions rabbitmq_amqp/amqp_binding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package rabbitmq_amqp

import (
"context"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("AMQP Bindings test ", func() {

var connection IConnection
var management IManagement
BeforeEach(func() {
connection = NewAmqpConnection()
Expect(connection).NotTo(BeNil())
Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{}))
connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := connection.Open(context.TODO(), connectionSettings)
Expect(err).To(BeNil())
management = connection.Management()
})

AfterEach(func() {
Expect(connection.Close(context.Background())).To(BeNil())
})

It("AMQP Bindings between Exchange and Queue Should success ", func() {
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue Should success"
const queueName = "Queue_AMQP Bindings between Exchange and Queue Should success"
exchangeSpec := management.Exchange(exchangeName)
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))

queueSpec := management.Queue(queueName)
queueInfo, err := queueSpec.Declare(context.TODO())
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.GetName()).To(Equal(queueName))

bindingSpec := management.Binding().SourceExchange(exchangeName).
DestinationQueue(queueName).
Key("routing-key")
err = bindingSpec.Bind(context.TODO())
Expect(err).To(BeNil())
err = bindingSpec.Unbind(context.TODO())
Expect(err).To(BeNil())
err = exchangeSpec.Delete(context.TODO())
Expect(err).To(BeNil())
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())

})

})
35 changes: 25 additions & 10 deletions rabbitmq_amqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,25 @@ import (
)

type ConnectionSettings struct {
host string
port int
user string
password string
virtualHost string
scheme string
containerId string
useSsl bool
tlsConfig *tls.Config
host string
port int
user string
password string
virtualHost string
scheme string
containerId string
useSsl bool
tlsConfig *tls.Config
saslMechanism TSaslMechanism
}

func (c *ConnectionSettings) GetSaslMechanism() TSaslMechanism {
return c.saslMechanism
}

func (c *ConnectionSettings) SaslMechanism(mechanism SaslMechanism) IConnectionSettings {
c.saslMechanism = mechanism.Type
return c
}

func (c *ConnectionSettings) TlsConfig(config *tls.Config) IConnectionSettings {
Expand Down Expand Up @@ -138,8 +148,13 @@ func NewAmqpConnection() IConnection {
}

func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectionSettings) error {
// TODO: add support for other SASL types
sASLType := amqp.SASLTypeAnonymous()
switch connectionSettings.GetSaslMechanism() {
case Plain:
sASLType = amqp.SASLTypePlain(connectionSettings.GetUser(), connectionSettings.GetPassword())
case External:
sASLType = amqp.SASLTypeExternal("")
}

conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{
ContainerID: connectionSettings.GetContainerId(),
Expand Down
16 changes: 15 additions & 1 deletion rabbitmq_amqp/amqp_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,32 @@ import (
)

var _ = Describe("AMQP Connection Test", func() {
It("AMQP Connection should success", func() {
It("AMQP SASLTypeAnonymous Connection should success", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))

connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous})
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.TODO(), connectionSettings)
Expect(err).To(BeNil())
})

It("AMQP SASLTypePlain Connection should success", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))

connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
connectionSettings.SaslMechanism(SaslMechanism{Type: Plain})
err := amqpConnection.Open(context.TODO(), connectionSettings)
Expect(err).To(BeNil())
})

It("AMQP Connection should fail due of wrong port", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expand Down
74 changes: 74 additions & 0 deletions rabbitmq_amqp/amqp_exchange.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package rabbitmq_amqp

import "context"

type AmqpExchangeInfo struct {
name string
}

func newAmqpExchangeInfo(name string) IExchangeInfo {
return &AmqpExchangeInfo{name: name}
}

func (a *AmqpExchangeInfo) GetName() string {
return a.name
}

type AmqpExchange struct {
name string
management *AmqpManagement
arguments map[string]any
isAutoDelete bool
exchangeType ExchangeType
}

func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
return &AmqpExchange{management: management,
name: name,
arguments: make(map[string]any),
exchangeType: ExchangeType{Type: Direct},
}
}

func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {

path := exchangePath(e.name)
kv := make(map[string]any)
kv["auto_delete"] = e.isAutoDelete
kv["durable"] = true
kv["type"] = e.exchangeType.String()
kv["arguments"] = e.arguments
_, err := e.management.Request(ctx, kv, path, commandPut, []int{responseCode204, responseCode201, responseCode409})
if err != nil {
return nil, err
}
return newAmqpExchangeInfo(e.name), nil
}

func (e *AmqpExchange) AutoDelete(isAutoDelete bool) IExchangeSpecification {
e.isAutoDelete = isAutoDelete
return e
}

func (e *AmqpExchange) IsAutoDelete() bool {
return e.isAutoDelete
}

func (e *AmqpExchange) Delete(ctx context.Context) error {
path := exchangePath(e.name)
_, err := e.management.Request(ctx, nil, path, commandDelete, []int{responseCode204})
return err
}

func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType) IExchangeSpecification {
e.exchangeType = exchangeType
return e
}

func (e *AmqpExchange) GetExchangeType() TExchangeType {
return e.exchangeType.Type
}

func (e *AmqpExchange) GetName() string {
return e.name
}
Loading