Skip to content

Commit 1686a9c

Browse files
committed
tmq queues example
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent d84c3d2 commit 1686a9c

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
7+
"time"
8+
)
9+
10+
func main() {
11+
12+
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
13+
14+
// Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
15+
amqpConnection, err := env.NewConnection(context.Background())
16+
17+
if err != nil {
18+
rmq.Error("Error opening connection", err)
19+
return
20+
}
21+
22+
// Create the management interface for the connection
23+
// so we can declare exchanges, queues, and bindings
24+
management := amqpConnection.Management()
25+
_, err = management.DeclareExchange(context.Background(), &rmq.FanOutExchangeSpecification{
26+
Name: "broadcast",
27+
})
28+
29+
if err != nil {
30+
rmq.Error("Error declaring exchange", err)
31+
return
32+
}
33+
34+
// Declare a temp queues
35+
36+
tmpQueueInfo1, err := management.DeclareQueue(context.Background(), &rmq.AutoGeneratedQueueSpecification{
37+
IsAutoDelete: true,
38+
IsExclusive: true,
39+
})
40+
41+
if err != nil {
42+
rmq.Error("Error declaring queue", err)
43+
return
44+
}
45+
46+
go func() {
47+
consumer, err := amqpConnection.NewConsumer(context.Background(), tmpQueueInfo1.Name(), nil)
48+
if err != nil {
49+
rmq.Error("Error creating consumer", err)
50+
return
51+
}
52+
for {
53+
dcx, err1 := consumer.Receive(context.Background())
54+
if err1 != nil {
55+
rmq.Error("Error receiving message", err)
56+
return
57+
}
58+
rmq.Info("[Consumer tmp1] Received message", "msg", fmt.Sprintf("%s", dcx.Message().Data))
59+
err1 = dcx.Accept(context.Background())
60+
if err1 != nil {
61+
rmq.Error("Error accepting message", err)
62+
return
63+
}
64+
}
65+
}()
66+
67+
tmpQueueInfo2, err := management.DeclareQueue(context.Background(), &rmq.AutoGeneratedQueueSpecification{
68+
IsAutoDelete: true,
69+
IsExclusive: true,
70+
})
71+
72+
if err != nil {
73+
rmq.Error("Error declaring queue", err)
74+
return
75+
}
76+
77+
go func() {
78+
consumer, err := amqpConnection.NewConsumer(context.Background(), tmpQueueInfo2.Name(), nil)
79+
if err != nil {
80+
rmq.Error("Error creating consumer", err)
81+
return
82+
}
83+
for {
84+
dcx, err1 := consumer.Receive(context.Background())
85+
if err1 != nil {
86+
rmq.Error("Error receiving message", err)
87+
return
88+
}
89+
rmq.Info("[Consumer tmp2] Received message", "msg", fmt.Sprintf("%s", dcx.Message().Data))
90+
err1 = dcx.Accept(context.Background())
91+
if err1 != nil {
92+
rmq.Error("Error accepting message", err)
93+
return
94+
}
95+
}
96+
}()
97+
98+
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
99+
SourceExchange: "broadcast",
100+
DestinationQueue: tmpQueueInfo1.Name(),
101+
})
102+
103+
if err != nil {
104+
rmq.Error("Error binding", err)
105+
return
106+
}
107+
108+
_, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
109+
SourceExchange: "broadcast",
110+
DestinationQueue: tmpQueueInfo2.Name(),
111+
})
112+
113+
if err != nil {
114+
rmq.Error("Error binding", err)
115+
return
116+
}
117+
118+
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
119+
Exchange: "broadcast",
120+
}, nil)
121+
122+
if err != nil {
123+
rmq.Error("Error creating publisher", err)
124+
return
125+
}
126+
127+
for i := 0; i < 10_000; i++ {
128+
publishResult, err := publisher.Publish(context.Background(),
129+
rmq.NewMessage([]byte("Hello AMQP 1.0 - id:"+fmt.Sprintf("%d", i))))
130+
if err != nil {
131+
rmq.Error("Error publishing message", err)
132+
return
133+
}
134+
135+
switch publishResult.Outcome.(type) {
136+
// publish result
137+
case *rmq.StateAccepted:
138+
rmq.Info("Message accepted", "message", publishResult.Message.GetData())
139+
default:
140+
rmq.Error("Message not accepted", "outcome", publishResult.Outcome)
141+
}
142+
time.Sleep(1 * time.Second)
143+
}
144+
145+
// press any key to close the connection
146+
147+
var input string
148+
_, _ = fmt.Scanln(&input)
149+
150+
}

0 commit comments

Comments
 (0)