diff --git a/RabbitMQ.AMQP.Client/IAddressBuilder.cs b/RabbitMQ.AMQP.Client/IAddressBuilder.cs index 6cee9cbd..e8372d69 100644 --- a/RabbitMQ.AMQP.Client/IAddressBuilder.cs +++ b/RabbitMQ.AMQP.Client/IAddressBuilder.cs @@ -23,4 +23,10 @@ public interface IAddressBuilder T Key(string key); } + + public interface IMessageAddressBuilder : IAddressBuilder + { + + IMessage Build(); + } } diff --git a/RabbitMQ.AMQP.Client/IConnection.cs b/RabbitMQ.AMQP.Client/IConnection.cs index 0c944049..f4aac082 100644 --- a/RabbitMQ.AMQP.Client/IConnection.cs +++ b/RabbitMQ.AMQP.Client/IConnection.cs @@ -26,6 +26,10 @@ public interface IConnection : ILifeCycle IConsumerBuilder ConsumerBuilder(); + IRpcServerBuilder RpcServerBuilder(); + + IRpcClientBuilder RpcClientBuilder(); + public ReadOnlyCollection GetPublishers(); public ReadOnlyCollection GetConsumers(); diff --git a/RabbitMQ.AMQP.Client/IMessage.cs b/RabbitMQ.AMQP.Client/IMessage.cs index a9b567a0..00b7d289 100644 --- a/RabbitMQ.AMQP.Client/IMessage.cs +++ b/RabbitMQ.AMQP.Client/IMessage.cs @@ -12,20 +12,30 @@ public interface IMessage public object Body(); // properties - string MessageId(); IMessage MessageId(string id); - string CorrelationId(); + IMessage MessageId(object id); + object MessageId(); + + object CorrelationId(); IMessage CorrelationId(string id); + IMessage CorrelationId(object id); string ReplyTo(); IMessage ReplyTo(string id); + string To(); + IMessage To(string id); + string Subject(); IMessage Subject(string subject); + IMessage GroupId(string groupId); + string GroupId(); public IMessage Annotation(string key, object value); public object Annotation(string key); + + IMessageAddressBuilder ToAddress(); } } diff --git a/RabbitMQ.AMQP.Client/IRpcClient.cs b/RabbitMQ.AMQP.Client/IRpcClient.cs new file mode 100644 index 00000000..baf73b64 --- /dev/null +++ b/RabbitMQ.AMQP.Client/IRpcClient.cs @@ -0,0 +1,30 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace RabbitMQ.AMQP.Client +{ + + public interface IRpcClientAddressBuilder : IAddressBuilder + { + IRpcClientBuilder RpcClient(); + } + + public interface IRpcClientBuilder + { + IRpcClientAddressBuilder RequestAddress(); + IRpcClientBuilder ReplyToQueue(string replyToQueue); + IRpcClientBuilder CorrelationIdExtractor(Func? correlationIdExtractor); + + IRpcClientBuilder RequestPostProcessor(Func? requestPostProcessor); + + IRpcClientBuilder CorrelationIdSupplier(Func? correlationIdSupplier); + IRpcClientBuilder Timeout(TimeSpan timeout); + Task BuildAsync(); + } + + public interface IRpcClient : ILifeCycle + { + Task PublishAsync(IMessage message, CancellationToken cancellationToken = default); + } +} diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IRpcServer.cs new file mode 100644 index 00000000..79d93246 --- /dev/null +++ b/RabbitMQ.AMQP.Client/IRpcServer.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading.Tasks; + +namespace RabbitMQ.AMQP.Client +{ + public delegate Task RpcHandler(IRpcServer.IContext context, IMessage request); + + public interface IRpcServerBuilder + { + IRpcServerBuilder RequestQueue(string requestQueue); + IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue); + IRpcServerBuilder CorrelationIdExtractor(Func? correlationIdExtractor); + + IRpcServerBuilder ReplyPostProcessor(Func? replyPostProcessor); + + IRpcServerBuilder Handler(RpcHandler handler); + + Task BuildAsync(); + } + + public interface IRpcServer : ILifeCycle + { + + public interface IContext + { + IMessage Message(object body); + } + } +} diff --git a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs new file mode 100644 index 00000000..cf8a2d17 --- /dev/null +++ b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs @@ -0,0 +1,137 @@ +using System; + +namespace RabbitMQ.AMQP.Client.Impl +{ + public abstract class DefaultAddressBuilder : IAddressBuilder + { + private string? _exchange = null; + private string? _queue = null; + private string? _key = null; + protected T? _owner = default; + + public T Exchange(IExchangeSpecification exchangeSpec) + { + return Exchange(exchangeSpec.ExchangeName); + } + + public T Exchange(string? exchangeName) + { + _exchange = exchangeName; + if (_owner == null) + { + throw new InvalidOperationException("Owner is null"); + } + + return _owner; + } + + public T Queue(IQueueSpecification queueSpec) => Queue(queueSpec.QueueName); + + public T Queue(string? queueName) + { + _queue = queueName; + if (_owner == null) + { + throw new InvalidOperationException("Owner is null"); + } + + return _owner; + } + + public T Key(string? key) + { + _key = key; + if (_owner == null) + { + throw new InvalidOperationException("Owner is null"); + } + + return _owner; + } + + public string Address() + { + if (_exchange == null && _queue == null) + { + throw new InvalidAddressException("Exchange or Queue must be set"); + } + + if (_exchange != null && _queue != null) + { + throw new InvalidAddressException("Exchange and Queue cannot be set together"); + } + + if (_exchange != null) + { + if (string.IsNullOrEmpty(_exchange)) + { + throw new InvalidAddressException("Exchange must be set"); + } + + if (_key != null && false == string.IsNullOrEmpty(_key)) + { + return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}/{Utils.EncodePathSegment(_key)}"; + } + + return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}"; + } + + if (_queue == null) + { + return ""; + } + + if (string.IsNullOrEmpty(_queue)) + { + throw new InvalidAddressException("Queue must be set"); + } + + return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}"; + } + } + + public class AddressBuilder : DefaultAddressBuilder + { + public AddressBuilder() + { + _owner = this; + } + } + + public static class AddressBuilderHelper + { + public static AddressBuilder AddressBuilder() => new(); + } + + public class MessageAddressBuilder : DefaultAddressBuilder, IMessageAddressBuilder + { + private readonly IMessage _message; + + public MessageAddressBuilder(IMessage message) + { + _message = message; + _owner = this; + } + + public IMessage Build() + { + _message.To(Address()); + return _message; + } + } + + public class RpcClientAddressBuilder : DefaultAddressBuilder, IRpcClientAddressBuilder + { + readonly AmqpRpcClientBuilder _builder; + public RpcClientAddressBuilder(AmqpRpcClientBuilder builder) + { + _builder = builder; + _owner = this; + } + + public IRpcClientBuilder RpcClient() + { + return _builder; + } + } +} diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs index adf4332f..d951bebd 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs @@ -50,6 +50,17 @@ public class AmqpConnection : AbstractLifeCycle, IConnection private readonly TaskCompletionSource _connectionClosedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + public IRpcServerBuilder RpcServerBuilder() + { + return new AmqpRpcServerBuilder(this); + } + + public IRpcClientBuilder RpcClientBuilder() + { + + return new AmqpRpcClientBuilder(this); + } + /// /// Read-only collection of publishers. /// See diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs index 5b9b720b..9da78b72 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs @@ -43,7 +43,7 @@ public IConsumerBuilder Queue(IQueueSpecification queueSpec) public IConsumerBuilder Queue(string queueName) { - string address = new AddressBuilder().Queue(queueName).Address(); + string address = AddressBuilderHelper.AddressBuilder().Queue(queueName).Address(); _configuration.Address = address; return this; } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index 51455bf9..8cc9b54c 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -64,20 +64,27 @@ public object Body() return NativeMessage.Body; } - public string MessageId() + public object MessageId() { ThrowIfPropertiesNotSet(); - return NativeMessage.Properties.MessageId; + return NativeMessage.Properties.GetMessageId(); } public IMessage MessageId(string id) { EnsureProperties(); - NativeMessage.Properties.MessageId = id; + NativeMessage.Properties.SetMessageId(id); return this; } - public string CorrelationId() + public IMessage MessageId(object id) + { + EnsureProperties(); + NativeMessage.Properties.SetMessageId(id); + return this; + } + + public object CorrelationId() { ThrowIfPropertiesNotSet(); return NativeMessage.Properties.CorrelationId; @@ -86,7 +93,14 @@ public string CorrelationId() public IMessage CorrelationId(string id) { EnsureProperties(); - NativeMessage.Properties.CorrelationId = id; + NativeMessage.Properties.SetCorrelationId(id); + return this; + } + + public IMessage CorrelationId(object id) + { + EnsureProperties(); + NativeMessage.Properties.SetCorrelationId(id); return this; } @@ -103,6 +117,19 @@ public IMessage ReplyTo(string id) return this; } + public string To() + { + ThrowIfPropertiesNotSet(); + return NativeMessage.Properties.To; + } + + public IMessage To(string id) + { + EnsureProperties(); + NativeMessage.Properties.To = id; + return this; + } + public string Subject() { ThrowIfPropertiesNotSet(); @@ -116,6 +143,19 @@ public IMessage Subject(string subject) return this; } + public IMessage GroupId(string groupId) + { + EnsureProperties(); + NativeMessage.Properties.GroupId = groupId; + return this; + } + + public string GroupId() + { + ThrowIfPropertiesNotSet(); + return NativeMessage.Properties.GroupId; + } + // Annotations public IMessage Annotation(string key, object value) @@ -130,5 +170,10 @@ public object Annotation(string key) ThrowIfAnnotationsNotSet(); return NativeMessage.MessageAnnotations[new Symbol(key)]; } + + public IMessageAddressBuilder ToAddress() + { + return new MessageAddressBuilder(this); + } } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs index f16014f3..51f30d1e 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs @@ -16,12 +16,12 @@ public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher { private readonly AmqpConnection _connection; private readonly TimeSpan _timeout; - private readonly string _address; + private readonly string? _address; private readonly Guid _id = Guid.NewGuid(); private SenderLink? _senderLink = null; - public AmqpPublisher(AmqpConnection connection, string address, TimeSpan timeout) + public AmqpPublisher(AmqpConnection connection, string? address, TimeSpan timeout) { _connection = connection; _address = address; diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs index 8de206bf..8a03f882 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs @@ -8,89 +8,12 @@ namespace RabbitMQ.AMQP.Client.Impl { - public class AddressBuilder : IAddressBuilder - { - private string? _exchange; - - private string? _queue; - - private string? _key; - - public AddressBuilder Exchange(IExchangeSpecification exchangeSpec) - { - return Exchange(exchangeSpec.ExchangeName); - } - - public AddressBuilder Exchange(string? exchange) - { - _exchange = exchange; - return this; - } - - public AddressBuilder Queue(IQueueSpecification queueSpec) - { - return Queue(queueSpec.QueueName); - } - - public AddressBuilder Queue(string? queue) - { - _queue = queue; - return this; - } - - public AddressBuilder Key(string? key) - { - _key = key; - return this; - } - - public string Address() - { - if (_exchange == null && _queue == null) - { - throw new InvalidAddressException("Exchange or Queue must be set"); - } - - if (_exchange != null && _queue != null) - { - throw new InvalidAddressException("Exchange and Queue cannot be set together"); - } - - if (_exchange != null) - { - if (string.IsNullOrEmpty(_exchange)) - { - throw new InvalidAddressException("Exchange must be set"); - } - - if (_key != null && false == string.IsNullOrEmpty(_key)) - { - return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}/{Utils.EncodePathSegment(_key)}"; - } - - return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}"; - } - - if (_queue == null) - { - return ""; - } - - if (string.IsNullOrEmpty(_queue)) - { - throw new InvalidAddressException("Queue must be set"); - } - - return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}"; - } - } - public class AmqpPublisherBuilder : IPublisherBuilder { private readonly AmqpConnection _connection; - private string? _exchange; - private string? _key; - private string? _queue; + private string? _exchange = null; + private string? _key = null; + private string? _queue = null; private TimeSpan _timeout = TimeSpan.FromSeconds(10); public AmqpPublisherBuilder(AmqpConnection connection) @@ -132,9 +55,18 @@ public IPublisherBuilder PublishTimeout(TimeSpan timeout) return this; } + private bool IsAnonymous() + { + return string.IsNullOrEmpty(_exchange) && string.IsNullOrEmpty(_queue) && string.IsNullOrEmpty(_key); + } + public async Task BuildAsync(CancellationToken cancellationToken = default) { - string address = new AddressBuilder().Exchange(_exchange).Queue(_queue).Key(_key).Address(); + string? address = null; + if (!IsAnonymous()) + { + address = AddressBuilderHelper.AddressBuilder().Exchange(_exchange).Queue(_queue).Key(_key).Address(); + } AmqpPublisher publisher = new(_connection, address, _timeout); diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs new file mode 100644 index 00000000..a65b78a0 --- /dev/null +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs @@ -0,0 +1,196 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Amqp.Types; + +namespace RabbitMQ.AMQP.Client.Impl +{ + public class RpcClientConfiguration + { + public AmqpConnection Connection { get; set; } = null!; + public string ReplyToQueue { get; set; } = ""; + public string RequestAddress { get; set; } = ""; + public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10); + + public Func? CorrelationIdSupplier { get; set; } = null; + + public Func? CorrelationIdExtractor { get; set; } + + public Func? RequestPostProcessor { get; set; } + } + + public class AmqpRpcClientBuilder : IRpcClientBuilder + { + private readonly RpcClientAddressBuilder _addressBuilder; + private readonly AmqpConnection _connection; + private readonly RpcClientConfiguration _configuration = new(); + + public AmqpRpcClientBuilder(AmqpConnection connection) + { + _connection = connection; + _addressBuilder = new RpcClientAddressBuilder(this); + } + + public IRpcClientAddressBuilder RequestAddress() + { + return _addressBuilder; + } + + public IRpcClientBuilder ReplyToQueue(string replyToQueue) + { + _configuration.ReplyToQueue = replyToQueue; + return this; + } + + public IRpcClientBuilder CorrelationIdExtractor(Func? correlationIdExtractor) + { + _configuration.CorrelationIdExtractor = correlationIdExtractor; + return this; + } + + public IRpcClientBuilder RequestPostProcessor(Func? requestPostProcessor) + { + _configuration.RequestPostProcessor = requestPostProcessor; + return this; + } + + public IRpcClientBuilder CorrelationIdSupplier(Func? correlationIdSupplier) + { + _configuration.CorrelationIdSupplier = correlationIdSupplier; + return this; + } + + public IRpcClientBuilder Timeout(TimeSpan timeout) + { + _configuration.Timeout = timeout; + return this; + } + + public async Task BuildAsync() + { + _configuration.RequestAddress = _addressBuilder.Address(); + _configuration.Connection = _connection; + var rpcClient = new AmqpRpcClient(_configuration); + await rpcClient.OpenAsync().ConfigureAwait(false); + return rpcClient; + } + } + + public class AmqpRpcClient : AbstractLifeCycle, IRpcClient + { + private readonly RpcClientConfiguration _configuration; + private IConsumer? _consumer = null; + private IPublisher? _publisher = null; + private readonly Dictionary> _pendingRequests = new(); + private readonly string _correlationId = Guid.NewGuid().ToString(); + private int _nextCorrelationId = 0; + + private object CorrelationIdSupplier() + { + if (_configuration.CorrelationIdSupplier != null) + { + return _configuration.CorrelationIdSupplier(); + } + + return $"{_correlationId}-" + Interlocked.Increment(ref _nextCorrelationId); + } + + private object ExtractCorrelationId(IMessage message) + { + object corr = message.CorrelationId(); + if (_configuration.CorrelationIdExtractor != null) + { + corr = _configuration.CorrelationIdExtractor(message); + } + + return corr; + } + + private IMessage RequestPostProcess(IMessage request, object correlationId) + { + if (_configuration.RequestPostProcessor != null) + { + return _configuration.RequestPostProcessor(request, correlationId); + } + + return request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(_configuration.ReplyToQueue).Address()) + .MessageId(correlationId); + } + + public AmqpRpcClient(RpcClientConfiguration configuration) + { + _configuration = configuration; + } + + public override async Task OpenAsync() + { + if (string.IsNullOrEmpty(_configuration.ReplyToQueue)) + { + IQueueInfo queueInfo = await _configuration.Connection.Management().Queue().AutoDelete(true) + .Exclusive(true).DeclareAsync() + .ConfigureAwait(false); + _configuration.ReplyToQueue = queueInfo.Name(); + } + + _publisher = await _configuration.Connection.PublisherBuilder().BuildAsync().ConfigureAwait(false); + _consumer = await _configuration.Connection.ConsumerBuilder() + .Queue(_configuration.ReplyToQueue) + .MessageHandler(async (context, message) => + { + await context.AcceptAsync().ConfigureAwait(false); + object correlationId = ExtractCorrelationId(message); + if (_pendingRequests.TryGetValue(correlationId, out TaskCompletionSource? request)) + { + request.SetResult(message); + } + }).BuildAndStartAsync().ConfigureAwait(false); + + await base.OpenAsync().ConfigureAwait(false); + } + + public override async Task CloseAsync() + { + OnNewStatus(State.Closing, null); + try + { + if (_publisher != null) + { + await _publisher.CloseAsync().ConfigureAwait(false); + } + + if (_consumer != null) + { + await _consumer.CloseAsync().ConfigureAwait(false); + } + } + finally + { + OnNewStatus(State.Closed, null); + } + } + + public async Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) + { + object correlationId = CorrelationIdSupplier(); + message = RequestPostProcess(message, correlationId); + _pendingRequests.Add(correlationId, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)); + if (_publisher != null) + { + PublishResult pr = await _publisher.PublishAsync( + message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false); + + if (pr.Outcome.State != OutcomeState.Accepted) + { + _pendingRequests[correlationId] + .SetException(new Exception($"Failed to send request state: {pr.Outcome.State}")); + } + } + + await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout) + .ConfigureAwait(false); + + return await _pendingRequests[correlationId].Task.ConfigureAwait(false); + } + } +} diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs new file mode 100644 index 00000000..b1cea207 --- /dev/null +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -0,0 +1,160 @@ +using System; +using System.Threading.Tasks; +using Amqp; + +namespace RabbitMQ.AMQP.Client.Impl +{ + public class RpcConfiguration + { + public AmqpConnection Connection { get; set; } = null!; + public RpcHandler? Handler { get; set; } + public string RequestQueue { get; set; } = ""; + public Func? CorrelationIdExtractor { get; set; } + public Func? ReplyPostProcessor { get; set; } + } + + public class AmqpRpcServerBuilder : IRpcServerBuilder + { + readonly RpcConfiguration _configuration = new RpcConfiguration(); + + public AmqpRpcServerBuilder(AmqpConnection connection) + { + _configuration.Connection = connection; + } + + public IRpcServerBuilder RequestQueue(string requestQueue) + { + _configuration.RequestQueue = requestQueue; + return this; + } + + public IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue) + { + _configuration.RequestQueue = requestQueue.QueueName; + return this; + } + + public IRpcServerBuilder CorrelationIdExtractor(Func? correlationIdExtractor) + { + _configuration.CorrelationIdExtractor = correlationIdExtractor; + return this; + } + + public IRpcServerBuilder ReplyPostProcessor(Func? replyPostProcessor) + { + _configuration.ReplyPostProcessor = replyPostProcessor; + return this; + } + + public IRpcServerBuilder Handler(RpcHandler handler) + { + _configuration.Handler = handler; + return this; + } + + public async Task BuildAsync() + { + AmqpRpcServer amqpRpcServer = new(_configuration); + await amqpRpcServer.OpenAsync().ConfigureAwait(false); + return amqpRpcServer; + } + } + + public class AmqpRpcServer : AbstractLifeCycle, IRpcServer + { + private readonly RpcConfiguration _configuration; + private IPublisher? _publisher = null; + private IConsumer? _consumer = null; + + private async Task SendReply(IMessage reply) + { + if (_publisher != null) + { + PublishResult pr = await _publisher.PublishAsync(reply).ConfigureAwait(false); + if (pr.Outcome.State != OutcomeState.Accepted) + { + Trace.WriteLine(TraceLevel.Error, "Failed to send reply"); + } + } + } + + private object ExtractCorrelationId(IMessage message) + { + object corr = message.MessageId(); + if (_configuration.CorrelationIdExtractor != null) + { + corr = _configuration.CorrelationIdExtractor(message); + } + + return corr; + } + + private IMessage ReplyPostProcessor(IMessage reply, object correlationId) + { + return _configuration.ReplyPostProcessor != null ? _configuration.ReplyPostProcessor(reply, correlationId) : reply.CorrelationId(correlationId); + } + + public AmqpRpcServer(RpcConfiguration configuration) + { + _configuration = configuration; + } + + public override async Task OpenAsync() + { + _publisher = await _configuration.Connection.PublisherBuilder().BuildAsync().ConfigureAwait(false); + + _consumer = await _configuration.Connection.ConsumerBuilder().MessageHandler(async (context, request) => + { + await context.AcceptAsync().ConfigureAwait(false); + if (_configuration.Handler != null) + { + IMessage reply = await _configuration.Handler(new RpcServerContext(), request) + .ConfigureAwait(false); + + if (request.ReplyTo() != "") + { + reply.To(request.ReplyTo()); + } + else + { + Trace.WriteLine(TraceLevel.Error, "No reply-to address in request"); + } + + object correlationId = ExtractCorrelationId(request); + reply = ReplyPostProcessor(reply, correlationId); + await SendReply(reply).ConfigureAwait(false); + } + }) + .Queue(_configuration.RequestQueue).BuildAndStartAsync() + .ConfigureAwait(false); + + await base.OpenAsync().ConfigureAwait(false); + } + + private class RpcServerContext : IRpcServer.IContext + { + public IMessage Message(object body) => new AmqpMessage(body); + } + + public override async Task CloseAsync() + { + OnNewStatus(State.Closing, null); + try + { + if (_publisher != null) + { + await _publisher.CloseAsync().ConfigureAwait(false); + } + + if (_consumer != null) + { + await _consumer.CloseAsync().ConfigureAwait(false); + } + } + finally + { + OnNewStatus(State.Closed, null); + } + } + } +} diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 4fe42bf3..0e9f5c19 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -20,6 +20,10 @@ override RabbitMQ.AMQP.Client.Impl.AmqpManagement.ToString() -> string! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.CloseAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.OpenAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.ToString() -> string! +override RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.CloseAsync() -> System.Threading.Tasks.Task! +override RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.OpenAsync() -> System.Threading.Tasks.Task! +override RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.CloseAsync() -> System.Threading.Tasks.Task! +override RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.OpenAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.BackOffDelayPolicy.ToString() -> string! override RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(object? obj) -> bool override RabbitMQ.AMQP.Client.Impl.ConnectionSettings.GetHashCode() -> int @@ -96,6 +100,8 @@ RabbitMQ.AMQP.Client.IConnection.Id.get -> long RabbitMQ.AMQP.Client.IConnection.Id.set -> void RabbitMQ.AMQP.Client.IConnection.Management() -> RabbitMQ.AMQP.Client.IManagement! RabbitMQ.AMQP.Client.IConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder! +RabbitMQ.AMQP.Client.IConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IConnectionSettings RabbitMQ.AMQP.Client.IConnectionSettings.ContainerId.get -> string! RabbitMQ.AMQP.Client.IConnectionSettings.Host.get -> string! @@ -176,14 +182,23 @@ RabbitMQ.AMQP.Client.IMessage RabbitMQ.AMQP.Client.IMessage.Annotation(string! key) -> object! RabbitMQ.AMQP.Client.IMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.Body() -> object! -RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> string! +RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> object! +RabbitMQ.AMQP.Client.IMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.CorrelationId(string! id) -> RabbitMQ.AMQP.Client.IMessage! -RabbitMQ.AMQP.Client.IMessage.MessageId() -> string! +RabbitMQ.AMQP.Client.IMessage.GroupId() -> string! +RabbitMQ.AMQP.Client.IMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IMessage.MessageId() -> object! +RabbitMQ.AMQP.Client.IMessage.MessageId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.MessageId(string! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.ReplyTo() -> string! RabbitMQ.AMQP.Client.IMessage.ReplyTo(string! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.Subject() -> string! RabbitMQ.AMQP.Client.IMessage.Subject(string! subject) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IMessage.To() -> string! +RabbitMQ.AMQP.Client.IMessage.To(string! id) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IMessage.ToAddress() -> RabbitMQ.AMQP.Client.IMessageAddressBuilder! +RabbitMQ.AMQP.Client.IMessageAddressBuilder +RabbitMQ.AMQP.Client.IMessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.AbstractLifeCycle() -> void RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.ChangeState -> RabbitMQ.AMQP.Client.LifeCycleCallBack? @@ -195,13 +210,8 @@ RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle._disposed -> bool RabbitMQ.AMQP.Client.Impl.AbstractReconnectLifeCycle RabbitMQ.AMQP.Client.Impl.AbstractReconnectLifeCycle.AbstractReconnectLifeCycle() -> void RabbitMQ.AMQP.Client.Impl.AddressBuilder -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Address() -> string! RabbitMQ.AMQP.Client.Impl.AddressBuilder.AddressBuilder() -> void -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Exchange(string? exchange) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Key(string? key) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Queue(string? queue) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! +RabbitMQ.AMQP.Client.Impl.AddressBuilderHelper RabbitMQ.AMQP.Client.Impl.AmqpBindingSpecification RabbitMQ.AMQP.Client.Impl.AmqpBindingSpecification.AmqpBindingSpecification(RabbitMQ.AMQP.Client.Impl.AmqpManagement! management) -> void RabbitMQ.AMQP.Client.Impl.AmqpBindingSpecification.Argument(string! key, object! value) -> RabbitMQ.AMQP.Client.IBindingSpecification! @@ -235,6 +245,8 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.Id.get -> long RabbitMQ.AMQP.Client.Impl.AmqpConnection.Id.set -> void RabbitMQ.AMQP.Client.Impl.AmqpConnection.Management() -> RabbitMQ.AMQP.Client.IManagement! RabbitMQ.AMQP.Client.Impl.AmqpConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpConsumer RabbitMQ.AMQP.Client.Impl.AmqpConsumer.AmqpConsumer(RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration! configuration) -> void RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Pause() -> void @@ -283,19 +295,26 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(object! body) -> void RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key) -> object! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> object! -RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> string! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> object! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(string! id) -> RabbitMQ.AMQP.Client.IMessage! -RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId() -> string! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId() -> string! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId() -> object! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId(string! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.NativeMessage.get -> Amqp.Message! RabbitMQ.AMQP.Client.Impl.AmqpMessage.ReplyTo() -> string! RabbitMQ.AMQP.Client.Impl.AmqpMessage.ReplyTo(string! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Subject() -> string! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Subject(string! subject) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.To() -> string! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.To(string! id) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.ToAddress() -> RabbitMQ.AMQP.Client.IMessageAddressBuilder! RabbitMQ.AMQP.Client.Impl.AmqpNotOpenException RabbitMQ.AMQP.Client.Impl.AmqpNotOpenException.AmqpNotOpenException(string! message) -> void RabbitMQ.AMQP.Client.Impl.AmqpPublisher -RabbitMQ.AMQP.Client.Impl.AmqpPublisher.AmqpPublisher(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, string! address, System.TimeSpan timeout) -> void +RabbitMQ.AMQP.Client.Impl.AmqpPublisher.AmqpPublisher(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, string? address, System.TimeSpan timeout) -> void RabbitMQ.AMQP.Client.Impl.AmqpPublisher.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.AmqpPublisherBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void @@ -338,6 +357,28 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeadLetterStrategy(RabbitMQ.AM RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClient +RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.AmqpRpcClient(RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration! configuration) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.AmqpRpcClientBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdSupplier(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestPostProcessor(System.Func? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServer +RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl.RpcConfiguration! configuration) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.AmqpRpcServerBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.ReplyPostProcessor(System.Func? replyPostProcessor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.AmqpStreamSpecification(RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification! parent) -> void RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! @@ -404,6 +445,15 @@ RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Handler.set -> void RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.InitialCredits.get -> int RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.InitialCredits.set -> void RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.ListenerContext -> System.Action? +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Address() -> string! +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.DefaultAddressBuilder() -> void +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Exchange(string? exchangeName) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Key(string? key) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Queue(string? queueName) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder._owner -> T? RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Arguments() -> System.Collections.Generic.Dictionary! RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.AutoDelete() -> bool @@ -421,6 +471,9 @@ RabbitMQ.AMQP.Client.Impl.InvalidCodeException RabbitMQ.AMQP.Client.Impl.InvalidCodeException.InvalidCodeException(string! message) -> void RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters) -> void +RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder +RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.MessageAddressBuilder(RabbitMQ.AMQP.Client.IMessage! message) -> void RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! @@ -428,6 +481,37 @@ RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.GetBackOffDelayPolicy() -> Rabbi RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.IsActivate() -> bool RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.IsTopologyActive() -> bool RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! +RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder +RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder.RpcClient() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder.RpcClientAddressBuilder(RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder! builder) -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection! +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Connection.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdExtractor.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdExtractor.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdSupplier.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdSupplier.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.get -> string! +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestAddress.get -> string! +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestAddress.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestPostProcessor.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestPostProcessor.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RpcClientConfiguration() -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Timeout.get -> System.TimeSpan +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Timeout.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection! +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.CorrelationIdExtractor.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.CorrelationIdExtractor.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.get -> RabbitMQ.AMQP.Client.RpcHandler? +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.ReplyPostProcessor.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.ReplyPostProcessor.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RequestQueue.get -> string! +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RequestQueue.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RpcConfiguration() -> void RabbitMQ.AMQP.Client.Impl.StreamOptions RabbitMQ.AMQP.Client.Impl.StreamOptions.FilterMatchUnfiltered(bool matchUnfiltered) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! RabbitMQ.AMQP.Client.Impl.StreamOptions.FilterValues(string![]! values) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! @@ -502,6 +586,28 @@ RabbitMQ.AMQP.Client.IRecoveryConfiguration.GetBackOffDelayPolicy() -> RabbitMQ. RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsActivate() -> bool RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsTopologyActive() -> bool RabbitMQ.AMQP.Client.IRecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! +RabbitMQ.AMQP.Client.IRpcClient +RabbitMQ.AMQP.Client.IRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IRpcClientAddressBuilder +RabbitMQ.AMQP.Client.IRpcClientAddressBuilder.RpcClient() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder +RabbitMQ.AMQP.Client.IRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdSupplier(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestPostProcessor(System.Func? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcServer +RabbitMQ.AMQP.Client.IRpcServer.IContext +RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(object! body) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IRpcServerBuilder +RabbitMQ.AMQP.Client.IRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.ReplyPostProcessor(System.Func? replyPostProcessor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IStreamSpecification RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.IStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification! @@ -550,6 +656,7 @@ RabbitMQ.AMQP.Client.QueueType.STREAM = 2 -> RabbitMQ.AMQP.Client.QueueType RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtLeastOnce = 1 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtMostOnce = 0 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy +RabbitMQ.AMQP.Client.RpcHandler RabbitMQ.AMQP.Client.SaslMechanism RabbitMQ.AMQP.Client.SaslMechanism.Equals(RabbitMQ.AMQP.Client.SaslMechanism? other) -> bool RabbitMQ.AMQP.Client.SaslMechanism.Mechanism.get -> string! @@ -568,6 +675,7 @@ static RabbitMQ.AMQP.Client.ByteCapacity.Gb(long gigabytes) -> RabbitMQ.AMQP.Cli static RabbitMQ.AMQP.Client.ByteCapacity.Kb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity! static RabbitMQ.AMQP.Client.ByteCapacity.Mb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity! static RabbitMQ.AMQP.Client.ByteCapacity.Tb(long terabytes) -> RabbitMQ.AMQP.Client.ByteCapacity! +static RabbitMQ.AMQP.Client.Impl.AddressBuilderHelper.AddressBuilder() -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! static RabbitMQ.AMQP.Client.Impl.AmqpConnection.CreateAsync(RabbitMQ.AMQP.Client.IConnectionSettings! connectionSettings) -> System.Threading.Tasks.Task! static RabbitMQ.AMQP.Client.Impl.AmqpEnvironment.CreateAsync(RabbitMQ.AMQP.Client.IConnectionSettings! connectionSettings) -> System.Threading.Tasks.Task! static RabbitMQ.AMQP.Client.Impl.BackOffDelayPolicy.Create() -> RabbitMQ.AMQP.Client.Impl.BackOffDelayPolicy! diff --git a/RabbitMQ.AMQP.Client/Utils.cs b/RabbitMQ.AMQP.Client/Utils.cs index 1f5444fe..0b0fb3e0 100644 --- a/RabbitMQ.AMQP.Client/Utils.cs +++ b/RabbitMQ.AMQP.Client/Utils.cs @@ -119,7 +119,7 @@ internal static void ValidatePositive(string label, long value) // protonSender.setReceiverSettleMode(ReceiverSettleMode.FIRST); // break; - internal static Attach CreateAttach(string address, + internal static Attach CreateAttach(string? address, DeliveryMode deliveryMode, Guid linkId, Map? sourceFilter = null) { var attach = new Attach diff --git a/Tests/AmqpTests.cs b/Tests/AmqpTests.cs index 91aaf5c7..ee171d9b 100644 --- a/Tests/AmqpTests.cs +++ b/Tests/AmqpTests.cs @@ -108,7 +108,7 @@ async Task MessageHandler(IContext ctx, IMessage msg) try { receivedSubject = msg.Subject(); - messageIds.Add(int.Parse(msg.MessageId())); + messageIds.Add(int.Parse((string)msg.MessageId())); await ctx.AcceptAsync(); if (Interlocked.Increment(ref receivedMessageCount) == messageCount) { diff --git a/Tests/AnonymousPublisherTests.cs b/Tests/AnonymousPublisherTests.cs new file mode 100644 index 00000000..68ccc0bf --- /dev/null +++ b/Tests/AnonymousPublisherTests.cs @@ -0,0 +1,95 @@ +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using Xunit; +using Xunit.Abstractions; + +namespace Tests +{ + public class AnonymousPublisherTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) + { + [Fact] + public async Task AnonymousPublisherPublishResultNotAcceptedWhenQueueDoesNotExist() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + IPublisher aPublisher = await _connection.PublisherBuilder().BuildAsync(); + PublishResult pr = await aPublisher.PublishAsync( + new AmqpMessage("Hello, World!").ToAddress().Queue("DoesNotExist").Build()); + Assert.Equal(OutcomeState.Released, pr.Outcome.State); + } + + [Fact] + public async Task AnonymousPublisherPublishResultNotAcceptedWhenExchangeDoesNotExist() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + IPublisher aPublisher = await _connection.PublisherBuilder().BuildAsync(); + PublishResult pr = await aPublisher.PublishAsync( + new AmqpMessage("Hello, World!").ToAddress().Exchange("DoesNotExist").Build()); + Assert.Equal(OutcomeState.Released, pr.Outcome.State); + } + + [Fact] + public async Task AnonymousPublisherPublishResultAcceptedWhenQueueExists() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + await _management.Queue(_queueName).Quorum().Queue().DeclareAsync(); + IPublisher aPublisher = await _connection.PublisherBuilder().BuildAsync(); + PublishResult pr = await aPublisher.PublishAsync( + new AmqpMessage("Hello, World!").ToAddress().Queue(_queueName).Build()); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + await aPublisher.CloseAsync(); + } + + /// + /// Test when the exchange exists and the key is valid. + /// Released when the key is invalid. + /// + /// + /// + [Theory] + [InlineData(OutcomeState.Accepted, "myValidKey")] + [InlineData(OutcomeState.Released, "myInvalidKey")] + public async Task AnonymousPublisherPublishResultAcceptedWhenExchangeExists(OutcomeState outcomeState, + string key) + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + await _management.Exchange(_exchangeName).Type(ExchangeType.TOPIC).DeclareAsync(); + await _management.Queue(_queueName).Quorum().Queue().DeclareAsync(); + await _management.Binding().SourceExchange(_exchangeName).DestinationQueue(_queueName).Key(key).BindAsync(); + IPublisher aPublisher = await _connection.PublisherBuilder().BuildAsync(); + PublishResult pr = await aPublisher.PublishAsync( + new AmqpMessage("Hello, World!").ToAddress().Exchange(_exchangeName).Key("myValidKey").Build()); + Assert.Equal(outcomeState, pr.Outcome.State); + await aPublisher.CloseAsync(); + } + + /// + /// In this test, we are sending a message to a queue that is different from the one defined in the address. + /// So we mix anonymous and defined queues. + /// The winner is the defined queue. So the + /// + [Fact] + public async Task MessageShouldGoToTheDefinedQueueAndNotToTheAddressTo() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + await _management.Queue(_queueName).Quorum().Queue().DeclareAsync(); + await _management.Queue(_queueName + "2").Quorum().Queue().DeclareAsync(); + + IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + PublishResult pr = + await publisher.PublishAsync(new AmqpMessage("Hello, World!").ToAddress().Queue(_queueName + "2") + .Build()); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + Thread.Sleep(200); + await SystemUtils.WaitUntilQueueMessageCount(_queueName, 1); + await SystemUtils.WaitUntilQueueMessageCount(_queueName + "2", 0); + await _management.Queue(_queueName + "2").Quorum().Queue().DeleteAsync(); + } + } +} diff --git a/Tests/Consumer/StreamConsumerTests.cs b/Tests/Consumer/StreamConsumerTests.cs index a8fbe7d7..b4188fae 100644 --- a/Tests/Consumer/StreamConsumerTests.cs +++ b/Tests/Consumer/StreamConsumerTests.cs @@ -38,7 +38,7 @@ public async Task StreamConsumerBuilderShouldRestartFromTheBeginning() { Interlocked.Increment(ref totalConsumed); await context.AcceptAsync(); - if (message.MessageId() == "9") + if ((string)message.MessageId() == "9") { manualResetEvent.Set(); } @@ -75,7 +75,7 @@ public async Task StreamConsumerBuilderShouldStartFromTheListenerConfiguration() { Interlocked.Increment(ref totalConsumed); await context.AcceptAsync(); - if (message.MessageId() == "9") + if ((string)message.MessageId() == "9") { manualResetEvent.Set(); } @@ -111,7 +111,7 @@ public async Task StreamConsumerBuilderShouldStartFromTheListenerConfigurationWh { Interlocked.Increment(ref totalConsumed); await context.AcceptAsync(); - if (message.MessageId() == "9") + if ((string)message.MessageId() == "9") { manualResetEvent.Set(); } diff --git a/Tests/PublisherTests.cs b/Tests/PublisherTests.cs index 81b0395c..e7f328ff 100644 --- a/Tests/PublisherTests.cs +++ b/Tests/PublisherTests.cs @@ -21,11 +21,8 @@ public async Task ValidateBuilderRaiseExceptionIfQueueOrExchangeAreNotSetCorrect Assert.NotNull(_management); await Assert.ThrowsAsync(() => - _connection.PublisherBuilder().Queue("does_not_matter").Exchange("i_should_not_stay_here").BuildAsync()); - - await Assert.ThrowsAsync(() => _connection.PublisherBuilder().Exchange("").BuildAsync()); - - await Assert.ThrowsAsync(() => _connection.PublisherBuilder().Queue("").BuildAsync()); + _connection.PublisherBuilder().Queue("queue_and_exchange_cant_set_together"). + Exchange("queue_and_exchange_cant_set_together").BuildAsync()); await _connection.CloseAsync(); Assert.Empty(_connection.GetPublishers()); diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs new file mode 100644 index 00000000..8fee5c32 --- /dev/null +++ b/Tests/Rpc/RpcServerTests.cs @@ -0,0 +1,257 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using Xunit; +using Xunit.Abstractions; + +namespace Tests.Rpc +{ + public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) + { + [Fact] + public async Task MockRpcServerPingPong() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + tcs.SetResult(reply); + return Task.FromResult(reply); + }).RequestQueue(_queueName).BuildAsync(); + Assert.NotNull(rpcServer); + IPublisher p = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + + await p.PublishAsync(new AmqpMessage("test")); + IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("pong", m.Body()); + await rpcServer.CloseAsync(); + } + + [Fact] + public async Task RpcServerValidateStateChange() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + List<(State, State)> states = []; + await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync(); + TaskCompletionSource tsc = new(TaskCreationOptions.RunContinuationsAsynchronously); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var m = context.Message(request.Body()); + return Task.FromResult(m); + }).RequestQueue(_queueName).BuildAsync(); + rpcServer.ChangeState += (sender, fromState, toState, e) => + { + states.Add((fromState, toState)); + if (states.Count == 2) + { + tsc.SetResult(states.Count); + } + }; + Assert.NotNull(rpcServer); + await rpcServer.CloseAsync(); + int count = await tsc.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(2, count); + Assert.Equal(State.Open, states[0].Item1); + Assert.Equal(State.Closing, states[0].Item2); + Assert.Equal(State.Closing, states[1].Item1); + Assert.Equal(State.Closed, states[1].Item2); + } + + /// + /// Simulate RPC communication with a publisher + /// + [Fact] + public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(requestQueue).BuildAsync(); + + Assert.NotNull(rpcServer); + string queueReplyTo = $"queueReplyTo-{Now}"; + IQueueSpecification spec = _management.Queue(queueReplyTo).Exclusive(true).AutoDelete(true); + await spec.DeclareAsync(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + + IConsumer consumer = await _connection.ConsumerBuilder().Queue(queueReplyTo).MessageHandler( + async (context, message) => + { + await context.AcceptAsync(); + tcs.SetResult(message); + }).BuildAndStartAsync(); + + IPublisher publisher = await _connection.PublisherBuilder().Queue(requestQueue).BuildAsync(); + Assert.NotNull(publisher); + AddressBuilder addressBuilder = new(); + + IMessage message = new AmqpMessage("test").ReplyTo(addressBuilder.Queue(queueReplyTo).Address()); + PublishResult pr = await publisher.PublishAsync(message); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + + IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("pong", m.Body()); + + await rpcServer.CloseAsync(); + await consumer.CloseAsync(); + await publisher.CloseAsync(); + } + + /// + /// In this test the client has to create a reply queue since is not provided by the user + /// with the ReplyToQueue method + /// + [Fact] + public async Task RpcServerClientPingPongWithDefault() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(_queueName).BuildAsync(); + Assert.NotNull(rpcServer); + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .BuildAsync(); + + IMessage message = new AmqpMessage("ping"); + + IMessage response = await rpcClient.PublishAsync(message); + Assert.Equal("pong", response.Body()); + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); + } + + /// + /// In this test the client has to use the ReplyToQueue provided by the user + /// + [Fact] + public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(_queueName) + .BuildAsync(); + Assert.NotNull(rpcServer); + + // custom replyTo queue + IQueueInfo replyTo = + await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); + + // custom correlationId supplier + const string correlationId = "my-correlation-id"; + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .CorrelationIdSupplier(() => correlationId) + .CorrelationIdExtractor(message => message.CorrelationId()) + .ReplyToQueue(replyTo.Name()) + .BuildAsync(); + + IMessage message = new AmqpMessage("ping"); + + IMessage response = await rpcClient.PublishAsync(message); + Assert.Equal("pong", response.Body()); + Assert.Equal(correlationId, response.CorrelationId()); + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); + } + + /// + /// This test combine all the features with the overriding of the request and response post processor + /// the correlation id supplier and the extraction of the correlationId. + /// Here the client uses the replyTo queue provided by the user and the correlationId supplier + /// the field "Subject" is used as correlationId + /// The server uses the field "GroupId" as correlationId + /// Both use the extraction correlationId to get the correlationId + /// + /// The fields "Subject" and "GroupId" are used ONLY for test. + /// You should not use these fields for this purpose. + /// + /// + + [Fact] + public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(_queueName) + //come from the client + .CorrelationIdExtractor(message => message.Subject()) + // replace the correlation id location with GroupId + .ReplyPostProcessor((reply, replyCorrelationId) => reply.GroupId( + replyCorrelationId.ToString() ?? throw new InvalidOperationException())) + .BuildAsync(); + Assert.NotNull(rpcServer); + + IQueueInfo replyTo = + await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); + + // custom correlationId supplier + const string correlationId = "my-correlation-id"; + int correlationIdCounter = 0; + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .ReplyToQueue(replyTo.Name()) + // replace the correlation id creation with a custom function + .CorrelationIdSupplier(() => $"{correlationId}_{Interlocked.Increment(ref correlationIdCounter)}") + // The server will reply with the correlation id in the groupId + // This is only for testing. You should not use the groupId for this. + .CorrelationIdExtractor(message => message.GroupId()) + // The client will use Subject to store the correlation id + // this is only for testing. You should not use Subject for this. + .RequestPostProcessor((request, requestCorrelationId) + => request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(replyTo.Name()).Address()) + .Subject(requestCorrelationId.ToString() ?? throw new InvalidOperationException())) + .BuildAsync(); + + IMessage message = new AmqpMessage("ping"); + + int i = 1; + while (i < 30) + { + IMessage response = await rpcClient.PublishAsync(message); + Assert.Equal("pong", response.Body()); + // the server replies with the correlation id in the GroupId field + Assert.Equal($"{correlationId}_{i}", response.GroupId()); + i++; + } + + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); + } + } +}