Skip to content

Commit 46039a8

Browse files
committed
Add Go AMQP 1.0 direct reply-to example
1 parent 2854289 commit 46039a8

File tree

1 file changed

+232
-0
lines changed

1 file changed

+232
-0
lines changed

go/rpc_amqp10.go

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/Azure/go-amqp"
10+
)
11+
12+
const (
13+
amqpURL = "amqp://guest:guest@localhost:5672/"
14+
// Request queue name for RPC requests
15+
requestQueue = "rpc-requests"
16+
)
17+
18+
// RPC Server - handles ping requests and sends pong replies
19+
func rpcServer(ctx context.Context) error {
20+
// Connect to RabbitMQ
21+
conn, err := amqp.Dial(ctx, amqpURL, nil)
22+
if err != nil {
23+
return fmt.Errorf("failed to connect to RabbitMQ: %w", err)
24+
}
25+
defer conn.Close()
26+
27+
// Create a session
28+
session, err := conn.NewSession(ctx, nil)
29+
if err != nil {
30+
return fmt.Errorf("failed to create session: %w", err)
31+
}
32+
defer session.Close(ctx)
33+
34+
// Create a receiver for the request queue
35+
receiver, err := session.NewReceiver(ctx, requestQueue, nil)
36+
if err != nil {
37+
return fmt.Errorf("failed to create receiver: %w", err)
38+
}
39+
defer receiver.Close(ctx)
40+
41+
log.Println("RPC Server: Started and listening for requests...")
42+
43+
for {
44+
select {
45+
case <-ctx.Done():
46+
log.Println("RPC Server: Shutting down...")
47+
return nil
48+
default:
49+
// Receive a request message
50+
msg, err := receiver.Receive(ctx, nil)
51+
if err != nil {
52+
log.Printf("RPC Server: Error receiving message: %v", err)
53+
continue
54+
}
55+
56+
// Accept the message
57+
err = receiver.AcceptMessage(ctx, msg)
58+
if err != nil {
59+
log.Printf("RPC Server: Error accepting message: %v", err)
60+
continue
61+
}
62+
63+
// Extract message properties
64+
65+
messageID := msg.Properties.MessageID.(string)
66+
replyTo := *msg.Properties.ReplyTo
67+
68+
log.Printf("RPC Server: Received ping request (ID: %s, ReplyTo: %s)", messageID, replyTo)
69+
70+
// Check if we have a reply-to address
71+
if replyTo == "" {
72+
log.Println("RPC Server: No reply-to address, skipping reply")
73+
continue
74+
}
75+
76+
// Create a sender for the reply
77+
sender, err := session.NewSender(ctx, replyTo, nil)
78+
if err != nil {
79+
log.Printf("RPC Server: Error creating sender for reply: %v", err)
80+
continue
81+
}
82+
83+
// Create the pong reply message
84+
replyMsg := &amqp.Message{
85+
Properties: &amqp.MessageProperties{
86+
CorrelationID: messageID,
87+
},
88+
Data: [][]byte{[]byte("pong")},
89+
}
90+
91+
// Send the reply
92+
err = sender.Send(ctx, replyMsg, nil)
93+
if err != nil {
94+
log.Printf("RPC Server: Error sending reply: %v", err)
95+
} else {
96+
log.Printf("RPC Server: Sent pong reply (CorrelationID: %s)", messageID)
97+
}
98+
99+
// Close the sender
100+
sender.Close(ctx)
101+
}
102+
}
103+
}
104+
105+
// RPC Client - sends ping requests using Direct Reply-To
106+
func rpcClient(ctx context.Context) error {
107+
// Connect to RabbitMQ
108+
conn, err := amqp.Dial(ctx, amqpURL, nil)
109+
if err != nil {
110+
return fmt.Errorf("failed to connect to RabbitMQ: %w", err)
111+
}
112+
defer conn.Close()
113+
114+
// Create a session
115+
session, err := conn.NewSession(ctx, nil)
116+
if err != nil {
117+
return fmt.Errorf("failed to create session: %w", err)
118+
}
119+
defer session.Close(ctx)
120+
121+
// Create a receiver for Direct Reply-To
122+
receiver, err := session.NewReceiver(ctx, "", &amqp.ReceiverOptions{
123+
SourceCapabilities: []string{"rabbitmq:volatile-queue"},
124+
SourceExpiryPolicy: amqp.ExpiryPolicyLinkDetach,
125+
DynamicAddress: true,
126+
RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(),
127+
})
128+
if err != nil {
129+
return fmt.Errorf("failed to create Direct Reply-To receiver: %w", err)
130+
}
131+
defer receiver.Close(ctx)
132+
133+
// Get the generated reply address from the receiver
134+
replyAddress := receiver.Address()
135+
log.Printf("RPC Client: Generated reply address: %s", replyAddress)
136+
137+
// Create a sender for requests
138+
sender, err := session.NewSender(ctx, requestQueue, nil)
139+
if err != nil {
140+
return fmt.Errorf("failed to create sender: %w", err)
141+
}
142+
defer sender.Close(ctx)
143+
144+
log.Println("RPC Client: Started and ready to send requests...")
145+
146+
requestID := 0
147+
ticker := time.NewTicker(1 * time.Second)
148+
defer ticker.Stop()
149+
150+
for {
151+
select {
152+
case <-ctx.Done():
153+
log.Println("RPC Client: Shutting down...")
154+
return nil
155+
case <-ticker.C:
156+
requestID++
157+
messageID := fmt.Sprintf("ping-%d", requestID)
158+
159+
// Create the ping request message
160+
requestMsg := &amqp.Message{
161+
Properties: &amqp.MessageProperties{
162+
MessageID: messageID,
163+
ReplyTo: &replyAddress,
164+
},
165+
Data: [][]byte{[]byte("ping")},
166+
}
167+
168+
// Send the request
169+
err = sender.Send(ctx, requestMsg, nil)
170+
if err != nil {
171+
log.Printf("RPC Client: Error sending request: %v", err)
172+
continue
173+
}
174+
175+
log.Printf("RPC Client: Sent ping request (ID: %s)", messageID)
176+
177+
// Try to receive the reply with a timeout
178+
replyCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
179+
replyMsg, err := receiver.Receive(replyCtx, nil)
180+
cancel()
181+
182+
if err != nil {
183+
log.Printf("RPC Client: Error receiving reply for %s: %v", messageID, err)
184+
continue
185+
}
186+
187+
// Accept the reply message
188+
err = receiver.AcceptMessage(ctx, replyMsg)
189+
if err != nil {
190+
log.Printf("RPC Client: Error accepting reply: %v", err)
191+
continue
192+
}
193+
194+
// Extract correlation ID and payload
195+
correlationID := ""
196+
if replyMsg.Properties != nil && replyMsg.Properties.CorrelationID != nil {
197+
correlationID = replyMsg.Properties.CorrelationID.(string)
198+
}
199+
200+
replyPayload := string(replyMsg.Data[0])
201+
log.Printf("RPC Client: Received reply - %s (CorrelationID: %s)", replyPayload, correlationID)
202+
}
203+
}
204+
}
205+
206+
func main() {
207+
ctx, cancel := context.WithCancel(context.Background())
208+
defer cancel()
209+
210+
log.Println("Starting Direct Reply-To RPC example...")
211+
log.Println("Make sure RabbitMQ is running on localhost:5672")
212+
213+
// Start the RPC server in a goroutine
214+
go func() {
215+
if err := rpcServer(ctx); err != nil {
216+
log.Printf("RPC Server error: %v", err)
217+
cancel() // Cancel context to stop the client as well
218+
}
219+
}()
220+
221+
// Start the RPC client in a goroutine
222+
go func() {
223+
if err := rpcClient(ctx); err != nil {
224+
log.Printf("RPC Client error: %v", err)
225+
cancel() // Cancel context to stop the server as well
226+
}
227+
}()
228+
229+
// Wait for context cancellation (Ctrl+C or error)
230+
<-ctx.Done()
231+
log.Println("Application shutting down...")
232+
}

0 commit comments

Comments
 (0)