diff --git a/rabbitmq_amqp/amqp_exchange_test.go b/rabbitmq_amqp/amqp_exchange_test.go index 8dd997c..82b326e 100644 --- a/rabbitmq_amqp/amqp_exchange_test.go +++ b/rabbitmq_amqp/amqp_exchange_test.go @@ -2,6 +2,7 @@ package rabbitmq_amqp import ( "context" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) diff --git a/rabbitmq_amqp/amqp_management.go b/rabbitmq_amqp/amqp_management.go index 2e79b9a..85e8c74 100644 --- a/rabbitmq_amqp/amqp_management.go +++ b/rabbitmq_amqp/amqp_management.go @@ -4,13 +4,15 @@ import ( "context" "errors" "fmt" - "github.com/Azure/go-amqp" - "github.com/google/uuid" "strconv" "time" + + "github.com/Azure/go-amqp" + "github.com/google/uuid" ) var ErrPreconditionFailed = errors.New("precondition Failed") +var ErrDoesNotExist = errors.New("does not exist") type AmqpManagement struct { session *amqp.Session @@ -185,13 +187,17 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path return msg.Value.(map[string]any), nil } - i, _ := strconv.Atoi(*msg.Properties.Subject) + responseCode, _ := strconv.Atoi(*msg.Properties.Subject) - err = a.validateResponseCode(i, expectedResponseCodes) + err = a.validateResponseCode(responseCode, expectedResponseCodes) if err != nil { return nil, err } + if responseCode == responseCode404 { + return nil, ErrDoesNotExist + } + return make(map[string]any), nil } @@ -199,6 +205,15 @@ func (a *AmqpManagement) Queue(queueName string) IQueueSpecification { return newAmqpQueue(a, queueName) } +func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) { + path := queuePath(queueName) + result, err := a.Request(ctx, amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404}) + if err != nil { + return nil, err + } + return newAmqpQueueInfo(result), nil +} + func (a *AmqpManagement) QueueClientName() IQueueSpecification { return newAmqpQueue(a, "") } diff --git a/rabbitmq_amqp/amqp_management_test.go b/rabbitmq_amqp/amqp_management_test.go index 9bd51d2..8b8d840 100644 --- a/rabbitmq_amqp/amqp_management_test.go +++ b/rabbitmq_amqp/amqp_management_test.go @@ -2,9 +2,11 @@ package rabbitmq_amqp import ( "context" + "time" + + "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "time" ) var _ = Describe("Management tests", func() { @@ -68,4 +70,22 @@ var _ = Describe("Management tests", func() { Expect(management.Close(context.Background())).To(BeNil()) amqpConnection.Close(context.Background()) }) + + It("GET on non-existing queue returns ErrDoesNotExist", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + err := amqpConnection.Open(context.Background(), connectionSettings) + Expect(err).To(BeNil()) + + management := amqpConnection.Management() + path := "/queues/i-do-not-exist" + result, err := management.Request(context.Background(), amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404}) + Expect(err).To(Equal(ErrDoesNotExist)) + Expect(result).To(BeNil()) + }) }) diff --git a/rabbitmq_amqp/amqp_queue_test.go b/rabbitmq_amqp/amqp_queue_test.go index 5055ffc..1dfe444 100644 --- a/rabbitmq_amqp/amqp_queue_test.go +++ b/rabbitmq_amqp/amqp_queue_test.go @@ -29,7 +29,7 @@ var _ = Describe("AMQP Queue test ", func() { Expect(connection.Close(context.Background())).To(BeNil()) }) - It("AMQP Queue Declare With Response and Delete should succeed", func() { + It("AMQP Queue Declare With Response and Get/Delete should succeed", func() { const queueName = "AMQP Queue Declare With Response and Delete should succeed" queueSpec := management.Queue(queueName) queueInfo, err := queueSpec.Declare(context.TODO()) @@ -40,11 +40,16 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.IsAutoDelete()).To(BeFalse()) Expect(queueInfo.IsExclusive()).To(BeFalse()) Expect(queueInfo.Type()).To(Equal(Classic)) + + // validate GET (query queue info) + queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) + Expect(queueInfoReceived).To(Equal(queueInfo)) + err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) - It("AMQP Queue Declare With Parameters and Delete should succeed", func() { + It("AMQP Queue Declare With Parameters and Get/Delete should succeed", func() { const queueName = "AMQP Queue Declare With Parameters and Delete should succeed" queueSpec := management.Queue(queueName).Exclusive(true). AutoDelete(true). @@ -67,11 +72,15 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key")) Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000))) + // validate GET (query queue info) + queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) + Expect(queueInfoReceived).To(Equal(queueInfo)) + err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) - It("AMQP Declare Quorum Queue and Delete should succeed", func() { + It("AMQP Declare Quorum Queue and Get/Delete should succeed", func() { const queueName = "AMQP Declare Quorum Queue and Delete should succeed" // Quorum queue will ignore Exclusive and AutoDelete settings // since they are not supported by quorum queues @@ -86,11 +95,16 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.IsAutoDelete()).To(BeFalse()) Expect(queueInfo.IsExclusive()).To(BeFalse()) Expect(queueInfo.Type()).To(Equal(Quorum)) + + // validate GET (query queue info) + queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) + Expect(queueInfoReceived).To(Equal(queueInfo)) + err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) - It("AMQP Declare Stream Queue and Delete should succeed", func() { + It("AMQP Declare Stream Queue and Get/Delete should succeed", func() { const queueName = "AMQP Declare Stream Queue and Delete should succeed" // Stream queue will ignore Exclusive and AutoDelete settings // since they are not supported by quorum queues @@ -105,6 +119,11 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.IsAutoDelete()).To(BeFalse()) Expect(queueInfo.IsExclusive()).To(BeFalse()) Expect(queueInfo.Type()).To(Equal(Stream)) + + // validate GET (query queue info) + queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) + Expect(queueInfoReceived).To(Equal(queueInfo)) + err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) @@ -160,6 +179,13 @@ var _ = Describe("AMQP Queue test ", func() { Expect(err).To(BeNil()) Expect(purged).To(Equal(10)) }) + + It("AMQP GET on non-existing queue should return ErrDoesNotExist", func() { + const queueName = "This queue does not exist" + result, err := management.QueueInfo(context.TODO(), queueName) + Expect(err).To(Equal(ErrDoesNotExist)) + Expect(result).To(BeNil()) + }) }) // TODO: This should be replaced with this library's publish function diff --git a/rabbitmq_amqp/common.go b/rabbitmq_amqp/common.go index 201cb2b..a270b98 100644 --- a/rabbitmq_amqp/common.go +++ b/rabbitmq_amqp/common.go @@ -14,6 +14,7 @@ const ( responseCode200 = 200 responseCode201 = 201 responseCode204 = 204 + responseCode404 = 404 responseCode409 = 409 commandPut = "PUT" commandGet = "GET" diff --git a/rabbitmq_amqp/management.go b/rabbitmq_amqp/management.go index 0f2226e..e28084b 100644 --- a/rabbitmq_amqp/management.go +++ b/rabbitmq_amqp/management.go @@ -8,6 +8,7 @@ type IManagement interface { Open(ctx context.Context, connection IConnection) error Close(ctx context.Context) error Queue(queueName string) IQueueSpecification + QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) Exchange(exchangeName string) IExchangeSpecification Binding() IBindingSpecification QueueClientName() IQueueSpecification