Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions RabbitMQ.AMQP.Client/IMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System.Collections.Generic;

namespace RabbitMQ.AMQP.Client
{
public interface IMessage
Expand Down Expand Up @@ -32,6 +34,15 @@ public interface IMessage
IMessage GroupId(string groupId);
string GroupId();

// Application properties

public IMessage ApplicationProperty(string key, object value);

public object ApplicationProperty(string key);

public IDictionary<object, object> ApplicationProperties();

// Message annotations
public IMessage Annotation(string key, object value);

public object Annotation(string key);
Expand Down
4 changes: 3 additions & 1 deletion RabbitMQ.AMQP.Client/IRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ public interface IRpcClientAddressBuilder : IAddressBuilder<IRpcClientAddressBui
public interface IRpcClientBuilder
{
IRpcClientAddressBuilder RequestAddress();
IRpcClientBuilder ReplyToQueue(string replyToQueue);
IRpcClientBuilder ReplyToQueue(string replyToQueueName);

IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue);
IRpcClientBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);

IRpcClientBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);
Expand Down
30 changes: 30 additions & 0 deletions RabbitMQ.AMQP.Client/IRpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,42 @@ namespace RabbitMQ.AMQP.Client

public interface IRpcServerBuilder
{
/// <summary>
/// The queue from which requests are consumed.
/// The client sends requests to this queue and the server consumes them.
/// </summary>
/// <param name="requestQueue"></param>
/// <returns></returns>
IRpcServerBuilder RequestQueue(string requestQueue);
IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue);

/// <summary>
/// Extracts the correlation id from the request message.
/// each message has a correlation id that is used to match the request with the response.
/// There are default implementations for the correlation id extractor.
/// With this method, you can provide a custom implementation.
/// </summary>
/// <param name="correlationIdExtractor"></param>
/// <returns></returns>

IRpcServerBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);

/// <summary>
/// Post processes the reply message before sending it to the client.
/// The object parameter is the correlation id extracted from the request message.
/// There are default implementations for the reply post processor that use the correlationId() field
/// to set the correlation id of the reply message.
/// With this method, you can provide a custom implementation.
/// </summary>
/// <param name="replyPostProcessor"></param>
/// <returns></returns>
IRpcServerBuilder ReplyPostProcessor(Func<IMessage, object, IMessage>? replyPostProcessor);

/// <summary>
/// Handle the request message and return the reply message.
/// </summary>
/// <param name="handler"></param>
/// <returns></returns>
IRpcServerBuilder Handler(RpcHandler handler);

Task<IRpcServer> BuildAsync();
Expand Down
33 changes: 33 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Collections.Generic;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
Expand Down Expand Up @@ -58,6 +59,19 @@ private void EnsureAnnotations()
NativeMessage.MessageAnnotations ??= new MessageAnnotations();
}

private void ThrowIfApplicationPropertiesNotSet()
{
if (NativeMessage.ApplicationProperties == null)
{
throw new FieldNotSetException();
}
}

private void EnsureApplicationProperties()
{
NativeMessage.ApplicationProperties ??= new ApplicationProperties();
}

public object Body()
{
// TODO do we need to do anything with NativeMessage.BodySection?
Expand Down Expand Up @@ -156,6 +170,25 @@ public string GroupId()
return NativeMessage.Properties.GroupId;
}

public IMessage ApplicationProperty(string key, object value)
{
EnsureApplicationProperties();
NativeMessage.ApplicationProperties[key] = value;
return this;
}

public object ApplicationProperty(string key)
{
ThrowIfApplicationPropertiesNotSet();
return NativeMessage.ApplicationProperties[key];
}

public IDictionary<object, object> ApplicationProperties()
{
ThrowIfApplicationPropertiesNotSet();
return NativeMessage.ApplicationProperties.Map;
}

// Annotations

public IMessage Annotation(string key, object value)
Expand Down
67 changes: 50 additions & 17 deletions RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -37,9 +38,15 @@ public IRpcClientAddressBuilder RequestAddress()
return _addressBuilder;
}

public IRpcClientBuilder ReplyToQueue(string replyToQueue)
public IRpcClientBuilder ReplyToQueue(string replyToQueueName)
{
_configuration.ReplyToQueue = replyToQueue;
_configuration.ReplyToQueue = replyToQueueName;
return this;
}

public IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue)
{
_configuration.ReplyToQueue = replyToQueue.QueueName;
return this;
}

Expand Down Expand Up @@ -77,13 +84,24 @@ public async Task<IRpcClient> BuildAsync()
}
}

/// <summary>
/// AmqpRpcClient is an implementation of <see cref="IRpcClient"/>.
/// It is a wrapper around <see cref="IPublisher"/> and <see cref="IConsumer"/> to create an RPC client over AMQP 1.0.
/// even the PublishAsync is async the RPClient blocks the thread until the response is received.
/// within the timeout.
///
/// The PublishAsync is thread-safe and can be called from multiple threads.
///
/// See also the server side <see cref="IRpcServer"/>.
/// </summary>
public class AmqpRpcClient : AbstractLifeCycle, IRpcClient
{
private readonly RpcClientConfiguration _configuration;
private IConsumer? _consumer = null;
private IPublisher? _publisher = null;
private readonly Dictionary<object, TaskCompletionSource<IMessage>> _pendingRequests = new();
private readonly ConcurrentDictionary<object, TaskCompletionSource<IMessage>> _pendingRequests = new();
private readonly string _correlationId = Guid.NewGuid().ToString();
private readonly SemaphoreSlim _semaphore = new(1, 1);
private int _nextCorrelationId = 0;

private object CorrelationIdSupplier()
Expand Down Expand Up @@ -170,27 +188,42 @@ public override async Task CloseAsync()
}
}

/// <summary>
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
/// </summary>
/// <param name="message"> The request message</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
public async Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
{
object correlationId = CorrelationIdSupplier();
message = RequestPostProcess(message, correlationId);
_pendingRequests.Add(correlationId, new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously));
if (_publisher != null)
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
PublishResult pr = await _publisher.PublishAsync(
message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false);

if (pr.Outcome.State != OutcomeState.Accepted)
object correlationId = CorrelationIdSupplier();
message = RequestPostProcess(message, correlationId);
_pendingRequests.TryAdd(correlationId,
new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously));
if (_publisher != null)
{
_pendingRequests[correlationId]
.SetException(new Exception($"Failed to send request state: {pr.Outcome.State}"));
PublishResult pr = await _publisher.PublishAsync(
message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false);

if (pr.Outcome.State != OutcomeState.Accepted)
{
_pendingRequests[correlationId]
.SetException(new Exception($"Failed to send request state: {pr.Outcome.State}"));
}
}
}

await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout)
.ConfigureAwait(false);
await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout)
.ConfigureAwait(false);

return await _pendingRequests[correlationId].Task.ConfigureAwait(false);
return await _pendingRequests[correlationId].Task.ConfigureAwait(false);
}
finally
{
_semaphore.Release();
}
}
}
}
37 changes: 33 additions & 4 deletions RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ public class RpcConfiguration
public Func<IMessage, object, IMessage>? ReplyPostProcessor { get; set; }
}

/// <summary>
/// AmqpRpcServerBuilder is a builder for creating an AMQP RPC server.
/// </summary>
public class AmqpRpcServerBuilder : IRpcServerBuilder
{
readonly RpcConfiguration _configuration = new RpcConfiguration();
readonly RpcConfiguration _configuration = new();

public AmqpRpcServerBuilder(AmqpConnection connection)
{
Expand Down Expand Up @@ -60,6 +63,10 @@ public async Task<IRpcServer> BuildAsync()
}
}

/// <summary>
/// AmqpRpcServer implements the <see cref="IRpcServer"/> interface.
/// With the RpcClient you can create an RPC communication over AMQP 1.0.
/// </summary>
public class AmqpRpcServer : AbstractLifeCycle, IRpcServer
{
private readonly RpcConfiguration _configuration;
Expand Down Expand Up @@ -91,7 +98,9 @@ private object ExtractCorrelationId(IMessage message)

private IMessage ReplyPostProcessor(IMessage reply, object correlationId)
{
return _configuration.ReplyPostProcessor != null ? _configuration.ReplyPostProcessor(reply, correlationId) : reply.CorrelationId(correlationId);
return _configuration.ReplyPostProcessor != null
? _configuration.ReplyPostProcessor(reply, correlationId)
: reply.CorrelationId(correlationId);
}

public AmqpRpcServer(RpcConfiguration configuration)
Expand All @@ -117,12 +126,32 @@ public override async Task OpenAsync()
}
else
{
Trace.WriteLine(TraceLevel.Error, "No reply-to address in request");
Trace.WriteLine(TraceLevel.Error, "[RPC server] No reply-to address in request");
}

object correlationId = ExtractCorrelationId(request);
reply = ReplyPostProcessor(reply, correlationId);
await SendReply(reply).ConfigureAwait(false);
await Utils.WaitWithBackOffUntilFuncAsync(async () =>
{
try
{
await SendReply(reply).ConfigureAwait(false);
return true;
}
catch (Exception e)
{
Trace.WriteLine(TraceLevel.Error,
$"[RPC server] Failed to send reply: {e.Message}");
return false;
}
},
(success, span) =>
{
if (!success)
{
Trace.WriteLine(TraceLevel.Error, $"Failed to send reply, retrying in {span}");
}
}, 3).ConfigureAwait(false);
}
})
.Queue(_configuration.RequestQueue).BuildAndStartAsync()
Expand Down
12 changes: 10 additions & 2 deletions RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ RabbitMQ.AMQP.Client.IManagement.Queue(string! name) -> RabbitMQ.AMQP.Client.IQu
RabbitMQ.AMQP.Client.IMessage
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key) -> object!
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.ApplicationProperties() -> System.Collections.Generic.IDictionary<object!, object!>!
RabbitMQ.AMQP.Client.IMessage.ApplicationProperty(string! key) -> object!
RabbitMQ.AMQP.Client.IMessage.ApplicationProperty(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.Body() -> object!
RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> object!
RabbitMQ.AMQP.Client.IMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage!
Expand Down Expand Up @@ -294,6 +297,9 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(object! body) -> void
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key) -> object!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ApplicationProperties() -> System.Collections.Generic.IDictionary<object!, object!>!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ApplicationProperty(string! key) -> object!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ApplicationProperty(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> object!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> object!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage!
Expand Down Expand Up @@ -365,7 +371,8 @@ RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.AmqpRpcClientBuilder(RabbitMQ.AMQ
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IRpcClient!>!
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdExtractor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!>? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdSupplier(System.Func<object!>? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(RabbitMQ.AMQP.Client.IQueueSpecification! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueueName) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestPostProcessor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!, RabbitMQ.AMQP.Client.IMessage!>? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
Expand Down Expand Up @@ -594,7 +601,8 @@ RabbitMQ.AMQP.Client.IRpcClientBuilder
RabbitMQ.AMQP.Client.IRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IRpcClient!>!
RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdExtractor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!>? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdSupplier(System.Func<object!>? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(RabbitMQ.AMQP.Client.IQueueSpecification! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueueName) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder!
RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestPostProcessor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!, RabbitMQ.AMQP.Client.IMessage!>? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
Expand Down
Loading