Skip to content

Commit 2bb54e3

Browse files
committed
Set reply-to queue address before post-processor
Because setting the reply-to address in the post-processor forces the users of custom post-processors to remember/add a line to set the reply-to address always. By setting the reply-to prior to the post-processor, the user has the option to modify the reply-to before sending, if desired. By setting the reply-to prior to post-processor, we free up some cognitive load on the users who wish to use a custom post-processor.
1 parent ffe8b52 commit 2bb54e3

File tree

5 files changed

+117
-52
lines changed

5 files changed

+117
-52
lines changed

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -268,14 +268,6 @@ func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOpt
268268
request.Properties = &amqp.MessageProperties{}
269269
}
270270
request.Properties.MessageID = correlationID
271-
replyTo, err := (&QueueAddress{
272-
Queue: q.Name(),
273-
}).toAddress()
274-
if err != nil {
275-
// this should never happen
276-
panic(err)
277-
}
278-
request.Properties.ReplyTo = &replyTo
279271
return request
280272
}
281273
}
@@ -292,6 +284,7 @@ func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOpt
292284

293285
client := &amqpRpcClient{
294286
requestQueue: requestQueue,
287+
replyToQueue: &QueueAddress{Queue: replyQueueName},
295288
publisher: publisher,
296289
requestPostProcessor: requestPostProcessor,
297290
correlationIdSupplier: correlationIdSupplier,

pkg/rabbitmqamqp/rpc_client.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ type outstandingRequest struct {
120120

121121
type amqpRpcClient struct {
122122
requestQueue ITargetAddress
123+
replyToQueue ITargetAddress
123124
publisher *Publisher
124125
consumer *Consumer
125126
requestPostProcessor RequestPostProcessor
@@ -175,6 +176,14 @@ func (a *amqpRpcClient) Publish(ctx context.Context, message *amqp.Message) (<-c
175176
if a.isClosed() {
176177
return nil, fmt.Errorf("rpc client is closed")
177178
}
179+
replyTo, err := a.replyToQueue.toAddress()
180+
if err != nil {
181+
return nil, fmt.Errorf("failed to set reply-to address: %w", err)
182+
}
183+
if message.Properties == nil {
184+
message.Properties = &amqp.MessageProperties{}
185+
}
186+
message.Properties.ReplyTo = &replyTo
178187
correlationID := a.correlationIdSupplier.Get()
179188
m := a.requestPostProcessor(message, correlationID)
180189
pr, err := a.publisher.Publish(ctx, m)

pkg/rabbitmqamqp/rpc_client_test.go

Lines changed: 89 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,52 @@ var _ = Describe("RpcClient", func() {
1818
publisher *Publisher
1919
)
2020

21+
var pongRpcServer = func(ctx context.Context, publisher *Publisher, consumer *Consumer) {
22+
for {
23+
select {
24+
case <-ctx.Done():
25+
return
26+
default:
27+
// Receive a message from the server consumer
28+
receivedMessage, err := consumer.Receive(ctx)
29+
if err != nil {
30+
// Exit if we can't receive messages (e.g.,
31+
// consumer is closed)
32+
GinkgoWriter.Printf("Error receiving message: %v\n", err)
33+
return
34+
}
35+
36+
msg := receivedMessage.Message()
37+
if msg == nil {
38+
GinkgoWriter.Printf("Received nil message\n")
39+
continue
40+
}
41+
42+
// Create response with "Pong: " prefix
43+
responseData := "Pong: " + string(msg.GetData())
44+
replyMessage := amqp.NewMessage([]byte(responseData))
45+
46+
// Copy correlation ID and reply-to from request
47+
if msg.Properties != nil {
48+
if replyMessage.Properties == nil {
49+
replyMessage.Properties = &amqp.MessageProperties{}
50+
}
51+
replyMessage.Properties.CorrelationID =
52+
msg.Properties.MessageID
53+
}
54+
55+
// Send reply to the specified reply-to address
56+
if msg.Properties != nil && msg.Properties.ReplyTo != nil {
57+
replyMessage.Properties.To = msg.Properties.ReplyTo
58+
}
59+
60+
copyApplicationProperties(msg, replyMessage)
61+
62+
publisher.Publish(ctx, replyMessage)
63+
}
64+
}
65+
}
66+
2167
BeforeEach(func() {
2268
queueName = generateNameWithDateTime(CurrentSpecReport().LeafNodeText)
2369
var err error
@@ -37,49 +83,7 @@ var _ = Describe("RpcClient", func() {
3783

3884
It("should send a request and receive replies", func(ctx SpecContext) {
3985
// Server goroutine to handle incoming requests
40-
go func() {
41-
for {
42-
select {
43-
case <-ctx.Done():
44-
return
45-
default:
46-
// Receive a message from the server consumer
47-
receivedMessage, err := consumer.Receive(ctx)
48-
if err != nil {
49-
// Exit if we can't receive messages (e.g.,
50-
// consumer is closed)
51-
GinkgoWriter.Printf("Error receiving message: %v\n", err)
52-
return
53-
}
54-
55-
msg := receivedMessage.Message()
56-
if msg == nil {
57-
GinkgoWriter.Printf("Received nil message\n")
58-
continue
59-
}
60-
61-
// Create response with "Pong: " prefix
62-
responseData := "Pong: " + string(msg.GetData())
63-
replyMessage := amqp.NewMessage([]byte(responseData))
64-
65-
// Copy correlation ID and reply-to from request
66-
if msg.Properties != nil {
67-
if replyMessage.Properties == nil {
68-
replyMessage.Properties = &amqp.MessageProperties{}
69-
}
70-
replyMessage.Properties.CorrelationID =
71-
msg.Properties.MessageID
72-
}
73-
74-
// Send reply to the specified reply-to address
75-
if msg.Properties != nil && msg.Properties.ReplyTo != nil {
76-
replyMessage.Properties.To = msg.Properties.ReplyTo
77-
}
78-
79-
publisher.Publish(ctx, replyMessage)
80-
}
81-
}
82-
}()
86+
go pongRpcServer(ctx, publisher, consumer)
8387

8488
client, err := conn.NewRpcClient(ctx, &RpcClientOptions{
8589
RequestQueueName: queueName,
@@ -103,4 +107,46 @@ var _ = Describe("RpcClient", func() {
103107
}
104108
Ω(client.Close(ctx)).Should(Succeed())
105109
})
110+
111+
It("uses a custom correlation id extractor and post processor", func(ctx SpecContext) {
112+
go pongRpcServer(ctx, publisher, consumer)
113+
client, err := conn.NewRpcClient(ctx, &RpcClientOptions{
114+
RequestQueueName: queueName,
115+
CorrelationIdExtractor: func(message *amqp.Message) any {
116+
return message.ApplicationProperties["correlationId"]
117+
},
118+
RequestPostProcessor: func(request *amqp.Message, correlationID any) *amqp.Message {
119+
if request.ApplicationProperties == nil {
120+
request.ApplicationProperties = make(map[string]any)
121+
}
122+
request.ApplicationProperties["correlationId"] = correlationID
123+
if request.Properties == nil {
124+
request.Properties = &amqp.MessageProperties{}
125+
}
126+
request.Properties.MessageID = correlationID
127+
128+
return request
129+
},
130+
})
131+
Ω(err).ShouldNot(HaveOccurred())
132+
DeferCleanup(func(ctx SpecContext) {
133+
// Closing twice in case the test fails and the 'happy path' close is not called
134+
_ = client.Close(ctx)
135+
})
136+
137+
request := client.Message([]byte("Using a custom correlation id extractor and post processor"))
138+
request.ApplicationProperties = map[string]any{"this-property": "should-be-preserved"}
139+
replyCh, err := client.Publish(ctx, request)
140+
Ω(err).ShouldNot(HaveOccurred())
141+
142+
actualReply := &amqp.Message{}
143+
Eventually(replyCh).
144+
Within(time.Second).
145+
WithPolling(time.Millisecond * 100).
146+
Should(Receive(&actualReply))
147+
Expect(actualReply.GetData()).To(BeEquivalentTo("Pong: Using a custom correlation id extractor and post processor"))
148+
Expect(actualReply.ApplicationProperties).To(HaveKey("correlationId"))
149+
Expect(actualReply.ApplicationProperties).To(HaveKeyWithValue("this-property", "should-be-preserved"))
150+
Ω(client.Close(ctx)).Should(Succeed())
151+
})
106152
})

pkg/rabbitmqamqp/rpc_server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/onsi/gomega/gbytes"
1313
)
1414

15-
var _ = Describe("RpcServer E2E", func() {
15+
var _ = Describe("RpcServer", func() {
1616
var (
1717
conn *AmqpConnection
1818
requestQueue string

pkg/rabbitmqamqp/test_utils.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import (
55
"fmt"
66
"io"
77
"log/slog"
8+
"maps"
89
"os"
910
"strconv"
1011
"time"
12+
13+
"github.com/Azure/go-amqp"
1114
)
1215

1316
func generateNameWithDateTime(name string) string {
@@ -78,3 +81,17 @@ func declareQueueAndConnection(name string) (*AmqpConnection, error) {
7881
}
7982
return connection, nil
8083
}
84+
85+
func copyApplicationProperties(from *amqp.Message, to *amqp.Message) {
86+
if to == nil || from == nil {
87+
return
88+
}
89+
if from.ApplicationProperties == nil {
90+
to.ApplicationProperties = nil
91+
return
92+
}
93+
if to.ApplicationProperties == nil {
94+
to.ApplicationProperties = make(map[string]any)
95+
}
96+
maps.Copy(to.ApplicationProperties, from.ApplicationProperties)
97+
}

0 commit comments

Comments
 (0)