Skip to content

Commit ffe8b52

Browse files
committed
Initial version of RPC client
1 parent 6ec0653 commit ffe8b52

File tree

6 files changed

+507
-20
lines changed

6 files changed

+507
-20
lines changed

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 102 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,9 @@ func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, opti
172172
// NewRpcServer creates a new RPC server that processes requests from the
173173
// specified queue. The requestQueue in options is mandatory, while other
174174
// fields are optional and will use defaults if not provided.
175-
func (a *AmqpConnection) NewRpcServer(ctx context.Context, options *RpcServerOptions) (RpcServer, error) {
176-
if options == nil {
177-
return nil, fmt.Errorf("options cannot be nil")
178-
}
179-
if options.RequestQueue == "" {
180-
return nil, fmt.Errorf("requestQueue is mandatory")
175+
func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOptions) (RpcServer, error) {
176+
if err := options.validate(); err != nil {
177+
return nil, fmt.Errorf("rpc server options validation: %w", err)
181178
}
182179

183180
// Create consumer for receiving requests
@@ -206,9 +203,9 @@ func (a *AmqpConnection) NewRpcServer(ctx context.Context, options *RpcServerOpt
206203
correlationIdExtractor = defaultCorrelationIdExtractor
207204
}
208205

209-
postProcessor := options.PostProcessor
210-
if postProcessor == nil {
211-
postProcessor = defaultPostProcessor
206+
replyPostProcessor := options.ReplyPostProcessor
207+
if replyPostProcessor == nil {
208+
replyPostProcessor = defaultReplyPostProcessor
212209
}
213210

214211
server := &amqpRpcServer{
@@ -217,13 +214,108 @@ func (a *AmqpConnection) NewRpcServer(ctx context.Context, options *RpcServerOpt
217214
publisher: publisher,
218215
consumer: consumer,
219216
correlationIdExtractor: correlationIdExtractor,
220-
postProcessor: postProcessor,
217+
replyPostProcessor: replyPostProcessor,
221218
}
222219
go server.handle()
223220

224221
return server, nil
225222
}
226223

224+
// NewRpcClient creates a new RPC client that sends requests to the specified queue
225+
// and receives replies on a dynamically created reply queue.
226+
func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOptions) (RpcClient, error) {
227+
if options == nil {
228+
return nil, fmt.Errorf("options cannot be nil")
229+
}
230+
if options.RequestQueueName == "" {
231+
return nil, fmt.Errorf("requestQueueName is mandatory")
232+
}
233+
234+
// Create publisher for sending requests
235+
requestQueue := &QueueAddress{
236+
Queue: options.RequestQueueName,
237+
}
238+
publisher, err := a.NewPublisher(ctx, requestQueue, nil)
239+
if err != nil {
240+
return nil, fmt.Errorf("failed to create publisher: %w", err)
241+
}
242+
243+
replyQueueName := options.ReplyToQueueName
244+
if len(replyQueueName) == 0 {
245+
replyQueueName = generateNameWithDefaultPrefix()
246+
}
247+
248+
// Declare reply queue as exclusive, auto-delete classic queue
249+
q, err := a.management.DeclareQueue(ctx, &ClassicQueueSpecification{
250+
Name: replyQueueName,
251+
IsExclusive: true,
252+
IsAutoDelete: true,
253+
})
254+
if err != nil {
255+
return nil, fmt.Errorf("failed to declare reply queue: %w", err)
256+
}
257+
258+
// Set defaults for optional fields
259+
correlationIdSupplier := options.CorrelationIdSupplier
260+
if correlationIdSupplier == nil {
261+
correlationIdSupplier = newRandomUuidCorrelationIdSupplier()
262+
}
263+
264+
requestPostProcessor := options.RequestPostProcessor
265+
if requestPostProcessor == nil {
266+
requestPostProcessor = func(request *amqp.Message, correlationID any) *amqp.Message {
267+
if request.Properties == nil {
268+
request.Properties = &amqp.MessageProperties{}
269+
}
270+
request.Properties.MessageID = correlationID
271+
replyTo, err := (&QueueAddress{
272+
Queue: q.Name(),
273+
}).toAddress()
274+
if err != nil {
275+
// this should never happen
276+
panic(err)
277+
}
278+
request.Properties.ReplyTo = &replyTo
279+
return request
280+
}
281+
}
282+
283+
requestTimeout := options.RequestTimeout
284+
if requestTimeout == 0 {
285+
requestTimeout = DefaultRpcRequestTimeout
286+
}
287+
288+
correlationIdExtractor := options.CorrelationIdExtractor
289+
if correlationIdExtractor == nil {
290+
correlationIdExtractor = defaultReplyCorrelationIdExtractor
291+
}
292+
293+
client := &amqpRpcClient{
294+
requestQueue: requestQueue,
295+
publisher: publisher,
296+
requestPostProcessor: requestPostProcessor,
297+
correlationIdSupplier: correlationIdSupplier,
298+
correlationIdExtractor: correlationIdExtractor,
299+
requestTimeout: requestTimeout,
300+
pendingRequests: make(map[any]*outstandingRequest),
301+
done: make(chan struct{}),
302+
}
303+
304+
// Create consumer for receiving replies
305+
consumer, err := a.NewConsumer(ctx, q.Name(), nil)
306+
if err != nil {
307+
_ = publisher.Close(ctx) // cleanup publisher on failure
308+
return nil, fmt.Errorf("failed to create consumer: %w", err)
309+
}
310+
311+
client.consumer = consumer
312+
313+
go client.messageReceivedHandler()
314+
go client.requestTimeoutTask()
315+
316+
return client, nil
317+
}
318+
227319
// Dial connect to the AMQP 1.0 server using the provided connectionSettings
228320
// Returns a pointer to the new AmqpConnection if successful else an error.
229321
func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*AmqpConnection, error) {

pkg/rabbitmqamqp/pkg_suite_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package rabbitmqamqp_test
22

33
import (
4+
"log/slog"
45
"testing"
56

67
. "github.com/onsi/ginkgo/v2"
78
. "github.com/onsi/gomega"
9+
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
810
)
911

1012
func TestPkg(t *testing.T) {
13+
rabbitmqamqp.SetSlogHandler(slog.NewTextHandler(GinkgoWriter, &slog.HandlerOptions{Level: slog.LevelDebug}))
1114
RegisterFailHandler(Fail)
1215
RunSpecs(t, "Pkg Suite")
1316
}

0 commit comments

Comments
 (0)