Skip to content

Commit 1a6679a

Browse files
authored
Implement publisher (#16)
* Implement publisher * API refactor --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 60e006b commit 1a6679a

20 files changed

+759
-356
lines changed

examples/getting_started/main.go

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,38 +3,51 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"github.com/Azure/go-amqp"
67
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
78
"time"
89
)
910

1011
func main() {
12+
13+
exchangeName := "getting-started-exchange"
14+
queueName := "getting-started-queue"
15+
routingKey := "routing-key"
16+
1117
fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n")
12-
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)
1318

19+
/// Create a channel to receive status change notifications
20+
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)
1421
go func(ch chan *rabbitmq_amqp.StatusChanged) {
1522
for statusChanged := range ch {
1623
fmt.Printf("%s\n", statusChanged)
1724
}
1825
}(chStatusChanged)
1926

20-
amqpConnection := rabbitmq_amqp.NewAmqpConnectionNotifyStatusChanged(chStatusChanged)
21-
err := amqpConnection.Open(context.Background(), rabbitmq_amqp.NewConnectionSettings())
27+
// Open a connection to the AMQP 1.0 server
28+
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), "amqp://", nil)
2229
if err != nil {
2330
fmt.Printf("Error opening connection: %v\n", err)
2431
return
2532
}
33+
// Register the channel to receive status change notifications
34+
amqpConnection.NotifyStatusChange(chStatusChanged)
35+
2636
fmt.Printf("AMQP Connection opened.\n")
37+
// Create the management interface for the connection
38+
// so we can declare exchanges, queues, and bindings
2739
management := amqpConnection.Management()
2840
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{
29-
Name: "getting-started-exchange",
41+
Name: exchangeName,
3042
})
3143
if err != nil {
3244
fmt.Printf("Error declaring exchange: %v\n", err)
3345
return
3446
}
3547

48+
// Declare a Quorum queue
3649
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{
37-
Name: "getting-started-queue",
50+
Name: queueName,
3851
QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum},
3952
})
4053

@@ -43,17 +56,45 @@ func main() {
4356
return
4457
}
4558

59+
// Bind the queue to the exchange
4660
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{
47-
SourceExchange: exchangeInfo.Name(),
48-
DestinationQueue: queueInfo.Name(),
49-
BindingKey: "routing-key",
61+
SourceExchange: exchangeName,
62+
DestinationQueue: queueName,
63+
BindingKey: routingKey,
5064
})
5165

5266
if err != nil {
5367
fmt.Printf("Error binding: %v\n", err)
5468
return
5569
}
5670

71+
addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
72+
73+
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
74+
if err != nil {
75+
fmt.Printf("Error creating publisher: %v\n", err)
76+
return
77+
}
78+
79+
// Publish a message to the exchange
80+
err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
81+
if err != nil {
82+
fmt.Printf("Error publishing message: %v\n", err)
83+
return
84+
}
85+
86+
println("press any key to close the connection")
87+
88+
var input string
89+
_, _ = fmt.Scanln(&input)
90+
91+
// Close the publisher
92+
err = publisher.Close(context.Background())
93+
if err != nil {
94+
return
95+
}
96+
// Unbind the queue from the exchange
97+
5798
err = management.Unbind(context.TODO(), bindingPath)
5899

59100
if err != nil {
@@ -67,6 +108,14 @@ func main() {
67108
return
68109
}
69110

111+
// Purge the queue
112+
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
113+
if err != nil {
114+
fmt.Printf("Error purging queue: %v\n", err)
115+
return
116+
}
117+
fmt.Printf("Purged %d messages from the queue.\n", purged)
118+
70119
err = management.DeleteQueue(context.TODO(), queueInfo.Name())
71120
if err != nil {
72121
fmt.Printf("Error deleting queue: %v\n", err)

rabbitmq_amqp/address_builder_test.go

Lines changed: 0 additions & 78 deletions
This file was deleted.

rabbitmq_amqp/address_builder.go renamed to rabbitmq_amqp/address_helper.go

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,66 +7,56 @@ import (
77
"strings"
88
)
99

10-
type AddressBuilder struct {
11-
queue *string
12-
exchange *string
13-
key *string
14-
append *string
15-
}
16-
17-
func NewAddressBuilder() *AddressBuilder {
18-
return &AddressBuilder{}
19-
}
20-
21-
func (a *AddressBuilder) Queue(queue string) *AddressBuilder {
22-
a.queue = &queue
23-
return a
24-
}
25-
26-
func (a *AddressBuilder) Exchange(exchange string) *AddressBuilder {
27-
a.exchange = &exchange
28-
return a
29-
}
30-
31-
func (a *AddressBuilder) Key(key string) *AddressBuilder {
32-
a.key = &key
33-
return a
34-
}
35-
36-
func (a *AddressBuilder) Append(append string) *AddressBuilder {
37-
a.append = &append
38-
return a
39-
}
40-
41-
func (a *AddressBuilder) Address() (string, error) {
42-
if a.exchange == nil && a.queue == nil {
10+
// Address Creates the address for the exchange or queue following the RabbitMQ conventions.
11+
// see: https://www.rabbitmq.com/docs/next/amqp#address-v2
12+
func Address(exchange, key, queue *string, urlParameters *string) (string, error) {
13+
if exchange == nil && queue == nil {
4314
return "", errors.New("exchange or queue must be set")
4415
}
4516

4617
urlAppend := ""
47-
if !isStringNilOrEmpty(a.append) {
48-
urlAppend = *a.append
18+
if !isStringNilOrEmpty(urlParameters) {
19+
urlAppend = *urlParameters
4920
}
50-
if !isStringNilOrEmpty(a.exchange) && !isStringNilOrEmpty(a.queue) {
21+
if !isStringNilOrEmpty(exchange) && !isStringNilOrEmpty(queue) {
5122
return "", errors.New("exchange and queue cannot be set together")
5223
}
5324

54-
if !isStringNilOrEmpty(a.exchange) {
55-
if !isStringNilOrEmpty(a.key) {
56-
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + "/" + encodePathSegments(*a.key) + urlAppend, nil
25+
if !isStringNilOrEmpty(exchange) {
26+
if !isStringNilOrEmpty(key) {
27+
return "/" + exchanges + "/" + encodePathSegments(*exchange) + "/" + encodePathSegments(*key) + urlAppend, nil
5728
}
58-
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + urlAppend, nil
29+
return "/" + exchanges + "/" + encodePathSegments(*exchange) + urlAppend, nil
5930
}
6031

61-
if a.queue == nil {
32+
if queue == nil {
6233
return "", nil
6334
}
6435

65-
if isStringNilOrEmpty(a.queue) {
36+
if isStringNilOrEmpty(queue) {
6637
return "", errors.New("queue must be set")
6738
}
6839

69-
return "/" + queues + "/" + encodePathSegments(*a.queue) + urlAppend, nil
40+
return "/" + queues + "/" + encodePathSegments(*queue) + urlAppend, nil
41+
}
42+
43+
// ExchangeAddress Creates the address for the exchange
44+
// See Address for more information
45+
func ExchangeAddress(exchange, key *string) (string, error) {
46+
return Address(exchange, key, nil, nil)
47+
}
48+
49+
// QueueAddress Creates the address for the queue.
50+
// See Address for more information
51+
func QueueAddress(queue *string) (string, error) {
52+
return Address(nil, nil, queue, nil)
53+
}
54+
55+
// PurgeQueueAddress Creates the address for purging the queue.
56+
// See Address for more information
57+
func PurgeQueueAddress(queue *string) (string, error) {
58+
parameter := "/messages"
59+
return Address(nil, nil, queue, &parameter)
7060
}
7161

7262
// encodePathSegments takes a string and returns its percent-encoded representation.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package rabbitmq_amqp
2+
3+
import (
4+
. "github.com/onsi/ginkgo/v2"
5+
. "github.com/onsi/gomega"
6+
)
7+
8+
var _ = Describe("Address builder test ", func() {
9+
It("With exchange, queue and key should raise and error", func() {
10+
queue := "my_queue"
11+
exchange := "my_exchange"
12+
13+
_, err := Address(&exchange, nil, &queue, nil)
14+
Expect(err).NotTo(BeNil())
15+
Expect(err.Error()).To(Equal("exchange and queue cannot be set together"))
16+
})
17+
18+
It("Without exchange and queue should raise and error", func() {
19+
_, err := Address(nil, nil, nil, nil)
20+
Expect(err).NotTo(BeNil())
21+
Expect(err.Error()).To(Equal("exchange or queue must be set"))
22+
})
23+
24+
It("With exchange and key should return address", func() {
25+
exchange := "my_exchange"
26+
key := "my_key"
27+
28+
address, err := Address(&exchange, &key, nil, nil)
29+
Expect(err).To(BeNil())
30+
Expect(address).To(Equal("/exchanges/my_exchange/my_key"))
31+
})
32+
33+
It("With exchange should return address", func() {
34+
exchange := "my_exchange"
35+
address, err := Address(&exchange, nil, nil, nil)
36+
Expect(err).To(BeNil())
37+
Expect(address).To(Equal("/exchanges/my_exchange"))
38+
})
39+
40+
It("With exchange and key with names to encode should return the encoded address", func() {
41+
42+
exchange := "my_ exchange/()"
43+
key := "my_key "
44+
45+
address, err := Address(&exchange, &key, nil, nil)
46+
Expect(err).To(BeNil())
47+
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20"))
48+
})
49+
50+
It("With queue should return address", func() {
51+
queue := "my_queue>"
52+
address, err := Address(nil, nil, &queue, nil)
53+
Expect(err).To(BeNil())
54+
Expect(address).To(Equal("/queues/my_queue%3E"))
55+
})
56+
57+
It("With queue and urlParameters should return address", func() {
58+
queue := "my_queue"
59+
address, err := PurgeQueueAddress(&queue)
60+
Expect(err).To(BeNil())
61+
Expect(address).To(Equal("/queues/my_queue/messages"))
62+
})
63+
64+
})

rabbitmq_amqp/amqp_binding_test.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,9 @@ var _ = Describe("AMQP Bindings test ", func() {
1010
var connection IConnection
1111
var management IManagement
1212
BeforeEach(func() {
13-
connection = NewAmqpConnection()
14-
Expect(connection).NotTo(BeNil())
15-
Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{}))
16-
connectionSettings := NewConnectionSettings()
17-
Expect(connectionSettings).NotTo(BeNil())
18-
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
19-
err := connection.Open(context.TODO(), connectionSettings)
13+
conn, err := Dial(context.TODO(), "amqp://", nil)
2014
Expect(err).To(BeNil())
15+
connection = conn
2116
management = connection.Management()
2217
})
2318

0 commit comments

Comments
 (0)