Skip to content

Commit e83ea2b

Browse files
committed
Add support for querying queue info
Request `GET /queues/:queue` and return the details or ErrDoesNotExist
1 parent 6d8aaeb commit e83ea2b

File tree

6 files changed

+77
-10
lines changed

6 files changed

+77
-10
lines changed

rabbitmq_amqp/amqp_exchange_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rabbitmq_amqp
22

33
import (
44
"context"
5+
56
. "github.com/onsi/ginkgo/v2"
67
. "github.com/onsi/gomega"
78
)

rabbitmq_amqp/amqp_management.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"github.com/Azure/go-amqp"
8-
"github.com/google/uuid"
97
"strconv"
108
"time"
9+
10+
"github.com/Azure/go-amqp"
11+
"github.com/google/uuid"
1112
)
1213

1314
var ErrPreconditionFailed = errors.New("precondition Failed")
15+
var ErrDoesNotExist = errors.New("does not exist")
1416

1517
type AmqpManagement struct {
1618
session *amqp.Session
@@ -185,20 +187,33 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
185187
return msg.Value.(map[string]any), nil
186188
}
187189

188-
i, _ := strconv.Atoi(*msg.Properties.Subject)
190+
responseCode, _ := strconv.Atoi(*msg.Properties.Subject)
189191

190-
err = a.validateResponseCode(i, expectedResponseCodes)
192+
err = a.validateResponseCode(responseCode, expectedResponseCodes)
191193
if err != nil {
192194
return nil, err
193195
}
194196

197+
if responseCode == responseCode404 {
198+
return nil, ErrDoesNotExist
199+
}
200+
195201
return make(map[string]any), nil
196202
}
197203

198204
func (a *AmqpManagement) Queue(queueName string) IQueueSpecification {
199205
return newAmqpQueue(a, queueName)
200206
}
201207

208+
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) {
209+
path := queuePath(queueName)
210+
result, err := a.Request(ctx, amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404})
211+
if err != nil {
212+
return nil, err
213+
}
214+
return newAmqpQueueInfo(result), nil
215+
}
216+
202217
func (a *AmqpManagement) QueueClientName() IQueueSpecification {
203218
return newAmqpQueue(a, "")
204219
}

rabbitmq_amqp/amqp_management_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package rabbitmq_amqp
22

33
import (
44
"context"
5+
"time"
6+
7+
"github.com/Azure/go-amqp"
58
. "github.com/onsi/ginkgo/v2"
69
. "github.com/onsi/gomega"
7-
"time"
810
)
911

1012
var _ = Describe("Management tests", func() {
@@ -68,4 +70,22 @@ var _ = Describe("Management tests", func() {
6870
Expect(management.Close(context.Background())).To(BeNil())
6971
amqpConnection.Close(context.Background())
7072
})
73+
74+
It("GET on non-existing queue returns ErrDoesNotExist", func() {
75+
amqpConnection := NewAmqpConnection()
76+
Expect(amqpConnection).NotTo(BeNil())
77+
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
78+
79+
connectionSettings := NewConnectionSettings()
80+
Expect(connectionSettings).NotTo(BeNil())
81+
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
82+
err := amqpConnection.Open(context.Background(), connectionSettings)
83+
Expect(err).To(BeNil())
84+
85+
management := amqpConnection.Management()
86+
path := "/queues/i-do-not-exist"
87+
result, err := management.Request(context.Background(), amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404})
88+
Expect(err).To(Equal(ErrDoesNotExist))
89+
Expect(result).To(BeNil())
90+
})
7191
})

rabbitmq_amqp/amqp_queue_test.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rabbitmq_amqp
22

33
import (
44
"context"
5+
56
. "github.com/onsi/ginkgo/v2"
67
. "github.com/onsi/gomega"
78
)
@@ -26,7 +27,7 @@ var _ = Describe("AMQP Queue test ", func() {
2627
Expect(connection.Close(context.Background())).To(BeNil())
2728
})
2829

29-
It("AMQP Queue Declare With Response and Delete should succeed", func() {
30+
It("AMQP Queue Declare With Response and Get/Delete should succeed", func() {
3031
const queueName = "AMQP Queue Declare With Response and Delete should succeed"
3132
queueSpec := management.Queue(queueName)
3233
queueInfo, err := queueSpec.Declare(context.TODO())
@@ -37,11 +38,16 @@ var _ = Describe("AMQP Queue test ", func() {
3738
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
3839
Expect(queueInfo.IsExclusive()).To(BeFalse())
3940
Expect(queueInfo.Type()).To(Equal(Classic))
41+
42+
// validate GET (query queue info)
43+
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
44+
Expect(queueInfoReceived).To(Equal(queueInfo))
45+
4046
err = queueSpec.Delete(context.TODO())
4147
Expect(err).To(BeNil())
4248
})
4349

44-
It("AMQP Queue Declare With Parameters and Delete should succeed", func() {
50+
It("AMQP Queue Declare With Parameters and Get/Delete should succeed", func() {
4551
const queueName = "AMQP Queue Declare With Parameters and Delete should succeed"
4652
queueSpec := management.Queue(queueName).Exclusive(true).
4753
AutoDelete(true).
@@ -64,11 +70,15 @@ var _ = Describe("AMQP Queue test ", func() {
6470
Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key"))
6571
Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000)))
6672

73+
// validate GET (query queue info)
74+
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
75+
Expect(queueInfoReceived).To(Equal(queueInfo))
76+
6777
err = queueSpec.Delete(context.TODO())
6878
Expect(err).To(BeNil())
6979
})
7080

71-
It("AMQP Declare Quorum Queue and Delete should succeed", func() {
81+
It("AMQP Declare Quorum Queue and Get/Delete should succeed", func() {
7282
const queueName = "AMQP Declare Quorum Queue and Delete should succeed"
7383
// Quorum queue will ignore Exclusive and AutoDelete settings
7484
// since they are not supported by quorum queues
@@ -83,11 +93,16 @@ var _ = Describe("AMQP Queue test ", func() {
8393
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
8494
Expect(queueInfo.IsExclusive()).To(BeFalse())
8595
Expect(queueInfo.Type()).To(Equal(Quorum))
96+
97+
// validate GET (query queue info)
98+
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
99+
Expect(queueInfoReceived).To(Equal(queueInfo))
100+
86101
err = queueSpec.Delete(context.TODO())
87102
Expect(err).To(BeNil())
88103
})
89104

90-
It("AMQP Declare Stream Queue and Delete should succeed", func() {
105+
It("AMQP Declare Stream Queue and Get/Delete should succeed", func() {
91106
const queueName = "AMQP Declare Stream Queue and Delete should succeed"
92107
// Stream queue will ignore Exclusive and AutoDelete settings
93108
// since they are not supported by quorum queues
@@ -102,6 +117,11 @@ var _ = Describe("AMQP Queue test ", func() {
102117
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
103118
Expect(queueInfo.IsExclusive()).To(BeFalse())
104119
Expect(queueInfo.Type()).To(Equal(Stream))
120+
121+
// validate GET (query queue info)
122+
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
123+
Expect(queueInfoReceived).To(Equal(queueInfo))
124+
105125
err = queueSpec.Delete(context.TODO())
106126
Expect(err).To(BeNil())
107127
})
@@ -146,4 +166,12 @@ var _ = Describe("AMQP Queue test ", func() {
146166
err = queueSpec.Delete(context.TODO())
147167
Expect(err).To(BeNil())
148168
})
169+
170+
It("AMQP GET on non-existing queue should return ErrDoesNotExist", func() {
171+
const queueName = "This queue does not exist"
172+
result, err := management.QueueInfo(context.TODO(), queueName)
173+
Expect(err).To(Equal(ErrDoesNotExist))
174+
Expect(result).To(BeNil())
175+
})
176+
149177
})

rabbitmq_amqp/common.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,17 @@ import (
44
"crypto/md5"
55
"encoding/base64"
66
"fmt"
7-
"github.com/google/uuid"
87
"net/url"
98
"strings"
9+
10+
"github.com/google/uuid"
1011
)
1112

1213
const (
1314
responseCode200 = 200
1415
responseCode201 = 201
1516
responseCode204 = 204
17+
responseCode404 = 404
1618
responseCode409 = 409
1719
commandPut = "PUT"
1820
commandGet = "GET"

rabbitmq_amqp/management.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ type IManagement interface {
88
Open(ctx context.Context, connection IConnection) error
99
Close(ctx context.Context) error
1010
Queue(queueName string) IQueueSpecification
11+
QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error)
1112
Exchange(exchangeName string) IExchangeSpecification
1213
Binding() IBindingSpecification
1314
QueueClientName() IQueueSpecification

0 commit comments

Comments
 (0)