diff --git a/rabbitmq_amqp/amqp_queue.go b/rabbitmq_amqp/amqp_queue.go index 81b5462..d146a90 100644 --- a/rabbitmq_amqp/amqp_queue.go +++ b/rabbitmq_amqp/amqp_queue.go @@ -2,6 +2,7 @@ package rabbitmq_amqp import ( "context" + "github.com/Azure/go-amqp" ) @@ -164,6 +165,12 @@ func (a *AmqpQueue) Delete(ctx context.Context) error { return err } +func (a *AmqpQueue) Purge(ctx context.Context) (int, error) { + path := queuePurgePath(a.name) + response, err := a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200}) + return int(response["message_count"].(uint64)), err +} + func (a *AmqpQueue) Name(queueName string) IQueueSpecification { a.name = queueName return a diff --git a/rabbitmq_amqp/amqp_queue_test.go b/rabbitmq_amqp/amqp_queue_test.go index 2423ae7..5055ffc 100644 --- a/rabbitmq_amqp/amqp_queue_test.go +++ b/rabbitmq_amqp/amqp_queue_test.go @@ -2,6 +2,9 @@ package rabbitmq_amqp import ( "context" + "strconv" + + "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -146,4 +149,40 @@ var _ = Describe("AMQP Queue test ", func() { err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) + + It("AMQP Purge Queue should succeed and return the number of messages purged", func() { + const queueName = "AMQP Purge Queue should succeed and return the number of messages purged" + queueSpec := management.Queue(queueName) + _, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + publishMessages(queueName, 10) + purged, err := queueSpec.Purge(context.TODO()) + Expect(err).To(BeNil()) + Expect(purged).To(Equal(10)) + }) }) + +// TODO: This should be replaced with this library's publish function +// but for the time being, we need a way to publish messages or test purposes +func publishMessages(queueName string, count int) { + conn, err := amqp.Dial(context.TODO(), "amqp://guest:guest@localhost", nil) + if err != nil { + Fail(err.Error()) + } + session, err := conn.NewSession(context.TODO(), nil) + if err != nil { + Fail(err.Error()) + } + sender, err := session.NewSender(context.TODO(), queuePath(queueName), nil) + if err != nil { + Fail(err.Error()) + } + + for i := 0; i < count; i++ { + err = sender.Send(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i))), nil) + if err != nil { + Fail(err.Error()) + } + } + +} diff --git a/rabbitmq_amqp/common.go b/rabbitmq_amqp/common.go index a19b9ac..201cb2b 100644 --- a/rabbitmq_amqp/common.go +++ b/rabbitmq_amqp/common.go @@ -4,9 +4,10 @@ import ( "crypto/md5" "encoding/base64" "fmt" - "github.com/google/uuid" "net/url" "strings" + + "github.com/google/uuid" ) const ( @@ -69,6 +70,10 @@ func queuePath(queueName string) string { return "/" + queues + "/" + encodePathSegments(queueName) } +func queuePurgePath(queueName string) string { + return "/" + queues + "/" + encodePathSegments(queueName) + "/messages" +} + func exchangePath(exchangeName string) string { return "/" + exchanges + "/" + encodePathSegments(exchangeName) } diff --git a/rabbitmq_amqp/entities.go b/rabbitmq_amqp/entities.go index 13d3d9f..4ea0c4c 100644 --- a/rabbitmq_amqp/entities.go +++ b/rabbitmq_amqp/entities.go @@ -37,6 +37,7 @@ type IQueueSpecification interface { MaxLengthBytes(length int64) IQueueSpecification DeadLetterExchange(dlx string) IQueueSpecification DeadLetterRoutingKey(dlrk string) IQueueSpecification + Purge(ctx context.Context) (int, error) } // IQueueInfo represents the information of a queue