Skip to content

Commit f52c798

Browse files
authored
Broadcast to tmq queues example (#43)
* broadcast example --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent d84c3d2 commit f52c798

File tree

2 files changed

+112
-1
lines changed

2 files changed

+112
-1
lines changed

docs/examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@
88
- [Publisher per message target](publisher_msg_targets) - An example of how to use a single publisher to send messages in different queues with the address to the message target in the message properties.
99
- [Video](video) - From the YouTube tutorial [AMQP 1.0 with Golang](https://youtu.be/iR1JUFh3udI)
1010
- [TLS](tls) - An example of how to use TLS with the AMQP 1.0 client.
11-
- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client.
11+
- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client.
12+
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
7+
"time"
8+
)
9+
10+
func main() {
11+
12+
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
13+
14+
// Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
15+
amqpConnection, err := env.NewConnection(context.Background())
16+
17+
if err != nil {
18+
rmq.Error("Error opening connection", err)
19+
return
20+
}
21+
const broadcastExchange = "broadcast"
22+
// Create the management interface for the connection
23+
// so we can declare exchanges, queues, and bindings
24+
management := amqpConnection.Management()
25+
_, err = management.DeclareExchange(context.Background(), &rmq.FanOutExchangeSpecification{
26+
Name: broadcastExchange,
27+
})
28+
29+
if err != nil {
30+
rmq.Error("Error declaring exchange", err)
31+
return
32+
}
33+
34+
for i := 0; i < 5; i++ {
35+
// create temp queues
36+
q, err := management.DeclareQueue(context.Background(), &rmq.AutoGeneratedQueueSpecification{
37+
IsAutoDelete: true,
38+
IsExclusive: true,
39+
})
40+
41+
if err != nil {
42+
rmq.Error("Error DeclareQueue", err)
43+
return
44+
}
45+
46+
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
47+
SourceExchange: broadcastExchange,
48+
DestinationQueue: q.Name(),
49+
})
50+
51+
if err != nil {
52+
rmq.Error("Error binding", err)
53+
return
54+
}
55+
56+
go func(idx int) {
57+
consumer, err := amqpConnection.NewConsumer(context.Background(), q.Name(), nil)
58+
if err != nil {
59+
rmq.Error("Error creating consumer", err)
60+
return
61+
}
62+
for {
63+
dcx, err1 := consumer.Receive(context.Background())
64+
if err1 != nil {
65+
rmq.Error("Error receiving message", err)
66+
return
67+
}
68+
rmq.Info("[Consumer]", "index", idx, "msg", fmt.Sprintf("%s", dcx.Message().Data), "[queue]", q.Name())
69+
err1 = dcx.Accept(context.Background())
70+
if err1 != nil {
71+
rmq.Error("Error accepting message", err)
72+
return
73+
}
74+
}
75+
}(i)
76+
}
77+
78+
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
79+
Exchange: broadcastExchange,
80+
}, nil)
81+
82+
if err != nil {
83+
rmq.Error("Error creating publisher", err)
84+
return
85+
}
86+
87+
for i := 0; i < 10_000; i++ {
88+
publishResult, err := publisher.Publish(context.Background(),
89+
rmq.NewMessage([]byte("Hello AMQP 1.0 - id:"+fmt.Sprintf("%d", i))))
90+
if err != nil {
91+
rmq.Error("Error publishing message", err)
92+
return
93+
}
94+
95+
switch publishResult.Outcome.(type) {
96+
// publish result
97+
case *rmq.StateAccepted:
98+
rmq.Info("[Publisher] Message accepted", "message", publishResult.Message.GetData())
99+
default:
100+
rmq.Error("[Publisher] Message not accepted", "outcome", publishResult.Outcome)
101+
}
102+
time.Sleep(1 * time.Second)
103+
}
104+
105+
// press any key to close the connection
106+
107+
var input string
108+
_, _ = fmt.Scanln(&input)
109+
110+
}

0 commit comments

Comments
 (0)