Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rabbitmq_amqp/amqp_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbitmq_amqp

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
Expand Down
23 changes: 19 additions & 4 deletions rabbitmq_amqp/amqp_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/google/uuid"
"strconv"
"time"

"github.com/Azure/go-amqp"
"github.com/google/uuid"
)

var ErrPreconditionFailed = errors.New("precondition Failed")
var ErrDoesNotExist = errors.New("does not exist")

type AmqpManagement struct {
session *amqp.Session
Expand Down Expand Up @@ -185,20 +187,33 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
return msg.Value.(map[string]any), nil
}

i, _ := strconv.Atoi(*msg.Properties.Subject)
responseCode, _ := strconv.Atoi(*msg.Properties.Subject)

err = a.validateResponseCode(i, expectedResponseCodes)
err = a.validateResponseCode(responseCode, expectedResponseCodes)
if err != nil {
return nil, err
}

if responseCode == responseCode404 {
return nil, ErrDoesNotExist
}

return make(map[string]any), nil
}

func (a *AmqpManagement) Queue(queueName string) IQueueSpecification {
return newAmqpQueue(a, queueName)
}

func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) {
path := queuePath(queueName)
result, err := a.Request(ctx, amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404})
if err != nil {
return nil, err
}
return newAmqpQueueInfo(result), nil
}

func (a *AmqpManagement) QueueClientName() IQueueSpecification {
return newAmqpQueue(a, "")
}
Expand Down
22 changes: 21 additions & 1 deletion rabbitmq_amqp/amqp_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package rabbitmq_amqp

import (
"context"
"time"

"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"time"
)

var _ = Describe("Management tests", func() {
Expand Down Expand Up @@ -68,4 +70,22 @@ var _ = Describe("Management tests", func() {
Expect(management.Close(context.Background())).To(BeNil())
amqpConnection.Close(context.Background())
})

It("GET on non-existing queue returns ErrDoesNotExist", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))

connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil())

management := amqpConnection.Management()
path := "/queues/i-do-not-exist"
result, err := management.Request(context.Background(), amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404})
Expect(err).To(Equal(ErrDoesNotExist))
Expect(result).To(BeNil())
})
})
34 changes: 30 additions & 4 deletions rabbitmq_amqp/amqp_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(connection.Close(context.Background())).To(BeNil())
})

It("AMQP Queue Declare With Response and Delete should succeed", func() {
It("AMQP Queue Declare With Response and Get/Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Response and Delete should succeed"
queueSpec := management.Queue(queueName)
queueInfo, err := queueSpec.Declare(context.TODO())
Expand All @@ -40,11 +40,16 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Classic))

// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))

err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})

It("AMQP Queue Declare With Parameters and Delete should succeed", func() {
It("AMQP Queue Declare With Parameters and Get/Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Parameters and Delete should succeed"
queueSpec := management.Queue(queueName).Exclusive(true).
AutoDelete(true).
Expand All @@ -67,11 +72,15 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key"))
Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000)))

// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))

err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})

It("AMQP Declare Quorum Queue and Delete should succeed", func() {
It("AMQP Declare Quorum Queue and Get/Delete should succeed", func() {
const queueName = "AMQP Declare Quorum Queue and Delete should succeed"
// Quorum queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
Expand All @@ -86,11 +95,16 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Quorum))

// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))

err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})

It("AMQP Declare Stream Queue and Delete should succeed", func() {
It("AMQP Declare Stream Queue and Get/Delete should succeed", func() {
const queueName = "AMQP Declare Stream Queue and Delete should succeed"
// Stream queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
Expand All @@ -105,6 +119,11 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Stream))

// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))

err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
Expand Down Expand Up @@ -160,6 +179,13 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(err).To(BeNil())
Expect(purged).To(Equal(10))
})

It("AMQP GET on non-existing queue should return ErrDoesNotExist", func() {
const queueName = "This queue does not exist"
result, err := management.QueueInfo(context.TODO(), queueName)
Expect(err).To(Equal(ErrDoesNotExist))
Expect(result).To(BeNil())
})
})

// TODO: This should be replaced with this library's publish function
Expand Down
1 change: 1 addition & 0 deletions rabbitmq_amqp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
responseCode200 = 200
responseCode201 = 201
responseCode204 = 204
responseCode404 = 404
responseCode409 = 409
commandPut = "PUT"
commandGet = "GET"
Expand Down
1 change: 1 addition & 0 deletions rabbitmq_amqp/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type IManagement interface {
Open(ctx context.Context, connection IConnection) error
Close(ctx context.Context) error
Queue(queueName string) IQueueSpecification
QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error)
Exchange(exchangeName string) IExchangeSpecification
Binding() IBindingSpecification
QueueClientName() IQueueSpecification
Expand Down