Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions rabbitmq_amqp/amqp_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbitmq_amqp

import (
"context"

"github.com/Azure/go-amqp"
)

Expand Down Expand Up @@ -164,6 +165,12 @@ func (a *AmqpQueue) Delete(ctx context.Context) error {
return err
}

func (a *AmqpQueue) Purge(ctx context.Context) (int, error) {
path := queuePurgePath(a.name)
response, err := a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200})
return int(response["message_count"].(uint64)), err
}

func (a *AmqpQueue) Name(queueName string) IQueueSpecification {
a.name = queueName
return a
Expand Down
39 changes: 39 additions & 0 deletions rabbitmq_amqp/amqp_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package rabbitmq_amqp

import (
"context"
"strconv"

"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
Expand Down Expand Up @@ -146,4 +149,40 @@ var _ = Describe("AMQP Queue test ", func() {
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})

It("AMQP Purge Queue should succeed and return the number of messages purged", func() {
const queueName = "AMQP Purge Queue should succeed and return the number of messages purged"
queueSpec := management.Queue(queueName)
_, err := queueSpec.Declare(context.TODO())
Expect(err).To(BeNil())
publishMessages(queueName, 10)
purged, err := queueSpec.Purge(context.TODO())
Expect(err).To(BeNil())
Expect(purged).To(Equal(10))
})
})

// TODO: This should be replaced with this library's publish function
// but for the time being, we need a way to publish messages or test purposes
func publishMessages(queueName string, count int) {
conn, err := amqp.Dial(context.TODO(), "amqp://guest:guest@localhost", nil)
if err != nil {
Fail(err.Error())
}
session, err := conn.NewSession(context.TODO(), nil)
if err != nil {
Fail(err.Error())
}
sender, err := session.NewSender(context.TODO(), queuePath(queueName), nil)
if err != nil {
Fail(err.Error())
}

for i := 0; i < count; i++ {
err = sender.Send(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i))), nil)
if err != nil {
Fail(err.Error())
}
}

}
7 changes: 6 additions & 1 deletion rabbitmq_amqp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"crypto/md5"
"encoding/base64"
"fmt"
"github.com/google/uuid"
"net/url"
"strings"

"github.com/google/uuid"
)

const (
Expand Down Expand Up @@ -69,6 +70,10 @@ func queuePath(queueName string) string {
return "/" + queues + "/" + encodePathSegments(queueName)
}

func queuePurgePath(queueName string) string {
return "/" + queues + "/" + encodePathSegments(queueName) + "/messages"
}

func exchangePath(exchangeName string) string {
return "/" + exchanges + "/" + encodePathSegments(exchangeName)
}
Expand Down
1 change: 1 addition & 0 deletions rabbitmq_amqp/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type IQueueSpecification interface {
MaxLengthBytes(length int64) IQueueSpecification
DeadLetterExchange(dlx string) IQueueSpecification
DeadLetterRoutingKey(dlrk string) IQueueSpecification
Purge(ctx context.Context) (int, error)
}

// IQueueInfo represents the information of a queue
Expand Down