Skip to content

Commit e662d95

Browse files
authored
Add AmqpQueue.Purge() (#14)
1 parent 95d4df6 commit e662d95

File tree

4 files changed

+53
-1
lines changed

4 files changed

+53
-1
lines changed

rabbitmq_amqp/amqp_queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rabbitmq_amqp
22

33
import (
44
"context"
5+
56
"github.com/Azure/go-amqp"
67
)
78

@@ -164,6 +165,12 @@ func (a *AmqpQueue) Delete(ctx context.Context) error {
164165
return err
165166
}
166167

168+
func (a *AmqpQueue) Purge(ctx context.Context) (int, error) {
169+
path := queuePurgePath(a.name)
170+
response, err := a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200})
171+
return int(response["message_count"].(uint64)), err
172+
}
173+
167174
func (a *AmqpQueue) Name(queueName string) IQueueSpecification {
168175
a.name = queueName
169176
return a

rabbitmq_amqp/amqp_queue_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package rabbitmq_amqp
22

33
import (
44
"context"
5+
"strconv"
6+
7+
"github.com/Azure/go-amqp"
58
. "github.com/onsi/ginkgo/v2"
69
. "github.com/onsi/gomega"
710
)
@@ -146,4 +149,40 @@ var _ = Describe("AMQP Queue test ", func() {
146149
err = queueSpec.Delete(context.TODO())
147150
Expect(err).To(BeNil())
148151
})
152+
153+
It("AMQP Purge Queue should succeed and return the number of messages purged", func() {
154+
const queueName = "AMQP Purge Queue should succeed and return the number of messages purged"
155+
queueSpec := management.Queue(queueName)
156+
_, err := queueSpec.Declare(context.TODO())
157+
Expect(err).To(BeNil())
158+
publishMessages(queueName, 10)
159+
purged, err := queueSpec.Purge(context.TODO())
160+
Expect(err).To(BeNil())
161+
Expect(purged).To(Equal(10))
162+
})
149163
})
164+
165+
// TODO: This should be replaced with this library's publish function
166+
// but for the time being, we need a way to publish messages or test purposes
167+
func publishMessages(queueName string, count int) {
168+
conn, err := amqp.Dial(context.TODO(), "amqp://guest:guest@localhost", nil)
169+
if err != nil {
170+
Fail(err.Error())
171+
}
172+
session, err := conn.NewSession(context.TODO(), nil)
173+
if err != nil {
174+
Fail(err.Error())
175+
}
176+
sender, err := session.NewSender(context.TODO(), queuePath(queueName), nil)
177+
if err != nil {
178+
Fail(err.Error())
179+
}
180+
181+
for i := 0; i < count; i++ {
182+
err = sender.Send(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i))), nil)
183+
if err != nil {
184+
Fail(err.Error())
185+
}
186+
}
187+
188+
}

rabbitmq_amqp/common.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"crypto/md5"
55
"encoding/base64"
66
"fmt"
7-
"github.com/google/uuid"
87
"net/url"
98
"strings"
9+
10+
"github.com/google/uuid"
1011
)
1112

1213
const (
@@ -69,6 +70,10 @@ func queuePath(queueName string) string {
6970
return "/" + queues + "/" + encodePathSegments(queueName)
7071
}
7172

73+
func queuePurgePath(queueName string) string {
74+
return "/" + queues + "/" + encodePathSegments(queueName) + "/messages"
75+
}
76+
7277
func exchangePath(exchangeName string) string {
7378
return "/" + exchanges + "/" + encodePathSegments(exchangeName)
7479
}

rabbitmq_amqp/entities.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type IQueueSpecification interface {
3737
MaxLengthBytes(length int64) IQueueSpecification
3838
DeadLetterExchange(dlx string) IQueueSpecification
3939
DeadLetterRoutingKey(dlrk string) IQueueSpecification
40+
Purge(ctx context.Context) (int, error)
4041
}
4142

4243
// IQueueInfo represents the information of a queue

0 commit comments

Comments
 (0)