Skip to content

Commit 72ac394

Browse files
authored
Add examples (#27)
* Add examples --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 031a2ac commit 72ac394

File tree

10 files changed

+343
-25
lines changed

10 files changed

+343
-25
lines changed

README.md

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
# RabbitMQ AMQP 1.0 .Golang Client
22

3-
This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
3+
This library is meant to be used with RabbitMQ 4.0.
4+
Suitable for testing in pre-production environments.
45

5-
## How to Run
6-
7-
- Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
8-
- `make test` to run the tests
9-
- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop`
106

117
## Getting Started
128

13-
You can find an example in: `docs/examples/getting_started`
9+
- [Getting_started](docs/examples/getting_started)
10+
- [Examples](docs/examples)
1411

15-
## Examples
12+
## Build from source
1613

17-
You can find more examples in: `docs/examples`
14+
- Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
15+
- `make test` to run the tests
16+
- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop`
1817

1918

docs/examples/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
### AMQP 1.0 .Golang Client Examples
2+
3+
4+
- [Getting Started](getting_started) - A simple example to get you started.
5+
- [Reliable](reliable) - An example of how to deal with reconnections and error handling.
6+
- [Streams](streams) - An example of how to use [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams) with AMQP 1.0
7+
- [Stream Filtering](streams_filtering) - An example of how to use streams [Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions)
8+
- [Publisher per message target](publisher_msg_targets) - An example of how to use a single publisher to send messages in different queues with the address to the message target in the message properties.

docs/examples/getting_started/main.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010
)
1111

1212
func main() {
13-
exchangeName := "getting-started-exchange"
14-
queueName := "getting-started-queue"
13+
exchangeName := "getting-started-go-exchange"
14+
queueName := "getting-started-go-queue"
1515
routingKey := "routing-key"
1616

1717
rabbitmq_amqp.Info("Getting started with AMQP Go AMQP 1.0 Client")
@@ -24,16 +24,22 @@ func main() {
2424
}
2525
}(stateChanged)
2626

27-
// Open a connection to the AMQP 1.0 server
28-
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, nil)
27+
// rabbitmq_amqp.NewEnvironment setups the environment.
28+
// The environment is used to create connections
29+
// given the same parameters
30+
env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil)
31+
32+
// Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
33+
amqpConnection, err := env.NewConnection(context.Background())
2934
if err != nil {
3035
rabbitmq_amqp.Error("Error opening connection", err)
3136
return
3237
}
3338
// Register the channel to receive status change notifications
39+
// this is valid for the connection lifecycle
3440
amqpConnection.NotifyStatusChange(stateChanged)
3541

36-
fmt.Printf("AMQP connection opened.\n")
42+
rabbitmq_amqp.Info("AMQP connection opened.\n")
3743
// Create the management interface for the connection
3844
// so we can declare exchanges, queues, and bindings
3945
management := amqpConnection.Management()
@@ -165,37 +171,39 @@ func main() {
165171
err = management.Unbind(context.TODO(), bindingPath)
166172

167173
if err != nil {
168-
fmt.Printf("Error unbinding: %v\n", err)
174+
rabbitmq_amqp.Error("Error unbinding: %v\n", err)
169175
return
170176
}
171177

172178
err = management.DeleteExchange(context.TODO(), exchangeInfo.Name())
173179
if err != nil {
174-
fmt.Printf("Error deleting exchange: %v\n", err)
180+
rabbitmq_amqp.Error("Error deleting exchange: %v\n", err)
175181
return
176182
}
177183

178184
// Purge the queue
179185
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
180186
if err != nil {
181-
fmt.Printf("Error purging queue: %v\n", err)
187+
rabbitmq_amqp.Error("Error purging queue: %v\n", err)
182188
return
183189
}
184-
fmt.Printf("Purged %d messages from the queue.\n", purged)
190+
rabbitmq_amqp.Info("Purged %d messages from the queue.\n", purged)
185191

186192
err = management.DeleteQueue(context.TODO(), queueInfo.Name())
187193
if err != nil {
188-
fmt.Printf("Error deleting queue: %v\n", err)
194+
rabbitmq_amqp.Error("Error deleting queue: %v\n", err)
189195
return
190196
}
191197

192-
err = amqpConnection.Close(context.Background())
198+
// Close all the connections. but you can still use the environment
199+
// to create new connections
200+
err = env.CloseConnections(context.Background())
193201
if err != nil {
194-
fmt.Printf("Error closing connection: %v\n", err)
202+
rabbitmq_amqp.Error("Error closing connection: %v\n", err)
195203
return
196204
}
197205

198-
fmt.Printf("AMQP connection closed.\n")
206+
rabbitmq_amqp.Info("AMQP connection closed.\n")
199207
// not necessary. It waits for the status change to be printed
200208
time.Sleep(100 * time.Millisecond)
201209
close(stateChanged)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"github.com/Azure/go-amqp"
6+
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
7+
)
8+
9+
func checkError(err error) {
10+
if err != nil {
11+
rabbitmq_amqp.Error("Error", err)
12+
// it should not happen for the example
13+
// so panic just to make sure we catch it
14+
panic(err)
15+
}
16+
}
17+
func main() {
18+
19+
rabbitmq_amqp.Info("Define the publisher message targets")
20+
21+
env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil)
22+
amqpConnection, err := env.NewConnection(context.Background())
23+
checkError(err)
24+
queues := []string{"queue1", "queue2", "queue3"}
25+
management := amqpConnection.Management()
26+
for _, queue := range queues {
27+
_, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{
28+
Name: queue,
29+
})
30+
checkError(err)
31+
}
32+
33+
// create a publisher without a target
34+
publisher, err := amqpConnection.NewPublisher(context.TODO(), nil, "stream-publisher")
35+
checkError(err)
36+
37+
// publish messages to the stream
38+
for i := 0; i < 12; i++ {
39+
40+
// with this helper function we create a message with a target
41+
// that is the same to create a message with:
42+
// msg := amqp.NewMessage([]byte("hello"))
43+
// MessageToAddressHelper(msg, &QueueAddress{Queue: qName})
44+
// same like:
45+
// msg := amqp.NewMessage([]byte("hello"))
46+
// msg.Properties = &amqp.MessageProperties{}
47+
// msg.Properties.To = &address
48+
// NewMessageToAddress and MessageToAddressHelper helpers are provided to make the
49+
// code more readable and easier to use
50+
msg, err := rabbitmq_amqp.NewMessageToAddress([]byte("Hello World"),
51+
&rabbitmq_amqp.QueueAddress{Queue: queues[i%3]})
52+
checkError(err)
53+
publishResult, err := publisher.Publish(context.Background(), msg)
54+
checkError(err)
55+
switch publishResult.Outcome.(type) {
56+
case *amqp.StateAccepted:
57+
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
58+
break
59+
default:
60+
rabbitmq_amqp.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0])
61+
}
62+
}
63+
64+
// check the UI, you should see 4 messages in each queue
65+
66+
// Close the publisher
67+
err = publisher.Close(context.Background())
68+
checkError(err)
69+
err = env.CloseConnections(context.Background())
70+
checkError(err)
71+
}

docs/examples/streams/streams.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"github.com/Azure/go-amqp"
6+
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
7+
"time"
8+
)
9+
10+
func checkError(err error) {
11+
if err != nil {
12+
rabbitmq_amqp.Error("Error", err)
13+
// it should not happen for the example
14+
// so panic just to make sure we catch it
15+
panic(err)
16+
}
17+
}
18+
19+
func main() {
20+
21+
rabbitmq_amqp.Info("Golang AMQP 1.0 Streams example")
22+
queueStream := "stream-go-queue-" + time.Now().String()
23+
env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil)
24+
amqpConnection, err := env.NewConnection(context.Background())
25+
checkError(err)
26+
management := amqpConnection.Management()
27+
// define a stream queue
28+
_, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.StreamQueueSpecification{
29+
Name: queueStream,
30+
// it is a best practice to set the max length of the stream
31+
// to avoid the stream to grow indefinitely
32+
// the value here is low just for the example
33+
MaxLengthBytes: rabbitmq_amqp.CapacityGB(5),
34+
})
35+
checkError(err)
36+
37+
// create a stream publisher. In this case we use the QueueAddress to make the example
38+
// simple. So we use the default exchange here.
39+
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rabbitmq_amqp.QueueAddress{Queue: queueStream}, "stream-publisher")
40+
checkError(err)
41+
42+
// publish messages to the stream
43+
for i := 0; i < 10; i++ {
44+
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello World")))
45+
checkError(err)
46+
47+
// check the outcome of the publishResult
48+
switch publishResult.Outcome.(type) {
49+
case *amqp.StateAccepted:
50+
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
51+
break
52+
case *amqp.StateReleased:
53+
rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
54+
break
55+
case *amqp.StateRejected:
56+
rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
57+
stateType := publishResult.Outcome.(*amqp.StateRejected)
58+
if stateType.Error != nil {
59+
rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
60+
}
61+
break
62+
default:
63+
// these status are not supported. Leave it for AMQP 1.0 compatibility
64+
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
65+
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
66+
}
67+
68+
}
69+
// create a stream consumer
70+
consumer, err := amqpConnection.NewConsumer(context.Background(), queueStream, &rabbitmq_amqp.StreamConsumerOptions{
71+
// the offset is set to the first chunk of the stream
72+
// so here it starts from the beginning
73+
Offset: &rabbitmq_amqp.OffsetFirst{},
74+
})
75+
checkError(err)
76+
77+
// receive messages from the stream
78+
for i := 0; i < 10; i++ {
79+
deliveryContext, err := consumer.Receive(context.Background())
80+
checkError(err)
81+
rabbitmq_amqp.Info("[Consumer]", "Message received", deliveryContext.Message().Data[0])
82+
// accept the message
83+
err = deliveryContext.Accept(context.Background())
84+
checkError(err)
85+
}
86+
87+
// close the consumer
88+
err = consumer.Close(context.Background())
89+
checkError(err)
90+
91+
err = amqpConnection.Management().DeleteQueue(context.Background(), queueStream)
92+
checkError(err)
93+
94+
err = env.CloseConnections(context.Background())
95+
checkError(err)
96+
97+
rabbitmq_amqp.Info("Example completed")
98+
}

0 commit comments

Comments
 (0)