From 83ee796c82d88c1b9c34a2c0408d8d5cd460ae98 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 4 Oct 2024 14:15:34 +0200 Subject: [PATCH 01/17] First RPC classes Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IRpcServer.cs | 32 +++++++++++++++++++++ RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs | 33 ++++++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 RabbitMQ.AMQP.Client/IRpcServer.cs create mode 100644 RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IRpcServer.cs new file mode 100644 index 00000000..89358adc --- /dev/null +++ b/RabbitMQ.AMQP.Client/IRpcServer.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading.Tasks; + +namespace RabbitMQ.AMQP.Client +{ + public interface IRpcServerBuilder + { + IRpcServerBuilder RequestQueue(string requestQueue); + IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue); + IRpcServerBuilder CorrelationIdExtractor(Func correlationIdExtractor); + IRpcServerBuilder Handler(IRpcServer.IHandler handler); + } + + public interface IRpcServer : ILifeCycle + { + public IHandler? Handler { get; } + + + public interface IHandler + { + IMessage Handle(IContext ctx, IMessage request); + } + + + public IContext Context { get; } + + public interface IContext + { + IMessage Message(object body); + } + } +} diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs new file mode 100644 index 00000000..231f2c47 --- /dev/null +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -0,0 +1,33 @@ +using System; +using System.Threading.Tasks; + +namespace RabbitMQ.AMQP.Client.Impl +{ + public class AmqpRpcServer : AbstractLifeCycle, IRpcServer + { + private AmqpConnection _connection; + + public AmqpRpcServer(AmqpConnection connection) + { + _connection = connection; + } + + private class RpcServerHandler : IRpcServer.IHandler + { + public IMessage Handle(IRpcServer.IContext ctx, IMessage request) => + throw new System.NotImplementedException(); + } + + private class RpcServerContext : IRpcServer.IContext + { + public IMessage Message(object body) => throw new System.NotImplementedException(); + } + + + + public override Task CloseAsync() => throw new System.NotImplementedException(); + + public IRpcServer.IHandler Handler { get; } = new RpcServerHandler(); + public IRpcServer.IContext Context { get; } = new RpcServerContext(); + } +} From f874b11d8145db8586500399adcc34d059360bec Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 4 Oct 2024 14:18:07 +0200 Subject: [PATCH 02/17] First RPC classes Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs | 79 +++++++++++++++++++ .../Impl/AmqpPublisherBuilder.cs | 77 ------------------ RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 17 ++++ 3 files changed, 96 insertions(+), 77 deletions(-) create mode 100644 RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs diff --git a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs new file mode 100644 index 00000000..b5b61eab --- /dev/null +++ b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs @@ -0,0 +1,79 @@ +namespace RabbitMQ.AMQP.Client.Impl +{ + public class AddressBuilder : IAddressBuilder + { + private string? _exchange; + + private string? _queue; + + private string? _key; + + public AddressBuilder Exchange(IExchangeSpecification exchangeSpec) + { + return Exchange(exchangeSpec.ExchangeName); + } + + public AddressBuilder Exchange(string? exchange) + { + _exchange = exchange; + return this; + } + + public AddressBuilder Queue(IQueueSpecification queueSpec) + { + return Queue(queueSpec.QueueName); + } + + public AddressBuilder Queue(string? queue) + { + _queue = queue; + return this; + } + + public AddressBuilder Key(string? key) + { + _key = key; + return this; + } + + public string Address() + { + if (_exchange == null && _queue == null) + { + throw new InvalidAddressException("Exchange or Queue must be set"); + } + + if (_exchange != null && _queue != null) + { + throw new InvalidAddressException("Exchange and Queue cannot be set together"); + } + + if (_exchange != null) + { + if (string.IsNullOrEmpty(_exchange)) + { + throw new InvalidAddressException("Exchange must be set"); + } + + if (_key != null && false == string.IsNullOrEmpty(_key)) + { + return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}/{Utils.EncodePathSegment(_key)}"; + } + + return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}"; + } + + if (_queue == null) + { + return ""; + } + + if (string.IsNullOrEmpty(_queue)) + { + throw new InvalidAddressException("Queue must be set"); + } + + return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}"; + } + } +} diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs index b4bffb8a..5ed1f134 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs @@ -8,83 +8,6 @@ namespace RabbitMQ.AMQP.Client.Impl { - public class AddressBuilder : IAddressBuilder - { - private string? _exchange; - - private string? _queue; - - private string? _key; - - public AddressBuilder Exchange(IExchangeSpecification exchangeSpec) - { - return Exchange(exchangeSpec.ExchangeName); - } - - public AddressBuilder Exchange(string? exchange) - { - _exchange = exchange; - return this; - } - - public AddressBuilder Queue(IQueueSpecification queueSpec) - { - return Queue(queueSpec.QueueName); - } - - public AddressBuilder Queue(string? queue) - { - _queue = queue; - return this; - } - - public AddressBuilder Key(string? key) - { - _key = key; - return this; - } - - public string Address() - { - if (_exchange == null && _queue == null) - { - throw new InvalidAddressException("Exchange or Queue must be set"); - } - - if (_exchange != null && _queue != null) - { - throw new InvalidAddressException("Exchange and Queue cannot be set together"); - } - - if (_exchange != null) - { - if (string.IsNullOrEmpty(_exchange)) - { - throw new InvalidAddressException("Exchange must be set"); - } - - if (_key != null && false == string.IsNullOrEmpty(_key)) - { - return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}/{Utils.EncodePathSegment(_key)}"; - } - - return $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_exchange)}"; - } - - if (_queue == null) - { - return ""; - } - - if (string.IsNullOrEmpty(_queue)) - { - throw new InvalidAddressException("Queue must be set"); - } - - return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}"; - } - } - public class AmqpPublisherBuilder : IPublisherBuilder { private readonly AmqpConnection _connection; diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 46998d31..a109c245 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -19,6 +19,7 @@ override RabbitMQ.AMQP.Client.Impl.AmqpManagement.ToString() -> string! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.CloseAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.OpenAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.ToString() -> string! +override RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.CloseAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.BackOffDelayPolicy.ToString() -> string! override RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(object? obj) -> bool override RabbitMQ.AMQP.Client.Impl.ConnectionSettings.GetHashCode() -> int @@ -332,6 +333,10 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeadLetterStrategy(RabbitMQ.AM RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServer +RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.Context.get -> RabbitMQ.AMQP.Client.IRpcServer.IContext! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.Handler.get -> RabbitMQ.AMQP.Client.IRpcServer.IHandler! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.AmqpStreamSpecification(RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification! parent) -> void RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! @@ -495,6 +500,18 @@ RabbitMQ.AMQP.Client.IRecoveryConfiguration.GetBackOffDelayPolicy() -> RabbitMQ. RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsActivate() -> bool RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsTopologyActive() -> bool RabbitMQ.AMQP.Client.IRecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! +RabbitMQ.AMQP.Client.IRpcServer +RabbitMQ.AMQP.Client.IRpcServer.Context.get -> RabbitMQ.AMQP.Client.IRpcServer.IContext! +RabbitMQ.AMQP.Client.IRpcServer.Handler.get -> RabbitMQ.AMQP.Client.IRpcServer.IHandler? +RabbitMQ.AMQP.Client.IRpcServer.IContext +RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(object! body) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IRpcServer.IHandler +RabbitMQ.AMQP.Client.IRpcServer.IHandler.Handle(RabbitMQ.AMQP.Client.IRpcServer.IContext! ctx, RabbitMQ.AMQP.Client.IMessage! request) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IRpcServerBuilder +RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func! correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.IRpcServer.IHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IStreamSpecification RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! RabbitMQ.AMQP.Client.IStreamSpecification.MaxAge(System.TimeSpan maxAge) -> RabbitMQ.AMQP.Client.IStreamSpecification! From 160b451897fc06b7f1cb646a970a3fe5bf123edb Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 4 Oct 2024 16:30:42 +0200 Subject: [PATCH 03/17] First Anonymous publisher Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IAddressBuilder.cs | 7 ++ RabbitMQ.AMQP.Client/IMessage.cs | 5 ++ RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs | 76 ++++++++++++++----- RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs | 19 +++++ RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs | 4 +- .../Impl/AmqpPublisherBuilder.cs | 17 ++++- RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 28 +++++-- RabbitMQ.AMQP.Client/Utils.cs | 2 +- 8 files changed, 126 insertions(+), 32 deletions(-) diff --git a/RabbitMQ.AMQP.Client/IAddressBuilder.cs b/RabbitMQ.AMQP.Client/IAddressBuilder.cs index 6cee9cbd..79f8f385 100644 --- a/RabbitMQ.AMQP.Client/IAddressBuilder.cs +++ b/RabbitMQ.AMQP.Client/IAddressBuilder.cs @@ -23,4 +23,11 @@ public interface IAddressBuilder T Key(string key); } + + + public interface IMessageAddressBuilder : IAddressBuilder + { + + IMessage Build(); + } } diff --git a/RabbitMQ.AMQP.Client/IMessage.cs b/RabbitMQ.AMQP.Client/IMessage.cs index b18e4dc3..d89d7b65 100644 --- a/RabbitMQ.AMQP.Client/IMessage.cs +++ b/RabbitMQ.AMQP.Client/IMessage.cs @@ -21,6 +21,9 @@ public interface IMessage string ReplyTo(); IMessage ReplyTo(string id); + string To(); + IMessage To(string id); + string Subject(); IMessage Subject(string subject); @@ -28,5 +31,7 @@ public interface IMessage public IMessage Annotation(string key, object value); public object Annotation(string key); + + IMessageAddressBuilder ToAddress(); } } diff --git a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs index b5b61eab..1161a054 100644 --- a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs @@ -1,39 +1,54 @@ +using System; + namespace RabbitMQ.AMQP.Client.Impl { - public class AddressBuilder : IAddressBuilder + public abstract class DefaultAddressBuilder : IAddressBuilder { - private string? _exchange; - - private string? _queue; + private string? _exchange = null; + private string? _queue = null; + private string? _key = null; + protected T? _owner = default; - private string? _key; - public AddressBuilder Exchange(IExchangeSpecification exchangeSpec) + public T Exchange(IExchangeSpecification exchangeSpec) { return Exchange(exchangeSpec.ExchangeName); } - public AddressBuilder Exchange(string? exchange) - { - _exchange = exchange; - return this; - } - public AddressBuilder Queue(IQueueSpecification queueSpec) + public T Exchange(string? exchangeName) { - return Queue(queueSpec.QueueName); + _exchange = exchangeName; + if (_owner == null) + { + throw new InvalidOperationException("Owner is null"); + } + + return _owner; } - public AddressBuilder Queue(string? queue) + public T Queue(IQueueSpecification queueSpec) => Queue(queueSpec.QueueName); + + public T Queue(string? queueName) { - _queue = queue; - return this; + _queue = queueName; + if (_owner == null) + { + throw new InvalidOperationException("Owner is null"); + } + + return _owner; } - public AddressBuilder Key(string? key) + public T Key(string? key) { _key = key; - return this; + if (_owner == null) + { + throw new InvalidOperationException("Owner is null"); + } + + return _owner; } public string Address() @@ -76,4 +91,29 @@ public string Address() return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}"; } } + + public class AddressBuilder : DefaultAddressBuilder + { + public AddressBuilder() + { + _owner = this; + } + } + + public class MessageAddressBuilder : DefaultAddressBuilder, IMessageAddressBuilder + { + private readonly IMessage _message; + + public MessageAddressBuilder(IMessage message) + { + _message = message; + _owner = this; + } + + public IMessage Build() + { + _message.To(Address()); + return _message; + } + } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index 020d4646..000d8b30 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -13,6 +13,7 @@ public class FieldNotSetException : Exception { } + public class AmqpMessage : IMessage { public Message NativeMessage { get; } @@ -105,6 +106,19 @@ public IMessage ReplyTo(string id) return this; } + public string To() + { + ThrowIfPropertiesNotSet(); + return NativeMessage.Properties.To; + } + + public IMessage To(string id) + { + EnsureProperties(); + NativeMessage.Properties.To = id; + return this; + } + public string Subject() { ThrowIfPropertiesNotSet(); @@ -132,5 +146,10 @@ public object Annotation(string key) ThrowIfAnnotationsNotSet(); return NativeMessage.MessageAnnotations[new Symbol(key)]; } + + public IMessageAddressBuilder ToAddress() + { + return new MessageAddressBuilder(this); + } } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs index 8b2cc2aa..53a23ecf 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs @@ -16,12 +16,12 @@ public class AmqpPublisher : AbstractReconnectLifeCycle, IPublisher { private readonly AmqpConnection _connection; private readonly TimeSpan _timeout; - private readonly string _address; + private readonly string? _address; private readonly Guid _id = Guid.NewGuid(); private SenderLink? _senderLink = null; - public AmqpPublisher(AmqpConnection connection, string address, TimeSpan timeout) + public AmqpPublisher(AmqpConnection connection, string? address, TimeSpan timeout) { _connection = connection; _address = address; diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs index 5ed1f134..93c7cd2a 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs @@ -11,9 +11,9 @@ namespace RabbitMQ.AMQP.Client.Impl public class AmqpPublisherBuilder : IPublisherBuilder { private readonly AmqpConnection _connection; - private string? _exchange; - private string? _key; - private string? _queue; + private string? _exchange = null; + private string? _key = null; + private string? _queue = null; private TimeSpan _timeout = TimeSpan.FromSeconds(10); public AmqpPublisherBuilder(AmqpConnection connection) @@ -55,10 +55,19 @@ public IPublisherBuilder PublishTimeout(TimeSpan timeout) return this; } + private bool IsAnonymous() + { + return string.IsNullOrEmpty(_exchange) && string.IsNullOrEmpty(_queue) && string.IsNullOrEmpty(_key); + } + public async Task BuildAsync(CancellationToken cancellationToken = default) { - string address = new AddressBuilder().Exchange(_exchange).Queue(_queue).Key(_key).Address(); + string? address = null; + if (!IsAnonymous()) + { + address = new AddressBuilder().Exchange(_exchange).Queue(_queue).Key(_key).Address(); + } AmqpPublisher publisher = new(_connection, address, _timeout); diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index a109c245..ce761308 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -181,6 +181,11 @@ RabbitMQ.AMQP.Client.IMessage.ReplyTo() -> string! RabbitMQ.AMQP.Client.IMessage.ReplyTo(string! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.Subject() -> string! RabbitMQ.AMQP.Client.IMessage.Subject(string! subject) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IMessage.To() -> string! +RabbitMQ.AMQP.Client.IMessage.To(string! id) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IMessage.ToAddress() -> RabbitMQ.AMQP.Client.IMessageAddressBuilder! +RabbitMQ.AMQP.Client.IMessageAddressBuilder +RabbitMQ.AMQP.Client.IMessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.AbstractLifeCycle() -> void RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.ChangeState -> RabbitMQ.AMQP.Client.LifeCycleCallBack? @@ -192,13 +197,7 @@ RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle._disposed -> bool RabbitMQ.AMQP.Client.Impl.AbstractReconnectLifeCycle RabbitMQ.AMQP.Client.Impl.AbstractReconnectLifeCycle.AbstractReconnectLifeCycle() -> void RabbitMQ.AMQP.Client.Impl.AddressBuilder -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Address() -> string! RabbitMQ.AMQP.Client.Impl.AddressBuilder.AddressBuilder() -> void -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Exchange(string? exchange) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Key(string? key) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! -RabbitMQ.AMQP.Client.Impl.AddressBuilder.Queue(string? queue) -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! RabbitMQ.AMQP.Client.Impl.AmqpBindingSpecification RabbitMQ.AMQP.Client.Impl.AmqpBindingSpecification.AmqpBindingSpecification(RabbitMQ.AMQP.Client.Impl.AmqpManagement! management) -> void RabbitMQ.AMQP.Client.Impl.AmqpBindingSpecification.Argument(string! key, object! value) -> RabbitMQ.AMQP.Client.IBindingSpecification! @@ -288,10 +287,13 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.ReplyTo() -> string! RabbitMQ.AMQP.Client.Impl.AmqpMessage.ReplyTo(string! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Subject() -> string! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Subject(string! subject) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.To() -> string! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.To(string! id) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.ToAddress() -> RabbitMQ.AMQP.Client.IMessageAddressBuilder! RabbitMQ.AMQP.Client.Impl.AmqpNotOpenException RabbitMQ.AMQP.Client.Impl.AmqpNotOpenException.AmqpNotOpenException(string! message) -> void RabbitMQ.AMQP.Client.Impl.AmqpPublisher -RabbitMQ.AMQP.Client.Impl.AmqpPublisher.AmqpPublisher(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, string! address, System.TimeSpan timeout) -> void +RabbitMQ.AMQP.Client.Impl.AmqpPublisher.AmqpPublisher(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, string? address, System.TimeSpan timeout) -> void RabbitMQ.AMQP.Client.Impl.AmqpPublisher.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder RabbitMQ.AMQP.Client.Impl.AmqpPublisherBuilder.AmqpPublisherBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void @@ -403,6 +405,15 @@ RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.Handler.set -> void RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.InitialCredits.get -> int RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.InitialCredits.set -> void RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration.ListenerContext -> System.Action? +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Address() -> string! +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.DefaultAddressBuilder() -> void +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Exchange(string? exchangeName) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Key(string? key) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Queue(string? queueName) -> T +RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder._owner -> T? RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.Arguments() -> System.Collections.Generic.Dictionary! RabbitMQ.AMQP.Client.Impl.DefaultQueueInfo.AutoDelete() -> bool @@ -420,6 +431,9 @@ RabbitMQ.AMQP.Client.Impl.InvalidCodeException RabbitMQ.AMQP.Client.Impl.InvalidCodeException.InvalidCodeException(string! message) -> void RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types.Map! filters) -> void +RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder +RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.MessageAddressBuilder(RabbitMQ.AMQP.Client.IMessage! message) -> void RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! diff --git a/RabbitMQ.AMQP.Client/Utils.cs b/RabbitMQ.AMQP.Client/Utils.cs index 1f5444fe..0b0fb3e0 100644 --- a/RabbitMQ.AMQP.Client/Utils.cs +++ b/RabbitMQ.AMQP.Client/Utils.cs @@ -119,7 +119,7 @@ internal static void ValidatePositive(string label, long value) // protonSender.setReceiverSettleMode(ReceiverSettleMode.FIRST); // break; - internal static Attach CreateAttach(string address, + internal static Attach CreateAttach(string? address, DeliveryMode deliveryMode, Guid linkId, Map? sourceFilter = null) { var attach = new Attach From 1a76b691733025e01691bd4c629e33ff15b7542b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 4 Oct 2024 16:34:59 +0200 Subject: [PATCH 04/17] formatting Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IAddressBuilder.cs | 1 - RabbitMQ.AMQP.Client/IRpcServer.cs | 2 -- RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs | 2 -- RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs | 1 - RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs | 2 -- 5 files changed, 8 deletions(-) diff --git a/RabbitMQ.AMQP.Client/IAddressBuilder.cs b/RabbitMQ.AMQP.Client/IAddressBuilder.cs index 79f8f385..e8372d69 100644 --- a/RabbitMQ.AMQP.Client/IAddressBuilder.cs +++ b/RabbitMQ.AMQP.Client/IAddressBuilder.cs @@ -24,7 +24,6 @@ public interface IAddressBuilder T Key(string key); } - public interface IMessageAddressBuilder : IAddressBuilder { diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IRpcServer.cs index 89358adc..a5bc4846 100644 --- a/RabbitMQ.AMQP.Client/IRpcServer.cs +++ b/RabbitMQ.AMQP.Client/IRpcServer.cs @@ -15,13 +15,11 @@ public interface IRpcServer : ILifeCycle { public IHandler? Handler { get; } - public interface IHandler { IMessage Handle(IContext ctx, IMessage request); } - public IContext Context { get; } public interface IContext diff --git a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs index 1161a054..6245f2a0 100644 --- a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs @@ -9,13 +9,11 @@ public abstract class DefaultAddressBuilder : IAddressBuilder private string? _key = null; protected T? _owner = default; - public T Exchange(IExchangeSpecification exchangeSpec) { return Exchange(exchangeSpec.ExchangeName); } - public T Exchange(string? exchangeName) { _exchange = exchangeName; diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index 05e88e83..2c68ca78 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -13,7 +13,6 @@ public class FieldNotSetException : Exception { } - public class AmqpMessage : IMessage { public Message NativeMessage { get; } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs index 231f2c47..bfaab50f 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -23,8 +23,6 @@ private class RpcServerContext : IRpcServer.IContext public IMessage Message(object body) => throw new System.NotImplementedException(); } - - public override Task CloseAsync() => throw new System.NotImplementedException(); public IRpcServer.IHandler Handler { get; } = new RpcServerHandler(); From 32cd3632b30fdd6b7a37afbdcc290640d552b729 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 7 Oct 2024 10:34:45 +0200 Subject: [PATCH 05/17] Tests for AnonymousPublisher Signed-off-by: Gabriele Santomaggio --- Tests/AnonymousPublisherTests.cs | 63 ++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 Tests/AnonymousPublisherTests.cs diff --git a/Tests/AnonymousPublisherTests.cs b/Tests/AnonymousPublisherTests.cs new file mode 100644 index 00000000..16e5751a --- /dev/null +++ b/Tests/AnonymousPublisherTests.cs @@ -0,0 +1,63 @@ +using System.Threading.Tasks; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using Xunit; +using Xunit.Abstractions; + +namespace Tests +{ + public class AnonymousPublisherTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) + { + [Fact] + public async Task AnonymousPublisherPublishResultNotAcceptedWhenQueueDoesNotExist() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + IPublisher aPublisher = await _connection.PublisherBuilder().BuildAsync(); + PublishResult pr = await aPublisher.PublishAsync( + new AmqpMessage("Hello, World!").ToAddress().Queue("DoesNotExist").Build()); + Assert.Equal(OutcomeState.Released, pr.Outcome.State); + } + + [Fact] + public async Task AnonymousPublisherPublishResultNotAcceptedWhenExchangeDoesNotExist() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + IPublisher aPublisher = await _connection.PublisherBuilder().BuildAsync(); + PublishResult pr = await aPublisher.PublishAsync( + new AmqpMessage("Hello, World!").ToAddress().Exchange("DoesNotExist").Build()); + Assert.Equal(OutcomeState.Released, pr.Outcome.State); + } + + [Fact] + public async Task AnonymousPublisherPublishResultAcceptedWhenQueueExists() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + await _management.Queue(_queueName).Quorum().Queue().DeclareAsync(); + IPublisher aPublisher = await _connection.PublisherBuilder().BuildAsync(); + PublishResult pr = await aPublisher.PublishAsync( + new AmqpMessage("Hello, World!").ToAddress().Queue(_queueName).Build()); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + await aPublisher.CloseAsync(); + } + + [Theory] + [InlineData(OutcomeState.Accepted, "myValidKey")] + [InlineData(OutcomeState.Released, "myInvalidKey")] + public async Task AnonymousPublisherPublishResultAcceptedWhenExchangeExists(OutcomeState outcomeState, string key) + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + await _management.Exchange(_exchangeName).Type(ExchangeType.TOPIC).DeclareAsync(); + await _management.Queue(_queueName).Quorum().Queue().DeclareAsync(); + await _management.Binding().SourceExchange(_exchangeName).DestinationQueue(_queueName).Key(key).BindAsync(); + IPublisher aPublisher = await _connection.PublisherBuilder().BuildAsync(); + PublishResult pr = await aPublisher.PublishAsync( + new AmqpMessage("Hello, World!").ToAddress().Exchange(_exchangeName).Key("myValidKey").Build()); + Assert.Equal(outcomeState, pr.Outcome.State); + await aPublisher.CloseAsync(); + } + } +} From 265817ef485374fb26681cfc32629043fa2ce1ab Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 7 Oct 2024 10:49:04 +0200 Subject: [PATCH 06/17] Tests for AnonymousPublisher Signed-off-by: Gabriele Santomaggio --- Tests/AnonymousPublisherTests.cs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/Tests/AnonymousPublisherTests.cs b/Tests/AnonymousPublisherTests.cs index 16e5751a..deb873ec 100644 --- a/Tests/AnonymousPublisherTests.cs +++ b/Tests/AnonymousPublisherTests.cs @@ -1,3 +1,4 @@ +using System.Threading; using System.Threading.Tasks; using RabbitMQ.AMQP.Client; using RabbitMQ.AMQP.Client.Impl; @@ -46,7 +47,8 @@ public async Task AnonymousPublisherPublishResultAcceptedWhenQueueExists() [Theory] [InlineData(OutcomeState.Accepted, "myValidKey")] [InlineData(OutcomeState.Released, "myInvalidKey")] - public async Task AnonymousPublisherPublishResultAcceptedWhenExchangeExists(OutcomeState outcomeState, string key) + public async Task AnonymousPublisherPublishResultAcceptedWhenExchangeExists(OutcomeState outcomeState, + string key) { Assert.NotNull(_connection); Assert.NotNull(_management); @@ -59,5 +61,30 @@ public async Task AnonymousPublisherPublishResultAcceptedWhenExchangeExists(Outc Assert.Equal(outcomeState, pr.Outcome.State); await aPublisher.CloseAsync(); } + + + /// + /// In this test, we are sending a message to a queue that is different from the one defined in the address. + /// So we mix anonymous and defined queues. + /// The winner is the defined queue. So the + /// + [Fact] + public async Task MessageShouldGoToTheDefinedQueueAndNotToTheAddressTo() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + await _management.Queue(_queueName).Quorum().Queue().DeclareAsync(); + await _management.Queue(_queueName + "2").Quorum().Queue().DeclareAsync(); + + IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + PublishResult pr = + await publisher.PublishAsync(new AmqpMessage("Hello, World!").ToAddress().Queue(_queueName + "2") + .Build()); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + Thread.Sleep(200); + await SystemUtils.WaitUntilQueueMessageCount(_queueName, 1); + await SystemUtils.WaitUntilQueueMessageCount(_queueName + "2", 0); + await _management.Queue(_queueName + "2").Quorum().Queue().DeleteAsync(); + } } } From 8acb89f2af5d89ae71edf2d1f94204141d91ce4a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 7 Oct 2024 11:18:38 +0200 Subject: [PATCH 07/17] formatting Signed-off-by: Gabriele Santomaggio --- Tests/AnonymousPublisherTests.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Tests/AnonymousPublisherTests.cs b/Tests/AnonymousPublisherTests.cs index deb873ec..68ccc0bf 100644 --- a/Tests/AnonymousPublisherTests.cs +++ b/Tests/AnonymousPublisherTests.cs @@ -44,6 +44,12 @@ public async Task AnonymousPublisherPublishResultAcceptedWhenQueueExists() await aPublisher.CloseAsync(); } + /// + /// Test when the exchange exists and the key is valid. + /// Released when the key is invalid. + /// + /// + /// [Theory] [InlineData(OutcomeState.Accepted, "myValidKey")] [InlineData(OutcomeState.Released, "myInvalidKey")] @@ -62,7 +68,6 @@ public async Task AnonymousPublisherPublishResultAcceptedWhenExchangeExists(Outc await aPublisher.CloseAsync(); } - /// /// In this test, we are sending a message to a queue that is different from the one defined in the address. /// So we mix anonymous and defined queues. From c9e780e93b67a4f72662937bfa53e8d752c8751f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 7 Oct 2024 11:32:31 +0200 Subject: [PATCH 08/17] remove part of the test that is not valid anymore Signed-off-by: Gabriele Santomaggio --- Tests/PublisherTests.cs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/Tests/PublisherTests.cs b/Tests/PublisherTests.cs index 81b0395c..e7f328ff 100644 --- a/Tests/PublisherTests.cs +++ b/Tests/PublisherTests.cs @@ -21,11 +21,8 @@ public async Task ValidateBuilderRaiseExceptionIfQueueOrExchangeAreNotSetCorrect Assert.NotNull(_management); await Assert.ThrowsAsync(() => - _connection.PublisherBuilder().Queue("does_not_matter").Exchange("i_should_not_stay_here").BuildAsync()); - - await Assert.ThrowsAsync(() => _connection.PublisherBuilder().Exchange("").BuildAsync()); - - await Assert.ThrowsAsync(() => _connection.PublisherBuilder().Queue("").BuildAsync()); + _connection.PublisherBuilder().Queue("queue_and_exchange_cant_set_together"). + Exchange("queue_and_exchange_cant_set_together").BuildAsync()); await _connection.CloseAsync(); Assert.Empty(_connection.GetPublishers()); From 07269a6867524977b00f5457f7f3c87b405f75a4 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 7 Oct 2024 16:35:23 +0200 Subject: [PATCH 09/17] First RPC Implementation Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IConnection.cs | 2 + RabbitMQ.AMQP.Client/IRpcServer.cs | 14 ++-- RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs | 5 ++ RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs | 76 +++++++++++++++++--- RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 30 +++++--- Tests/Rpc/RpcServerTests.cs | 35 +++++++++ 6 files changed, 135 insertions(+), 27 deletions(-) create mode 100644 Tests/Rpc/RpcServerTests.cs diff --git a/RabbitMQ.AMQP.Client/IConnection.cs b/RabbitMQ.AMQP.Client/IConnection.cs index 0c944049..6e0298ad 100644 --- a/RabbitMQ.AMQP.Client/IConnection.cs +++ b/RabbitMQ.AMQP.Client/IConnection.cs @@ -26,6 +26,8 @@ public interface IConnection : ILifeCycle IConsumerBuilder ConsumerBuilder(); + IRpcServerBuilder RpcServerBuilder(); + public ReadOnlyCollection GetPublishers(); public ReadOnlyCollection GetConsumers(); diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IRpcServer.cs index a5bc4846..a20fdc7e 100644 --- a/RabbitMQ.AMQP.Client/IRpcServer.cs +++ b/RabbitMQ.AMQP.Client/IRpcServer.cs @@ -3,24 +3,20 @@ namespace RabbitMQ.AMQP.Client { + public delegate Task RpcHandler(IRpcServer.IContext context, IMessage request); + public interface IRpcServerBuilder { IRpcServerBuilder RequestQueue(string requestQueue); IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue); IRpcServerBuilder CorrelationIdExtractor(Func correlationIdExtractor); - IRpcServerBuilder Handler(IRpcServer.IHandler handler); + IRpcServerBuilder Handler(RpcHandler handler); + + Task BuildAsync(); } public interface IRpcServer : ILifeCycle { - public IHandler? Handler { get; } - - public interface IHandler - { - IMessage Handle(IContext ctx, IMessage request); - } - - public IContext Context { get; } public interface IContext { diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs index adf4332f..ebe15d98 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs @@ -50,6 +50,11 @@ public class AmqpConnection : AbstractLifeCycle, IConnection private readonly TaskCompletionSource _connectionClosedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + public IRpcServerBuilder RpcServerBuilder() + { + return new AmqpRpcServerBuilder(this); + } + /// /// Read-only collection of publishers. /// See diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs index bfaab50f..c47b95cb 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -3,29 +3,85 @@ namespace RabbitMQ.AMQP.Client.Impl { + public class RpcConfiguration + { + public AmqpConnection Connection { get; set; } = null!; + public RpcHandler? Handler { get; set; } + public string RequestQueue { get; set; } = ""; + } + + public class AmqpRpcServerBuilder : IRpcServerBuilder + { + RpcConfiguration _configuration = new RpcConfiguration(); + + public AmqpRpcServerBuilder(AmqpConnection connection) + { + _configuration.Connection = connection; + } + + public IRpcServerBuilder RequestQueue(string requestQueue) + { + _configuration.RequestQueue = requestQueue; + return this; + } + + public IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue) + { + _configuration.RequestQueue = requestQueue.QueueName; + return this; + } + + public IRpcServerBuilder CorrelationIdExtractor(Func correlationIdExtractor) => + throw new NotImplementedException(); + + public IRpcServerBuilder Handler(RpcHandler handler) + { + _configuration.Handler = handler; + return this; + } + + public async Task BuildAsync() + { + AmqpRpcServer amqpRpcServer = new(_configuration); + await amqpRpcServer.OpenAsync().ConfigureAwait(false); + return amqpRpcServer; + } + } + public class AmqpRpcServer : AbstractLifeCycle, IRpcServer { - private AmqpConnection _connection; + private readonly RpcConfiguration _configuration; + private IPublisher? _publisher = null; + private IConsumer? _consumer = null; - public AmqpRpcServer(AmqpConnection connection) + public AmqpRpcServer(RpcConfiguration builder) { - _connection = connection; + _configuration = builder; } - private class RpcServerHandler : IRpcServer.IHandler + public override async Task OpenAsync() { - public IMessage Handle(IRpcServer.IContext ctx, IMessage request) => - throw new System.NotImplementedException(); + _publisher = await _configuration.Connection.PublisherBuilder().BuildAsync().ConfigureAwait(false); + + _consumer = await _configuration.Connection.ConsumerBuilder().MessageHandler(async (context, request) => + { + await context.AcceptAsync().ConfigureAwait(false); + if (_configuration.Handler != null) + { + await _configuration.Handler(new RpcServerContext(), request).ConfigureAwait(false); + } + }) + .Queue(_configuration.RequestQueue).BuildAndStartAsync() + .ConfigureAwait(false); + + await base.OpenAsync().ConfigureAwait(false); } private class RpcServerContext : IRpcServer.IContext { - public IMessage Message(object body) => throw new System.NotImplementedException(); + public IMessage Message(object body) => new AmqpMessage(body); } public override Task CloseAsync() => throw new System.NotImplementedException(); - - public IRpcServer.IHandler Handler { get; } = new RpcServerHandler(); - public IRpcServer.IContext Context { get; } = new RpcServerContext(); } } diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index b16cb28b..922c450a 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -21,6 +21,7 @@ override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.CloseAsync() -> System.Threadin override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.OpenAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.ToString() -> string! override RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.CloseAsync() -> System.Threading.Tasks.Task! +override RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.OpenAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.BackOffDelayPolicy.ToString() -> string! override RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(object? obj) -> bool override RabbitMQ.AMQP.Client.Impl.ConnectionSettings.GetHashCode() -> int @@ -97,6 +98,7 @@ RabbitMQ.AMQP.Client.IConnection.Id.get -> long RabbitMQ.AMQP.Client.IConnection.Id.set -> void RabbitMQ.AMQP.Client.IConnection.Management() -> RabbitMQ.AMQP.Client.IManagement! RabbitMQ.AMQP.Client.IConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder! +RabbitMQ.AMQP.Client.IConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IConnectionSettings RabbitMQ.AMQP.Client.IConnectionSettings.ContainerId.get -> string! RabbitMQ.AMQP.Client.IConnectionSettings.Host.get -> string! @@ -235,6 +237,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.Id.get -> long RabbitMQ.AMQP.Client.Impl.AmqpConnection.Id.set -> void RabbitMQ.AMQP.Client.Impl.AmqpConnection.Management() -> RabbitMQ.AMQP.Client.IManagement! RabbitMQ.AMQP.Client.Impl.AmqpConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpConsumer RabbitMQ.AMQP.Client.Impl.AmqpConsumer.AmqpConsumer(RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration! configuration) -> void RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Pause() -> void @@ -342,9 +345,14 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeliveryLimit(int limit) -> Ra RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpRpcServer -RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void -RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.Context.get -> RabbitMQ.AMQP.Client.IRpcServer.IContext! -RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.Handler.get -> RabbitMQ.AMQP.Client.IRpcServer.IHandler! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl.RpcConfiguration! builder) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.AmqpRpcServerBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.CorrelationIdExtractor(System.Func! correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.AmqpStreamSpecification(RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification! parent) -> void RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification! @@ -447,6 +455,14 @@ RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.GetBackOffDelayPolicy() -> Rabbi RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.IsActivate() -> bool RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.IsTopologyActive() -> bool RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! +RabbitMQ.AMQP.Client.Impl.RpcConfiguration +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection! +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.get -> RabbitMQ.AMQP.Client.RpcHandler? +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RequestQueue.get -> string! +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RequestQueue.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RpcConfiguration() -> void RabbitMQ.AMQP.Client.Impl.StreamOptions RabbitMQ.AMQP.Client.Impl.StreamOptions.FilterMatchUnfiltered(bool matchUnfiltered) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! RabbitMQ.AMQP.Client.Impl.StreamOptions.FilterValues(string![]! values) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions! @@ -522,15 +538,12 @@ RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsActivate() -> bool RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsTopologyActive() -> bool RabbitMQ.AMQP.Client.IRecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! RabbitMQ.AMQP.Client.IRpcServer -RabbitMQ.AMQP.Client.IRpcServer.Context.get -> RabbitMQ.AMQP.Client.IRpcServer.IContext! -RabbitMQ.AMQP.Client.IRpcServer.Handler.get -> RabbitMQ.AMQP.Client.IRpcServer.IHandler? RabbitMQ.AMQP.Client.IRpcServer.IContext RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(object! body) -> RabbitMQ.AMQP.Client.IMessage! -RabbitMQ.AMQP.Client.IRpcServer.IHandler -RabbitMQ.AMQP.Client.IRpcServer.IHandler.Handle(RabbitMQ.AMQP.Client.IRpcServer.IContext! ctx, RabbitMQ.AMQP.Client.IMessage! request) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IRpcServerBuilder +RabbitMQ.AMQP.Client.IRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func! correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! -RabbitMQ.AMQP.Client.IRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.IRpcServer.IHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IStreamSpecification @@ -581,6 +594,7 @@ RabbitMQ.AMQP.Client.QueueType.STREAM = 2 -> RabbitMQ.AMQP.Client.QueueType RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtLeastOnce = 1 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy.AtMostOnce = 0 -> RabbitMQ.AMQP.Client.QuorumQueueDeadLetterStrategy +RabbitMQ.AMQP.Client.RpcHandler RabbitMQ.AMQP.Client.SaslMechanism RabbitMQ.AMQP.Client.SaslMechanism.Equals(RabbitMQ.AMQP.Client.SaslMechanism? other) -> bool RabbitMQ.AMQP.Client.SaslMechanism.Mechanism.get -> string! diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs new file mode 100644 index 00000000..f0a008ee --- /dev/null +++ b/Tests/Rpc/RpcServerTests.cs @@ -0,0 +1,35 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.AMQP.Client; +using RabbitMQ.AMQP.Client.Impl; +using Xunit; +using Xunit.Abstractions; + +namespace Tests.Rpc +{ + public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) + { + [Fact] + public async Task RpcServerPingPong() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var m = context.Message(request.Body()).MessageId("pong_from_the_server"); + tcs.SetResult(m); + return Task.FromResult(m); + }).RequestQueue(_queueName).BuildAsync(); + Assert.NotNull(rpcServer); + IPublisher p = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); + + await p.PublishAsync(new AmqpMessage("test")); + IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("test", m.Body()); + Assert.Equal("pong_from_the_server", m.MessageId()); + } + } +} From 40f55f838462aa0bda43b8c603a934f694e2c93f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 8 Oct 2024 10:41:29 +0200 Subject: [PATCH 10/17] Add replyTo to the RPC server Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs | 51 +++++++++++++- Tests/Rpc/RpcServerTests.cs | 81 +++++++++++++++++++++- 2 files changed, 128 insertions(+), 4 deletions(-) diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs index c47b95cb..7e777b9d 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -1,5 +1,6 @@ using System; using System.Threading.Tasks; +using Amqp; namespace RabbitMQ.AMQP.Client.Impl { @@ -12,7 +13,7 @@ public class RpcConfiguration public class AmqpRpcServerBuilder : IRpcServerBuilder { - RpcConfiguration _configuration = new RpcConfiguration(); + readonly RpcConfiguration _configuration = new RpcConfiguration(); public AmqpRpcServerBuilder(AmqpConnection connection) { @@ -54,6 +55,19 @@ public class AmqpRpcServer : AbstractLifeCycle, IRpcServer private IPublisher? _publisher = null; private IConsumer? _consumer = null; + private async Task SendReply(IMessage reply) + { + if (_publisher != null) + { + PublishResult pr = await _publisher.PublishAsync(reply).ConfigureAwait(false); + if (pr.Outcome.State != OutcomeState.Accepted) + { + Trace.WriteLine(TraceLevel.Error, "Failed to send reply"); + } + + } + } + public AmqpRpcServer(RpcConfiguration builder) { _configuration = builder; @@ -68,7 +82,19 @@ public override async Task OpenAsync() await context.AcceptAsync().ConfigureAwait(false); if (_configuration.Handler != null) { - await _configuration.Handler(new RpcServerContext(), request).ConfigureAwait(false); + IMessage reply = await _configuration.Handler(new RpcServerContext(), request) + .ConfigureAwait(false); + + if (request.ReplyTo() != "") + { + reply.To(request.ReplyTo()); + } + else + { + Trace.WriteLine(TraceLevel.Error, "No reply-to address in request"); + } + + await SendReply(reply).ConfigureAwait(false); } }) .Queue(_configuration.RequestQueue).BuildAndStartAsync() @@ -82,6 +108,25 @@ private class RpcServerContext : IRpcServer.IContext public IMessage Message(object body) => new AmqpMessage(body); } - public override Task CloseAsync() => throw new System.NotImplementedException(); + public override async Task CloseAsync() + { + OnNewStatus(State.Closing, null); + try + { + if (_publisher != null) + { + await _publisher.CloseAsync().ConfigureAwait(false); + } + + if (_consumer != null) + { + await _consumer.CloseAsync().ConfigureAwait(false); + } + } + finally + { + OnNewStatus(State.Closed, null); + } + } } } diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index f0a008ee..c7e991b2 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using RabbitMQ.AMQP.Client; @@ -11,7 +12,7 @@ namespace Tests.Rpc public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper) { [Fact] - public async Task RpcServerPingPong() + public async Task MockRpcServerPingPong() { Assert.NotNull(_connection); Assert.NotNull(_management); @@ -30,6 +31,84 @@ public async Task RpcServerPingPong() IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.Equal("test", m.Body()); Assert.Equal("pong_from_the_server", m.MessageId()); + await rpcServer.CloseAsync(); + } + + [Fact] + public async Task RpcServerValidateStateChange() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + List<(State, State)> states = []; + await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync(); + TaskCompletionSource tsc = new(TaskCreationOptions.RunContinuationsAsynchronously); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var m = context.Message(request.Body()); + return Task.FromResult(m); + }).RequestQueue(_queueName).BuildAsync(); + rpcServer.ChangeState += (sender, fromState, toState, e) => + { + states.Add((fromState, toState)); + if (states.Count == 2) + { + tsc.SetResult(); + } + }; + Assert.NotNull(rpcServer); + await rpcServer.CloseAsync(); + await tsc.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(State.Open, states[0].Item1); + Assert.Equal(State.Closing, states[0].Item2); + Assert.Equal(State.Closing, states[1].Item1); + Assert.Equal(State.Closed, states[1].Item2); + } + + /// + /// Simulate RPC communication with a publisher + /// + [Fact] + public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var m = context.Message(request.Body()).MessageId("pong_from_the_server"); + return Task.FromResult(m); + }).RequestQueue(requestQueue).BuildAsync(); + + Assert.NotNull(rpcServer); + string queueReplyTo = $"queueReplyTo-{Now}"; + IQueueSpecification spec = _management.Queue(queueReplyTo).Exclusive(true).AutoDelete(true); + await spec.DeclareAsync(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + + IConsumer consumer = await _connection.ConsumerBuilder().Queue(queueReplyTo).MessageHandler( + async (context, message) => + { + await context.AcceptAsync(); + tcs.SetResult(message); + }).BuildAndStartAsync(); + + IPublisher publisher = await _connection.PublisherBuilder().Queue(requestQueue).BuildAsync(); + Assert.NotNull(publisher); + AddressBuilder addressBuilder = new(); + + IMessage message = new AmqpMessage("test").ReplyTo(addressBuilder.Queue(queueReplyTo).Address()); + PublishResult pr = await publisher.PublishAsync(message); + Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); + + IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal("test", m.Body()); + Assert.Equal("pong_from_the_server", m.MessageId()); + + await spec.DeleteAsync(); + await rpcServer.CloseAsync(); + await consumer.CloseAsync(); + await publisher.CloseAsync(); } } } From eb58c570e8950b21d56f6eb7d6857b0719d722b0 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 8 Oct 2024 10:46:04 +0200 Subject: [PATCH 11/17] Add replyTo to the RPC server Signed-off-by: Gabriele Santomaggio --- Tests/Rpc/RpcServerTests.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index c7e991b2..217d220b 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -41,7 +41,7 @@ public async Task RpcServerValidateStateChange() Assert.NotNull(_management); List<(State, State)> states = []; await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync(); - TaskCompletionSource tsc = new(TaskCreationOptions.RunContinuationsAsynchronously); + TaskCompletionSource tsc = new(TaskCreationOptions.RunContinuationsAsynchronously); IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => { var m = context.Message(request.Body()); @@ -52,12 +52,13 @@ public async Task RpcServerValidateStateChange() states.Add((fromState, toState)); if (states.Count == 2) { - tsc.SetResult(); + tsc.SetResult(states.Count); } }; Assert.NotNull(rpcServer); await rpcServer.CloseAsync(); - await tsc.Task.WaitAsync(TimeSpan.FromSeconds(5)); + int count = await tsc.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(2, count); Assert.Equal(State.Open, states[0].Item1); Assert.Equal(State.Closing, states[0].Item2); Assert.Equal(State.Closing, states[1].Item1); From c03bd1b128c9851bb19d460f167b6f708bfbbb94 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 9 Oct 2024 16:02:25 +0200 Subject: [PATCH 12/17] implement rpc client Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IConnection.cs | 2 + RabbitMQ.AMQP.Client/IRpcClient.cs | 25 ++++ RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs | 15 +++ RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs | 6 + RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs | 128 +++++++++++++++++++ RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 35 +++++ Tests/Rpc/RpcServerTests.cs | 29 +++++ 7 files changed, 240 insertions(+) create mode 100644 RabbitMQ.AMQP.Client/IRpcClient.cs create mode 100644 RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs diff --git a/RabbitMQ.AMQP.Client/IConnection.cs b/RabbitMQ.AMQP.Client/IConnection.cs index 6e0298ad..f4aac082 100644 --- a/RabbitMQ.AMQP.Client/IConnection.cs +++ b/RabbitMQ.AMQP.Client/IConnection.cs @@ -28,6 +28,8 @@ public interface IConnection : ILifeCycle IRpcServerBuilder RpcServerBuilder(); + IRpcClientBuilder RpcClientBuilder(); + public ReadOnlyCollection GetPublishers(); public ReadOnlyCollection GetConsumers(); diff --git a/RabbitMQ.AMQP.Client/IRpcClient.cs b/RabbitMQ.AMQP.Client/IRpcClient.cs new file mode 100644 index 00000000..978acbf5 --- /dev/null +++ b/RabbitMQ.AMQP.Client/IRpcClient.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace RabbitMQ.AMQP.Client +{ + + public interface IRpcClientAddressBuilder : IAddressBuilder + { + IRpcClientBuilder RpcClient(); + } + + public interface IRpcClientBuilder + { + IRpcClientAddressBuilder RequestAddress(); + IRpcClientBuilder ReplyToQueue(string replyToQueue); + IRpcClientBuilder Timeout(TimeSpan timeout); + Task BuildAsync(); + } + + public interface IRpcClient : ILifeCycle + { + Task PublishAsync(IMessage message, CancellationToken cancellationToken = default); + } +} diff --git a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs index 6245f2a0..9700d373 100644 --- a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs @@ -114,4 +114,19 @@ public IMessage Build() return _message; } } + + public class RpcClientAddressBuilder : DefaultAddressBuilder, IRpcClientAddressBuilder + { + readonly AmqpRpcClientBuilder _builder; + public RpcClientAddressBuilder(AmqpRpcClientBuilder builder) + { + _builder = builder; + _owner = this; + } + + public IRpcClientBuilder RpcClient() + { + return _builder; + } + } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs index ebe15d98..d951bebd 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs @@ -55,6 +55,12 @@ public IRpcServerBuilder RpcServerBuilder() return new AmqpRpcServerBuilder(this); } + public IRpcClientBuilder RpcClientBuilder() + { + + return new AmqpRpcClientBuilder(this); + } + /// /// Read-only collection of publishers. /// See diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs new file mode 100644 index 00000000..97662af9 --- /dev/null +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs @@ -0,0 +1,128 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Amqp.Types; + +namespace RabbitMQ.AMQP.Client.Impl +{ + public class RpcClientConfiguration + { + public AmqpConnection Connection { get; set; } = null!; + public string ReplyToQueue { get; set; } = ""; + public string RequestAddress { get; set; } = ""; + public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10); + } + + public class AmqpRpcClientBuilder : IRpcClientBuilder + { + private readonly RpcClientAddressBuilder _addressBuilder; + private AmqpConnection _connection; + private RpcClientConfiguration _configuration = new(); + + public AmqpRpcClientBuilder(AmqpConnection connection) + { + _connection = connection; + _addressBuilder = new RpcClientAddressBuilder(this); + } + + public IRpcClientAddressBuilder RequestAddress() + { + return _addressBuilder; + } + + public IRpcClientBuilder ReplyToQueue(string replyToQueue) + { + _configuration.ReplyToQueue = replyToQueue; + return this; + } + + public IRpcClientBuilder Timeout(TimeSpan timeout) + { + _configuration.Timeout = timeout; + return this; + } + + public async Task BuildAsync() + { + _configuration.RequestAddress = _addressBuilder.Address(); + _configuration.Connection = _connection; + var rpcClient = new AmqpRpcClient(_configuration); + await rpcClient.OpenAsync().ConfigureAwait(false); + return rpcClient; + } + } + + public class AmqpRpcClient : AbstractLifeCycle, IRpcClient + { + private readonly RpcClientConfiguration _configuration; + private IConsumer? _consumer = null; + private IPublisher? _publisher = null; + private readonly Dictionary> _pendingRequests = new(); + + public AmqpRpcClient(RpcClientConfiguration configuration) + { + _configuration = configuration; + } + + public override async Task OpenAsync() + { + _publisher = await _configuration.Connection.PublisherBuilder().BuildAsync().ConfigureAwait(false); + _consumer = await _configuration.Connection.ConsumerBuilder() + .Queue(_configuration.ReplyToQueue) + .MessageHandler(async (context, message) => + { + await context.AcceptAsync().ConfigureAwait(false); + if (_pendingRequests.ContainsKey(message.MessageId())) + { + _pendingRequests[message.MessageId()].SetResult(message); + } + }).BuildAndStartAsync().ConfigureAwait(false); + + await base.OpenAsync().ConfigureAwait(false); + } + + public override async Task CloseAsync() + { + OnNewStatus(State.Closing, null); + try + { + if (_publisher != null) + { + await _publisher.CloseAsync().ConfigureAwait(false); + } + + if (_consumer != null) + { + await _consumer.CloseAsync().ConfigureAwait(false); + } + } + finally + { + OnNewStatus(State.Closed, null); + } + } + + public async Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) + { + _pendingRequests.Add(message.MessageId(), new TaskCompletionSource()); + if (_publisher != null) + { + PublishResult pr = await _publisher.PublishAsync( + message.ReplyTo(new AddressBuilder().Queue(_configuration.ReplyToQueue).Address()) + .To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false); + + if (pr.Outcome.State != OutcomeState.Accepted) + { + _pendingRequests[message.MessageId()] + .SetException(new Exception($"Failed to send request state: {pr.Outcome.State}")); + } + } + + await _pendingRequests[message.MessageId()].Task.WaitAsync(_configuration.Timeout) + .ConfigureAwait(false); + + return await _pendingRequests[message.MessageId()].Task.ConfigureAwait(false); + } + } +} diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 922c450a..3962730e 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -20,6 +20,8 @@ override RabbitMQ.AMQP.Client.Impl.AmqpManagement.ToString() -> string! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.CloseAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.OpenAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.ToString() -> string! +override RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.CloseAsync() -> System.Threading.Tasks.Task! +override RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.OpenAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.CloseAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.OpenAsync() -> System.Threading.Tasks.Task! override RabbitMQ.AMQP.Client.Impl.BackOffDelayPolicy.ToString() -> string! @@ -98,6 +100,7 @@ RabbitMQ.AMQP.Client.IConnection.Id.get -> long RabbitMQ.AMQP.Client.IConnection.Id.set -> void RabbitMQ.AMQP.Client.IConnection.Management() -> RabbitMQ.AMQP.Client.IManagement! RabbitMQ.AMQP.Client.IConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder! +RabbitMQ.AMQP.Client.IConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IConnectionSettings RabbitMQ.AMQP.Client.IConnectionSettings.ContainerId.get -> string! @@ -237,6 +240,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.Id.get -> long RabbitMQ.AMQP.Client.Impl.AmqpConnection.Id.set -> void RabbitMQ.AMQP.Client.Impl.AmqpConnection.Management() -> RabbitMQ.AMQP.Client.IManagement! RabbitMQ.AMQP.Client.Impl.AmqpConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpConsumer RabbitMQ.AMQP.Client.Impl.AmqpConsumer.AmqpConsumer(RabbitMQ.AMQP.Client.Impl.ConsumerConfiguration! configuration) -> void @@ -344,6 +348,15 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeadLetterStrategy(RabbitMQ.AM RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeliveryLimit(int limit) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification! RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClient +RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.AmqpRpcClient(RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration! configuration) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.AmqpRpcClientBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcServer RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl.RpcConfiguration! builder) -> void RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder @@ -455,6 +468,19 @@ RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.GetBackOffDelayPolicy() -> Rabbi RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.IsActivate() -> bool RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.IsTopologyActive() -> bool RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! +RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder +RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder.RpcClient() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder.RpcClientAddressBuilder(RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder! builder) -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection! +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Connection.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.get -> string! +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestAddress.get -> string! +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestAddress.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RpcClientConfiguration() -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Timeout.get -> System.TimeSpan +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Timeout.set -> void RabbitMQ.AMQP.Client.Impl.RpcConfiguration RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection! RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.set -> void @@ -537,6 +563,15 @@ RabbitMQ.AMQP.Client.IRecoveryConfiguration.GetBackOffDelayPolicy() -> RabbitMQ. RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsActivate() -> bool RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsTopologyActive() -> bool RabbitMQ.AMQP.Client.IRecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration! +RabbitMQ.AMQP.Client.IRpcClient +RabbitMQ.AMQP.Client.IRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IRpcClientAddressBuilder +RabbitMQ.AMQP.Client.IRpcClientAddressBuilder.RpcClient() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder +RabbitMQ.AMQP.Client.IRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcServer RabbitMQ.AMQP.Client.IRpcServer.IContext RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(object! body) -> RabbitMQ.AMQP.Client.IMessage! diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 217d220b..fbbdeb1c 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -111,5 +111,34 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() await consumer.CloseAsync(); await publisher.CloseAsync(); } + + [Fact] + public async Task RpcServerClientPingPong() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var m = context.Message("this_come_from_the_server").MessageId(request.MessageId()); + return Task.FromResult(m); + }).RequestQueue(_queueName).BuildAsync(); + Assert.NotNull(rpcServer); + + string replyToQueue = $"replyToQueue-{Now}"; + await _management.Queue(replyToQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .ReplyToQueue(replyToQueue).BuildAsync(); + + IMessage message = new AmqpMessage("test").MessageId("that_just_a_id"); + + IMessage response = await rpcClient.PublishAsync(message); + Assert.Equal("this_come_from_the_server", response.Body()); + } } } From 37bde3854124a7d0fbede3d594df3806c2e20d16 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 14 Oct 2024 13:12:33 +0200 Subject: [PATCH 13/17] Implement the correlation id Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IMessage.cs | 7 ++- RabbitMQ.AMQP.Client/IRpcClient.cs | 3 ++ RabbitMQ.AMQP.Client/IRpcServer.cs | 1 + RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs | 24 +++++++-- RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs | 57 +++++++++++++++++--- RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs | 27 ++++++++-- RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 24 +++++++-- Tests/AmqpTests.cs | 2 +- Tests/Consumer/StreamConsumerTests.cs | 6 +-- Tests/Rpc/RpcServerTests.cs | 8 ++- 10 files changed, 129 insertions(+), 30 deletions(-) diff --git a/RabbitMQ.AMQP.Client/IMessage.cs b/RabbitMQ.AMQP.Client/IMessage.cs index 11ebc999..d1ab1d85 100644 --- a/RabbitMQ.AMQP.Client/IMessage.cs +++ b/RabbitMQ.AMQP.Client/IMessage.cs @@ -12,11 +12,14 @@ public interface IMessage public object Body(); // properties - string MessageId(); IMessage MessageId(string id); - string CorrelationId(); + IMessage MessageId(object id); + object MessageId(); + + object CorrelationId(); IMessage CorrelationId(string id); + IMessage CorrelationId(object id); string ReplyTo(); IMessage ReplyTo(string id); diff --git a/RabbitMQ.AMQP.Client/IRpcClient.cs b/RabbitMQ.AMQP.Client/IRpcClient.cs index 978acbf5..6bf1db32 100644 --- a/RabbitMQ.AMQP.Client/IRpcClient.cs +++ b/RabbitMQ.AMQP.Client/IRpcClient.cs @@ -14,6 +14,9 @@ public interface IRpcClientBuilder { IRpcClientAddressBuilder RequestAddress(); IRpcClientBuilder ReplyToQueue(string replyToQueue); + IRpcClientBuilder CorrelationIdExtractor(Func correlationIdExtractor); + + IRpcClientBuilder CorrelationIdSupplier(Func correlationIdSupplier); IRpcClientBuilder Timeout(TimeSpan timeout); Task BuildAsync(); } diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IRpcServer.cs index a20fdc7e..8b6d79e0 100644 --- a/RabbitMQ.AMQP.Client/IRpcServer.cs +++ b/RabbitMQ.AMQP.Client/IRpcServer.cs @@ -10,6 +10,7 @@ public interface IRpcServerBuilder IRpcServerBuilder RequestQueue(string requestQueue); IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue); IRpcServerBuilder CorrelationIdExtractor(Func correlationIdExtractor); + IRpcServerBuilder Handler(RpcHandler handler); Task BuildAsync(); diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index 2c68ca78..2eb285e0 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -64,20 +64,27 @@ public object Body() return NativeMessage.Body; } - public string MessageId() + public object MessageId() { ThrowIfPropertiesNotSet(); - return NativeMessage.Properties.MessageId; + return NativeMessage.Properties.GetMessageId(); } public IMessage MessageId(string id) { EnsureProperties(); - NativeMessage.Properties.MessageId = id; + NativeMessage.Properties.SetMessageId(id); return this; } - public string CorrelationId() + public IMessage MessageId(object id) + { + EnsureProperties(); + NativeMessage.Properties.SetMessageId(id); + return this; + } + + public object CorrelationId() { ThrowIfPropertiesNotSet(); return NativeMessage.Properties.CorrelationId; @@ -86,7 +93,14 @@ public string CorrelationId() public IMessage CorrelationId(string id) { EnsureProperties(); - NativeMessage.Properties.CorrelationId = id; + NativeMessage.Properties.SetCorrelationId(id); + return this; + } + + public IMessage CorrelationId(object id) + { + EnsureProperties(); + NativeMessage.Properties.SetCorrelationId(id); return this; } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs index 97662af9..48948339 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs @@ -12,13 +12,17 @@ public class RpcClientConfiguration public string ReplyToQueue { get; set; } = ""; public string RequestAddress { get; set; } = ""; public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10); + + public Func? CorrelationIdSupplier { get; set; } = null; + + public Func? CorrelationIdExtractor { get; set; } } public class AmqpRpcClientBuilder : IRpcClientBuilder { private readonly RpcClientAddressBuilder _addressBuilder; - private AmqpConnection _connection; - private RpcClientConfiguration _configuration = new(); + private readonly AmqpConnection _connection; + private readonly RpcClientConfiguration _configuration = new(); public AmqpRpcClientBuilder(AmqpConnection connection) { @@ -37,6 +41,18 @@ public IRpcClientBuilder ReplyToQueue(string replyToQueue) return this; } + public IRpcClientBuilder CorrelationIdExtractor(Func correlationIdExtractor) + { + _configuration.CorrelationIdExtractor = correlationIdExtractor; + return this; + } + + public IRpcClientBuilder CorrelationIdSupplier(Func correlationIdSupplier) + { + _configuration.CorrelationIdSupplier = correlationIdSupplier; + return this; + } + public IRpcClientBuilder Timeout(TimeSpan timeout) { _configuration.Timeout = timeout; @@ -58,8 +74,31 @@ public class AmqpRpcClient : AbstractLifeCycle, IRpcClient private readonly RpcClientConfiguration _configuration; private IConsumer? _consumer = null; private IPublisher? _publisher = null; - private readonly Dictionary> _pendingRequests = new(); + private readonly Dictionary> _pendingRequests = new(); + private readonly string _correlationId = Guid.NewGuid().ToString(); + private int _nextCorrelationId = 0; + + private object CorrelationIdSupplier() + { + if (_configuration.CorrelationIdSupplier != null) + { + return _configuration.CorrelationIdSupplier(); + } + return $"{_correlationId}-" + Interlocked.Increment(ref _nextCorrelationId); + } + + private object ExtractCorrelationId(IMessage message) + { + object corr = message.CorrelationId(); + if (_configuration.CorrelationIdExtractor != null) + { + corr = _configuration.CorrelationIdExtractor(message); + } + + return corr; + + } public AmqpRpcClient(RpcClientConfiguration configuration) { _configuration = configuration; @@ -73,9 +112,10 @@ public override async Task OpenAsync() .MessageHandler(async (context, message) => { await context.AcceptAsync().ConfigureAwait(false); - if (_pendingRequests.ContainsKey(message.MessageId())) + object correlationId = ExtractCorrelationId(message); + if (_pendingRequests.TryGetValue(correlationId, out TaskCompletionSource? request)) { - _pendingRequests[message.MessageId()].SetResult(message); + request.SetResult(message); } }).BuildAndStartAsync().ConfigureAwait(false); @@ -105,16 +145,19 @@ public override async Task CloseAsync() public async Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) { + message.MessageId(CorrelationIdSupplier()); + //TODO: use correlation id to match request and response _pendingRequests.Add(message.MessageId(), new TaskCompletionSource()); if (_publisher != null) { PublishResult pr = await _publisher.PublishAsync( - message.ReplyTo(new AddressBuilder().Queue(_configuration.ReplyToQueue).Address()) + message.ReplyTo( + new AddressBuilder().Queue(_configuration.ReplyToQueue).Address()) .To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false); if (pr.Outcome.State != OutcomeState.Accepted) { - _pendingRequests[message.MessageId()] + _pendingRequests[message.CorrelationId()] .SetException(new Exception($"Failed to send request state: {pr.Outcome.State}")); } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs index 7e777b9d..da4399ff 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -9,6 +9,7 @@ public class RpcConfiguration public AmqpConnection Connection { get; set; } = null!; public RpcHandler? Handler { get; set; } public string RequestQueue { get; set; } = ""; + public Func? CorrelationIdExtractor { get; set; } } public class AmqpRpcServerBuilder : IRpcServerBuilder @@ -32,8 +33,11 @@ public IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue) return this; } - public IRpcServerBuilder CorrelationIdExtractor(Func correlationIdExtractor) => - throw new NotImplementedException(); + public IRpcServerBuilder CorrelationIdExtractor(Func correlationIdExtractor) + { + _configuration.CorrelationIdExtractor = correlationIdExtractor; + return this; + } public IRpcServerBuilder Handler(RpcHandler handler) { @@ -64,13 +68,24 @@ private async Task SendReply(IMessage reply) { Trace.WriteLine(TraceLevel.Error, "Failed to send reply"); } + } + } + private object ExtractCorrelationId(IMessage message) + { + object corr = message.MessageId(); + if (_configuration.CorrelationIdExtractor != null) + { + corr = _configuration.CorrelationIdExtractor(message); } + + return corr; + } - public AmqpRpcServer(RpcConfiguration builder) + public AmqpRpcServer(RpcConfiguration configuration) { - _configuration = builder; + _configuration = configuration; } public override async Task OpenAsync() @@ -94,7 +109,9 @@ public override async Task OpenAsync() Trace.WriteLine(TraceLevel.Error, "No reply-to address in request"); } - await SendReply(reply).ConfigureAwait(false); + object correlationId = ExtractCorrelationId(request); + ; + await SendReply(reply.CorrelationId(correlationId)).ConfigureAwait(false); } }) .Queue(_configuration.RequestQueue).BuildAndStartAsync() diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 3962730e..e2905b7c 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -182,9 +182,11 @@ RabbitMQ.AMQP.Client.IMessage RabbitMQ.AMQP.Client.IMessage.Annotation(string! key) -> object! RabbitMQ.AMQP.Client.IMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.Body() -> object! -RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> string! +RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> object! +RabbitMQ.AMQP.Client.IMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.CorrelationId(string! id) -> RabbitMQ.AMQP.Client.IMessage! -RabbitMQ.AMQP.Client.IMessage.MessageId() -> string! +RabbitMQ.AMQP.Client.IMessage.MessageId() -> object! +RabbitMQ.AMQP.Client.IMessage.MessageId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.MessageId(string! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.ReplyTo() -> string! RabbitMQ.AMQP.Client.IMessage.ReplyTo(string! id) -> RabbitMQ.AMQP.Client.IMessage! @@ -290,9 +292,11 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(object! body) -> void RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key) -> object! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> object! -RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> string! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> object! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(string! id) -> RabbitMQ.AMQP.Client.IMessage! -RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId() -> string! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId() -> object! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId(string! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.NativeMessage.get -> Amqp.Message! RabbitMQ.AMQP.Client.Impl.AmqpMessage.ReplyTo() -> string! @@ -354,11 +358,13 @@ RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessa RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.AmqpRpcClientBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdExtractor(System.Func! correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdSupplier(System.Func! correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcServer -RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl.RpcConfiguration! builder) -> void +RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl.RpcConfiguration! configuration) -> void RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.AmqpRpcServerBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! @@ -474,6 +480,10 @@ RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder.RpcClientAddressBuilder(Rabbit RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection! RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Connection.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdExtractor.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdExtractor.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdSupplier.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdSupplier.set -> void RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.get -> string! RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.set -> void RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestAddress.get -> string! @@ -484,6 +494,8 @@ RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Timeout.set -> void RabbitMQ.AMQP.Client.Impl.RpcConfiguration RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection! RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.CorrelationIdExtractor.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.CorrelationIdExtractor.set -> void RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.get -> RabbitMQ.AMQP.Client.RpcHandler? RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.set -> void RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RequestQueue.get -> string! @@ -569,6 +581,8 @@ RabbitMQ.AMQP.Client.IRpcClientAddressBuilder RabbitMQ.AMQP.Client.IRpcClientAddressBuilder.RpcClient() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder RabbitMQ.AMQP.Client.IRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task! +RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdExtractor(System.Func! correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdSupplier(System.Func! correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! diff --git a/Tests/AmqpTests.cs b/Tests/AmqpTests.cs index 91aaf5c7..ee171d9b 100644 --- a/Tests/AmqpTests.cs +++ b/Tests/AmqpTests.cs @@ -108,7 +108,7 @@ async Task MessageHandler(IContext ctx, IMessage msg) try { receivedSubject = msg.Subject(); - messageIds.Add(int.Parse(msg.MessageId())); + messageIds.Add(int.Parse((string)msg.MessageId())); await ctx.AcceptAsync(); if (Interlocked.Increment(ref receivedMessageCount) == messageCount) { diff --git a/Tests/Consumer/StreamConsumerTests.cs b/Tests/Consumer/StreamConsumerTests.cs index a8fbe7d7..b4188fae 100644 --- a/Tests/Consumer/StreamConsumerTests.cs +++ b/Tests/Consumer/StreamConsumerTests.cs @@ -38,7 +38,7 @@ public async Task StreamConsumerBuilderShouldRestartFromTheBeginning() { Interlocked.Increment(ref totalConsumed); await context.AcceptAsync(); - if (message.MessageId() == "9") + if ((string)message.MessageId() == "9") { manualResetEvent.Set(); } @@ -75,7 +75,7 @@ public async Task StreamConsumerBuilderShouldStartFromTheListenerConfiguration() { Interlocked.Increment(ref totalConsumed); await context.AcceptAsync(); - if (message.MessageId() == "9") + if ((string)message.MessageId() == "9") { manualResetEvent.Set(); } @@ -111,7 +111,7 @@ public async Task StreamConsumerBuilderShouldStartFromTheListenerConfigurationWh { Interlocked.Increment(ref totalConsumed); await context.AcceptAsync(); - if (message.MessageId() == "9") + if ((string)message.MessageId() == "9") { manualResetEvent.Set(); } diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index fbbdeb1c..9caafe95 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -122,7 +122,8 @@ public async Task RpcServerClientPingPong() TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => { - var m = context.Message("this_come_from_the_server").MessageId(request.MessageId()); + var m = context.Message("this_come_from_the_server"); + tcs.SetResult(m); return Task.FromResult(m); }).RequestQueue(_queueName).BuildAsync(); Assert.NotNull(rpcServer); @@ -133,12 +134,15 @@ public async Task RpcServerClientPingPong() IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() .Queue(requestQueue) .RpcClient() + .ReplyToQueue(replyToQueue).BuildAsync(); - IMessage message = new AmqpMessage("test").MessageId("that_just_a_id"); + IMessage message = new AmqpMessage("test"); IMessage response = await rpcClient.PublishAsync(message); Assert.Equal("this_come_from_the_server", response.Body()); + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); } } } From 51e5bc5d11390a8d531a6daf1be277034a79e309 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 14 Oct 2024 14:57:51 +0200 Subject: [PATCH 14/17] Generate reply-to-queue queue when not defined Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IRpcClient.cs | 4 +- RabbitMQ.AMQP.Client/IRpcServer.cs | 2 +- RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs | 15 +++-- RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs | 2 +- RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 12 ++-- Tests/Rpc/RpcServerTests.cs | 63 ++++++++++++++++---- 6 files changed, 73 insertions(+), 25 deletions(-) diff --git a/RabbitMQ.AMQP.Client/IRpcClient.cs b/RabbitMQ.AMQP.Client/IRpcClient.cs index 6bf1db32..d210702d 100644 --- a/RabbitMQ.AMQP.Client/IRpcClient.cs +++ b/RabbitMQ.AMQP.Client/IRpcClient.cs @@ -14,9 +14,9 @@ public interface IRpcClientBuilder { IRpcClientAddressBuilder RequestAddress(); IRpcClientBuilder ReplyToQueue(string replyToQueue); - IRpcClientBuilder CorrelationIdExtractor(Func correlationIdExtractor); + IRpcClientBuilder CorrelationIdExtractor(Func? correlationIdExtractor); - IRpcClientBuilder CorrelationIdSupplier(Func correlationIdSupplier); + IRpcClientBuilder CorrelationIdSupplier(Func? correlationIdSupplier); IRpcClientBuilder Timeout(TimeSpan timeout); Task BuildAsync(); } diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IRpcServer.cs index 8b6d79e0..bedbbc34 100644 --- a/RabbitMQ.AMQP.Client/IRpcServer.cs +++ b/RabbitMQ.AMQP.Client/IRpcServer.cs @@ -9,7 +9,7 @@ public interface IRpcServerBuilder { IRpcServerBuilder RequestQueue(string requestQueue); IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue); - IRpcServerBuilder CorrelationIdExtractor(Func correlationIdExtractor); + IRpcServerBuilder CorrelationIdExtractor(Func? correlationIdExtractor); IRpcServerBuilder Handler(RpcHandler handler); diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs index 48948339..0bbd6385 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs @@ -41,13 +41,13 @@ public IRpcClientBuilder ReplyToQueue(string replyToQueue) return this; } - public IRpcClientBuilder CorrelationIdExtractor(Func correlationIdExtractor) + public IRpcClientBuilder CorrelationIdExtractor(Func? correlationIdExtractor) { _configuration.CorrelationIdExtractor = correlationIdExtractor; return this; } - public IRpcClientBuilder CorrelationIdSupplier(Func correlationIdSupplier) + public IRpcClientBuilder CorrelationIdSupplier(Func? correlationIdSupplier) { _configuration.CorrelationIdSupplier = correlationIdSupplier; return this; @@ -97,8 +97,8 @@ private object ExtractCorrelationId(IMessage message) } return corr; - } + public AmqpRpcClient(RpcClientConfiguration configuration) { _configuration = configuration; @@ -106,6 +106,14 @@ public AmqpRpcClient(RpcClientConfiguration configuration) public override async Task OpenAsync() { + if (string.IsNullOrEmpty(_configuration.ReplyToQueue)) + { + IQueueInfo queueInfo = await _configuration.Connection.Management().Queue().AutoDelete(true) + .Exclusive(true).DeclareAsync() + .ConfigureAwait(false); + _configuration.ReplyToQueue = queueInfo.Name(); + } + _publisher = await _configuration.Connection.PublisherBuilder().BuildAsync().ConfigureAwait(false); _consumer = await _configuration.Connection.ConsumerBuilder() .Queue(_configuration.ReplyToQueue) @@ -146,7 +154,6 @@ public override async Task CloseAsync() public async Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) { message.MessageId(CorrelationIdSupplier()); - //TODO: use correlation id to match request and response _pendingRequests.Add(message.MessageId(), new TaskCompletionSource()); if (_publisher != null) { diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs index da4399ff..23c25587 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -33,7 +33,7 @@ public IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue) return this; } - public IRpcServerBuilder CorrelationIdExtractor(Func correlationIdExtractor) + public IRpcServerBuilder CorrelationIdExtractor(Func? correlationIdExtractor) { _configuration.CorrelationIdExtractor = correlationIdExtractor; return this; diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index e2905b7c..1ad9c83a 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -358,8 +358,8 @@ RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessa RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.AmqpRpcClientBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task! -RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdExtractor(System.Func! correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! -RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdSupplier(System.Func! correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdSupplier(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! @@ -368,7 +368,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl. RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.AmqpRpcServerBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! -RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.CorrelationIdExtractor(System.Func! correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! @@ -581,8 +581,8 @@ RabbitMQ.AMQP.Client.IRpcClientAddressBuilder RabbitMQ.AMQP.Client.IRpcClientAddressBuilder.RpcClient() -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder RabbitMQ.AMQP.Client.IRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task! -RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdExtractor(System.Func! correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! -RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdSupplier(System.Func! correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdSupplier(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! @@ -591,7 +591,7 @@ RabbitMQ.AMQP.Client.IRpcServer.IContext RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(object! body) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IRpcServerBuilder RabbitMQ.AMQP.Client.IRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! -RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func! correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 9caafe95..2ff88fda 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -112,35 +112,76 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() await publisher.CloseAsync(); } + /// + /// In this test the client has to create a reply queue since is not provided by the user + /// with the ReplyToQueue method + /// [Fact] - public async Task RpcServerClientPingPong() + public async Task RpcServerClientPingPongWithDefault() { Assert.NotNull(_connection); Assert.NotNull(_management); string requestQueue = _queueName; await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => { - var m = context.Message("this_come_from_the_server"); - tcs.SetResult(m); - return Task.FromResult(m); + Assert.Equal("ping", request.Body()); + var reply = context.Message("pong"); + return Task.FromResult(reply); }).RequestQueue(_queueName).BuildAsync(); Assert.NotNull(rpcServer); - string replyToQueue = $"replyToQueue-{Now}"; - await _management.Queue(replyToQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); - IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() .Queue(requestQueue) .RpcClient() + .BuildAsync(); - .ReplyToQueue(replyToQueue).BuildAsync(); + IMessage message = new AmqpMessage("ping"); + + IMessage response = await rpcClient.PublishAsync(message); + Assert.Equal("pong", response.Body()); + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); + } + + /// + /// In this test the client has to use the ReplyToQueue provided by the user + /// + [Fact] + public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + Assert.Equal("ping", request.Body()); + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(_queueName) + .BuildAsync(); + Assert.NotNull(rpcServer); + + // custom replyTo queue + IQueueInfo replyTo = + await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); + + // custom correlationId supplier + const string correlationId = "my-correlation-id"; + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .CorrelationIdSupplier(() => correlationId) + .ReplyToQueue(replyTo.Name()) + .BuildAsync(); - IMessage message = new AmqpMessage("test"); + IMessage message = new AmqpMessage("ping"); IMessage response = await rpcClient.PublishAsync(message); - Assert.Equal("this_come_from_the_server", response.Body()); + Assert.Equal("pong", response.Body()); + Assert.Equal(correlationId, response.CorrelationId()); await rpcClient.CloseAsync(); await rpcServer.CloseAsync(); } From 2f4ae158cd2201743ba8b49b738cb67fab36d478 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 14 Oct 2024 16:11:35 +0200 Subject: [PATCH 15/17] Request and Reply post Processor Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IRpcClient.cs | 2 ++ RabbitMQ.AMQP.Client/IRpcServer.cs | 2 ++ RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs | 34 +++++++++++++++----- RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs | 15 +++++++-- RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 8 +++++ Tests/Rpc/RpcServerTests.cs | 7 ++-- 6 files changed, 54 insertions(+), 14 deletions(-) diff --git a/RabbitMQ.AMQP.Client/IRpcClient.cs b/RabbitMQ.AMQP.Client/IRpcClient.cs index d210702d..baf73b64 100644 --- a/RabbitMQ.AMQP.Client/IRpcClient.cs +++ b/RabbitMQ.AMQP.Client/IRpcClient.cs @@ -16,6 +16,8 @@ public interface IRpcClientBuilder IRpcClientBuilder ReplyToQueue(string replyToQueue); IRpcClientBuilder CorrelationIdExtractor(Func? correlationIdExtractor); + IRpcClientBuilder RequestPostProcessor(Func? requestPostProcessor); + IRpcClientBuilder CorrelationIdSupplier(Func? correlationIdSupplier); IRpcClientBuilder Timeout(TimeSpan timeout); Task BuildAsync(); diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IRpcServer.cs index bedbbc34..79d93246 100644 --- a/RabbitMQ.AMQP.Client/IRpcServer.cs +++ b/RabbitMQ.AMQP.Client/IRpcServer.cs @@ -11,6 +11,8 @@ public interface IRpcServerBuilder IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue); IRpcServerBuilder CorrelationIdExtractor(Func? correlationIdExtractor); + IRpcServerBuilder ReplyPostProcessor(Func? replyPostProcessor); + IRpcServerBuilder Handler(RpcHandler handler); Task BuildAsync(); diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs index 0bbd6385..a8955f9c 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs @@ -16,6 +16,8 @@ public class RpcClientConfiguration public Func? CorrelationIdSupplier { get; set; } = null; public Func? CorrelationIdExtractor { get; set; } + + public Func? RequestPostProcessor { get; set; } } public class AmqpRpcClientBuilder : IRpcClientBuilder @@ -47,6 +49,12 @@ public IRpcClientBuilder CorrelationIdExtractor(Func? correlat return this; } + public IRpcClientBuilder RequestPostProcessor(Func? requestPostProcessor) + { + _configuration.RequestPostProcessor = requestPostProcessor; + return this; + } + public IRpcClientBuilder CorrelationIdSupplier(Func? correlationIdSupplier) { _configuration.CorrelationIdSupplier = correlationIdSupplier; @@ -99,6 +107,17 @@ private object ExtractCorrelationId(IMessage message) return corr; } + private IMessage PostProcessRequest(IMessage request, object correlationId) + { + if (_configuration.RequestPostProcessor != null) + { + return _configuration.RequestPostProcessor(request, correlationId); + } + + return request.ReplyTo(new AddressBuilder().Queue(_configuration.ReplyToQueue).Address()) + .MessageId(correlationId); + } + public AmqpRpcClient(RpcClientConfiguration configuration) { _configuration = configuration; @@ -153,26 +172,25 @@ public override async Task CloseAsync() public async Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) { - message.MessageId(CorrelationIdSupplier()); - _pendingRequests.Add(message.MessageId(), new TaskCompletionSource()); + object correlationId = CorrelationIdSupplier(); + message = PostProcessRequest(message, correlationId); + _pendingRequests.Add(correlationId, new TaskCompletionSource()); if (_publisher != null) { PublishResult pr = await _publisher.PublishAsync( - message.ReplyTo( - new AddressBuilder().Queue(_configuration.ReplyToQueue).Address()) - .To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false); + message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false); if (pr.Outcome.State != OutcomeState.Accepted) { - _pendingRequests[message.CorrelationId()] + _pendingRequests[correlationId] .SetException(new Exception($"Failed to send request state: {pr.Outcome.State}")); } } - await _pendingRequests[message.MessageId()].Task.WaitAsync(_configuration.Timeout) + await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout) .ConfigureAwait(false); - return await _pendingRequests[message.MessageId()].Task.ConfigureAwait(false); + return await _pendingRequests[correlationId].Task.ConfigureAwait(false); } } } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs index 23c25587..b1cea207 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs @@ -10,6 +10,7 @@ public class RpcConfiguration public RpcHandler? Handler { get; set; } public string RequestQueue { get; set; } = ""; public Func? CorrelationIdExtractor { get; set; } + public Func? ReplyPostProcessor { get; set; } } public class AmqpRpcServerBuilder : IRpcServerBuilder @@ -39,6 +40,12 @@ public IRpcServerBuilder CorrelationIdExtractor(Func? correlat return this; } + public IRpcServerBuilder ReplyPostProcessor(Func? replyPostProcessor) + { + _configuration.ReplyPostProcessor = replyPostProcessor; + return this; + } + public IRpcServerBuilder Handler(RpcHandler handler) { _configuration.Handler = handler; @@ -80,7 +87,11 @@ private object ExtractCorrelationId(IMessage message) } return corr; + } + private IMessage ReplyPostProcessor(IMessage reply, object correlationId) + { + return _configuration.ReplyPostProcessor != null ? _configuration.ReplyPostProcessor(reply, correlationId) : reply.CorrelationId(correlationId); } public AmqpRpcServer(RpcConfiguration configuration) @@ -110,8 +121,8 @@ public override async Task OpenAsync() } object correlationId = ExtractCorrelationId(request); - ; - await SendReply(reply.CorrelationId(correlationId)).ConfigureAwait(false); + reply = ReplyPostProcessor(reply, correlationId); + await SendReply(reply).ConfigureAwait(false); } }) .Queue(_configuration.RequestQueue).BuildAndStartAsync() diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 1ad9c83a..9a6b1095 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -362,6 +362,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdExtractor(System.Fun RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdSupplier(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestPostProcessor(System.Func? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcServer RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl.RpcConfiguration! configuration) -> void @@ -370,6 +371,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.AmqpRpcServerBuilder(RabbitMQ.AMQ RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.ReplyPostProcessor(System.Func? replyPostProcessor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification @@ -488,6 +490,8 @@ RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.get -> string! RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.set -> void RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestAddress.get -> string! RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestAddress.set -> void +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestPostProcessor.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestPostProcessor.set -> void RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RpcClientConfiguration() -> void RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Timeout.get -> System.TimeSpan RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Timeout.set -> void @@ -498,6 +502,8 @@ RabbitMQ.AMQP.Client.Impl.RpcConfiguration.CorrelationIdExtractor.get -> System. RabbitMQ.AMQP.Client.Impl.RpcConfiguration.CorrelationIdExtractor.set -> void RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.get -> RabbitMQ.AMQP.Client.RpcHandler? RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.set -> void +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.ReplyPostProcessor.get -> System.Func? +RabbitMQ.AMQP.Client.Impl.RpcConfiguration.ReplyPostProcessor.set -> void RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RequestQueue.get -> string! RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RequestQueue.set -> void RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RpcConfiguration() -> void @@ -585,6 +591,7 @@ RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdExtractor(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder! +RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestPostProcessor(System.Func? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder! RabbitMQ.AMQP.Client.IRpcServer RabbitMQ.AMQP.Client.IRpcServer.IContext @@ -593,6 +600,7 @@ RabbitMQ.AMQP.Client.IRpcServerBuilder RabbitMQ.AMQP.Client.IRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task! RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! +RabbitMQ.AMQP.Client.IRpcServerBuilder.ReplyPostProcessor(System.Func? replyPostProcessor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder! RabbitMQ.AMQP.Client.IStreamSpecification diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 2ff88fda..a624e21b 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -77,8 +77,8 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => { - var m = context.Message(request.Body()).MessageId("pong_from_the_server"); - return Task.FromResult(m); + var reply = context.Message(request.Body()).MessageId("pong_from_the_server"); + return Task.FromResult(reply); }).RequestQueue(requestQueue).BuildAsync(); Assert.NotNull(rpcServer); @@ -125,7 +125,6 @@ public async Task RpcServerClientPingPongWithDefault() await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => { - Assert.Equal("ping", request.Body()); var reply = context.Message("pong"); return Task.FromResult(reply); }).RequestQueue(_queueName).BuildAsync(); @@ -156,7 +155,6 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => { - Assert.Equal("ping", request.Body()); var reply = context.Message("pong"); return Task.FromResult(reply); }).RequestQueue(_queueName) @@ -174,6 +172,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS .Queue(requestQueue) .RpcClient() .CorrelationIdSupplier(() => correlationId) + .CorrelationIdExtractor(message => message.CorrelationId()) .ReplyToQueue(replyTo.Name()) .BuildAsync(); From 6469f3730d025d20746dfc49e66f15b5c9ce9c48 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 14 Oct 2024 17:22:18 +0200 Subject: [PATCH 16/17] Request and Reply post Processor Signed-off-by: Gabriele Santomaggio --- Tests/Rpc/RpcServerTests.cs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index a624e21b..623f7263 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -20,17 +20,16 @@ public async Task MockRpcServerPingPong() TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => { - var m = context.Message(request.Body()).MessageId("pong_from_the_server"); - tcs.SetResult(m); - return Task.FromResult(m); + var reply = context.Message("pong"); + tcs.SetResult(reply); + return Task.FromResult(reply); }).RequestQueue(_queueName).BuildAsync(); Assert.NotNull(rpcServer); IPublisher p = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync(); await p.PublishAsync(new AmqpMessage("test")); IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal("test", m.Body()); - Assert.Equal("pong_from_the_server", m.MessageId()); + Assert.Equal("pong", m.Body()); await rpcServer.CloseAsync(); } @@ -77,7 +76,7 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => { - var reply = context.Message(request.Body()).MessageId("pong_from_the_server"); + var reply = context.Message("pong"); return Task.FromResult(reply); }).RequestQueue(requestQueue).BuildAsync(); @@ -103,10 +102,8 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess() Assert.Equal(OutcomeState.Accepted, pr.Outcome.State); IMessage m = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal("test", m.Body()); - Assert.Equal("pong_from_the_server", m.MessageId()); + Assert.Equal("pong", m.Body()); - await spec.DeleteAsync(); await rpcServer.CloseAsync(); await consumer.CloseAsync(); await publisher.CloseAsync(); From 444ec2aff6b9c602901051560e2f328bae977c2b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 15 Oct 2024 10:05:42 +0200 Subject: [PATCH 17/17] Request and Reply post Processor tests Signed-off-by: Gabriele Santomaggio --- RabbitMQ.AMQP.Client/IMessage.cs | 2 + RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs | 5 ++ .../Impl/AmqpConsumerBuilder.cs | 2 +- RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs | 13 ++++ .../Impl/AmqpPublisherBuilder.cs | 2 +- RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs | 8 +-- RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 6 ++ Tests/Rpc/RpcServerTests.cs | 72 +++++++++++++++++++ 8 files changed, 104 insertions(+), 6 deletions(-) diff --git a/RabbitMQ.AMQP.Client/IMessage.cs b/RabbitMQ.AMQP.Client/IMessage.cs index d1ab1d85..00b7d289 100644 --- a/RabbitMQ.AMQP.Client/IMessage.cs +++ b/RabbitMQ.AMQP.Client/IMessage.cs @@ -29,6 +29,8 @@ public interface IMessage string Subject(); IMessage Subject(string subject); + IMessage GroupId(string groupId); + string GroupId(); public IMessage Annotation(string key, object value); diff --git a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs index 9700d373..cf8a2d17 100644 --- a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs @@ -98,6 +98,11 @@ public AddressBuilder() } } + public static class AddressBuilderHelper + { + public static AddressBuilder AddressBuilder() => new(); + } + public class MessageAddressBuilder : DefaultAddressBuilder, IMessageAddressBuilder { private readonly IMessage _message; diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs index 5b9b720b..9da78b72 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs @@ -43,7 +43,7 @@ public IConsumerBuilder Queue(IQueueSpecification queueSpec) public IConsumerBuilder Queue(string queueName) { - string address = new AddressBuilder().Queue(queueName).Address(); + string address = AddressBuilderHelper.AddressBuilder().Queue(queueName).Address(); _configuration.Address = address; return this; } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index 2eb285e0..8cc9b54c 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -143,6 +143,19 @@ public IMessage Subject(string subject) return this; } + public IMessage GroupId(string groupId) + { + EnsureProperties(); + NativeMessage.Properties.GroupId = groupId; + return this; + } + + public string GroupId() + { + ThrowIfPropertiesNotSet(); + return NativeMessage.Properties.GroupId; + } + // Annotations public IMessage Annotation(string key, object value) diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs index b29bae2e..8a03f882 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs @@ -65,7 +65,7 @@ public async Task BuildAsync(CancellationToken cancellationToken = d string? address = null; if (!IsAnonymous()) { - address = new AddressBuilder().Exchange(_exchange).Queue(_queue).Key(_key).Address(); + address = AddressBuilderHelper.AddressBuilder().Exchange(_exchange).Queue(_queue).Key(_key).Address(); } AmqpPublisher publisher = new(_connection, address, _timeout); diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs index a8955f9c..a65b78a0 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs @@ -107,14 +107,14 @@ private object ExtractCorrelationId(IMessage message) return corr; } - private IMessage PostProcessRequest(IMessage request, object correlationId) + private IMessage RequestPostProcess(IMessage request, object correlationId) { if (_configuration.RequestPostProcessor != null) { return _configuration.RequestPostProcessor(request, correlationId); } - return request.ReplyTo(new AddressBuilder().Queue(_configuration.ReplyToQueue).Address()) + return request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(_configuration.ReplyToQueue).Address()) .MessageId(correlationId); } @@ -173,8 +173,8 @@ public override async Task CloseAsync() public async Task PublishAsync(IMessage message, CancellationToken cancellationToken = default) { object correlationId = CorrelationIdSupplier(); - message = PostProcessRequest(message, correlationId); - _pendingRequests.Add(correlationId, new TaskCompletionSource()); + message = RequestPostProcess(message, correlationId); + _pendingRequests.Add(correlationId, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)); if (_publisher != null) { PublishResult pr = await _publisher.PublishAsync( diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index 9a6b1095..0e9f5c19 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -185,6 +185,8 @@ RabbitMQ.AMQP.Client.IMessage.Body() -> object! RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> object! RabbitMQ.AMQP.Client.IMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.CorrelationId(string! id) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.IMessage.GroupId() -> string! +RabbitMQ.AMQP.Client.IMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.MessageId() -> object! RabbitMQ.AMQP.Client.IMessage.MessageId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.IMessage.MessageId(string! id) -> RabbitMQ.AMQP.Client.IMessage! @@ -209,6 +211,7 @@ RabbitMQ.AMQP.Client.Impl.AbstractReconnectLifeCycle RabbitMQ.AMQP.Client.Impl.AbstractReconnectLifeCycle.AbstractReconnectLifeCycle() -> void RabbitMQ.AMQP.Client.Impl.AddressBuilder RabbitMQ.AMQP.Client.Impl.AddressBuilder.AddressBuilder() -> void +RabbitMQ.AMQP.Client.Impl.AddressBuilderHelper RabbitMQ.AMQP.Client.Impl.AmqpBindingSpecification RabbitMQ.AMQP.Client.Impl.AmqpBindingSpecification.AmqpBindingSpecification(RabbitMQ.AMQP.Client.Impl.AmqpManagement! management) -> void RabbitMQ.AMQP.Client.Impl.AmqpBindingSpecification.Argument(string! key, object! value) -> RabbitMQ.AMQP.Client.IBindingSpecification! @@ -295,6 +298,8 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> object! RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> object! RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(string! id) -> RabbitMQ.AMQP.Client.IMessage! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId() -> string! +RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId() -> object! RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId(object! id) -> RabbitMQ.AMQP.Client.IMessage! RabbitMQ.AMQP.Client.Impl.AmqpMessage.MessageId(string! id) -> RabbitMQ.AMQP.Client.IMessage! @@ -670,6 +675,7 @@ static RabbitMQ.AMQP.Client.ByteCapacity.Gb(long gigabytes) -> RabbitMQ.AMQP.Cli static RabbitMQ.AMQP.Client.ByteCapacity.Kb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity! static RabbitMQ.AMQP.Client.ByteCapacity.Mb(long megabytes) -> RabbitMQ.AMQP.Client.ByteCapacity! static RabbitMQ.AMQP.Client.ByteCapacity.Tb(long terabytes) -> RabbitMQ.AMQP.Client.ByteCapacity! +static RabbitMQ.AMQP.Client.Impl.AddressBuilderHelper.AddressBuilder() -> RabbitMQ.AMQP.Client.Impl.AddressBuilder! static RabbitMQ.AMQP.Client.Impl.AmqpConnection.CreateAsync(RabbitMQ.AMQP.Client.IConnectionSettings! connectionSettings) -> System.Threading.Tasks.Task! static RabbitMQ.AMQP.Client.Impl.AmqpEnvironment.CreateAsync(RabbitMQ.AMQP.Client.IConnectionSettings! connectionSettings) -> System.Threading.Tasks.Task! static RabbitMQ.AMQP.Client.Impl.BackOffDelayPolicy.Create() -> RabbitMQ.AMQP.Client.Impl.BackOffDelayPolicy! diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/Rpc/RpcServerTests.cs index 623f7263..8fee5c32 100644 --- a/Tests/Rpc/RpcServerTests.cs +++ b/Tests/Rpc/RpcServerTests.cs @@ -181,5 +181,77 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS await rpcClient.CloseAsync(); await rpcServer.CloseAsync(); } + + /// + /// This test combine all the features with the overriding of the request and response post processor + /// the correlation id supplier and the extraction of the correlationId. + /// Here the client uses the replyTo queue provided by the user and the correlationId supplier + /// the field "Subject" is used as correlationId + /// The server uses the field "GroupId" as correlationId + /// Both use the extraction correlationId to get the correlationId + /// + /// The fields "Subject" and "GroupId" are used ONLY for test. + /// You should not use these fields for this purpose. + /// + /// + + [Fact] + public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor() + { + Assert.NotNull(_connection); + Assert.NotNull(_management); + string requestQueue = _queueName; + await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync(); + IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) => + { + var reply = context.Message("pong"); + return Task.FromResult(reply); + }).RequestQueue(_queueName) + //come from the client + .CorrelationIdExtractor(message => message.Subject()) + // replace the correlation id location with GroupId + .ReplyPostProcessor((reply, replyCorrelationId) => reply.GroupId( + replyCorrelationId.ToString() ?? throw new InvalidOperationException())) + .BuildAsync(); + Assert.NotNull(rpcServer); + + IQueueInfo replyTo = + await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync(); + + // custom correlationId supplier + const string correlationId = "my-correlation-id"; + int correlationIdCounter = 0; + + IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress() + .Queue(requestQueue) + .RpcClient() + .ReplyToQueue(replyTo.Name()) + // replace the correlation id creation with a custom function + .CorrelationIdSupplier(() => $"{correlationId}_{Interlocked.Increment(ref correlationIdCounter)}") + // The server will reply with the correlation id in the groupId + // This is only for testing. You should not use the groupId for this. + .CorrelationIdExtractor(message => message.GroupId()) + // The client will use Subject to store the correlation id + // this is only for testing. You should not use Subject for this. + .RequestPostProcessor((request, requestCorrelationId) + => request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(replyTo.Name()).Address()) + .Subject(requestCorrelationId.ToString() ?? throw new InvalidOperationException())) + .BuildAsync(); + + IMessage message = new AmqpMessage("ping"); + + int i = 1; + while (i < 30) + { + IMessage response = await rpcClient.PublishAsync(message); + Assert.Equal("pong", response.Body()); + // the server replies with the correlation id in the GroupId field + Assert.Equal($"{correlationId}_{i}", response.GroupId()); + i++; + } + + await rpcClient.CloseAsync(); + await rpcServer.CloseAsync(); + } } }