|
1 | 1 | using System; |
| 2 | +using System.Collections.Concurrent; |
2 | 3 | using System.Collections.Generic; |
3 | 4 | using System.Threading; |
4 | 5 | using System.Threading.Tasks; |
@@ -37,9 +38,15 @@ public IRpcClientAddressBuilder RequestAddress() |
37 | 38 | return _addressBuilder; |
38 | 39 | } |
39 | 40 |
|
40 | | - public IRpcClientBuilder ReplyToQueue(string replyToQueue) |
| 41 | + public IRpcClientBuilder ReplyToQueue(string replyToQueueName) |
41 | 42 | { |
42 | | - _configuration.ReplyToQueue = replyToQueue; |
| 43 | + _configuration.ReplyToQueue = replyToQueueName; |
| 44 | + return this; |
| 45 | + } |
| 46 | + |
| 47 | + public IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue) |
| 48 | + { |
| 49 | + _configuration.ReplyToQueue = replyToQueue.QueueName; |
43 | 50 | return this; |
44 | 51 | } |
45 | 52 |
|
@@ -77,13 +84,24 @@ public async Task<IRpcClient> BuildAsync() |
77 | 84 | } |
78 | 85 | } |
79 | 86 |
|
| 87 | + /// <summary> |
| 88 | + /// AmqpRpcClient is an implementation of <see cref="IRpcClient"/>. |
| 89 | + /// It is a wrapper around <see cref="IPublisher"/> and <see cref="IConsumer"/> to create an RPC client over AMQP 1.0. |
| 90 | + /// even the PublishAsync is async the RPClient blocks the thread until the response is received. |
| 91 | + /// within the timeout. |
| 92 | + /// |
| 93 | + /// The PublishAsync is thread-safe and can be called from multiple threads. |
| 94 | + /// |
| 95 | + /// See also the server side <see cref="IRpcServer"/>. |
| 96 | + /// </summary> |
80 | 97 | public class AmqpRpcClient : AbstractLifeCycle, IRpcClient |
81 | 98 | { |
82 | 99 | private readonly RpcClientConfiguration _configuration; |
83 | 100 | private IConsumer? _consumer = null; |
84 | 101 | private IPublisher? _publisher = null; |
85 | | - private readonly Dictionary<object, TaskCompletionSource<IMessage>> _pendingRequests = new(); |
| 102 | + private readonly ConcurrentDictionary<object, TaskCompletionSource<IMessage>> _pendingRequests = new(); |
86 | 103 | private readonly string _correlationId = Guid.NewGuid().ToString(); |
| 104 | + private readonly SemaphoreSlim _semaphore = new(1, 1); |
87 | 105 | private int _nextCorrelationId = 0; |
88 | 106 |
|
89 | 107 | private object CorrelationIdSupplier() |
@@ -170,27 +188,42 @@ public override async Task CloseAsync() |
170 | 188 | } |
171 | 189 | } |
172 | 190 |
|
| 191 | + /// <summary> |
| 192 | + /// PublishAsync sends a request message to the server and blocks the thread until the response is received. |
| 193 | + /// </summary> |
| 194 | + /// <param name="message"> The request message</param> |
| 195 | + /// <param name="cancellationToken">Cancellation token</param> |
| 196 | + /// <returns></returns> |
173 | 197 | public async Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default) |
174 | 198 | { |
175 | | - object correlationId = CorrelationIdSupplier(); |
176 | | - message = RequestPostProcess(message, correlationId); |
177 | | - _pendingRequests.Add(correlationId, new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously)); |
178 | | - if (_publisher != null) |
| 199 | + await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); |
| 200 | + try |
179 | 201 | { |
180 | | - PublishResult pr = await _publisher.PublishAsync( |
181 | | - message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false); |
182 | | - |
183 | | - if (pr.Outcome.State != OutcomeState.Accepted) |
| 202 | + object correlationId = CorrelationIdSupplier(); |
| 203 | + message = RequestPostProcess(message, correlationId); |
| 204 | + _pendingRequests.TryAdd(correlationId, |
| 205 | + new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously)); |
| 206 | + if (_publisher != null) |
184 | 207 | { |
185 | | - _pendingRequests[correlationId] |
186 | | - .SetException(new Exception($"Failed to send request state: {pr.Outcome.State}")); |
| 208 | + PublishResult pr = await _publisher.PublishAsync( |
| 209 | + message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false); |
| 210 | + |
| 211 | + if (pr.Outcome.State != OutcomeState.Accepted) |
| 212 | + { |
| 213 | + _pendingRequests[correlationId] |
| 214 | + .SetException(new Exception($"Failed to send request state: {pr.Outcome.State}")); |
| 215 | + } |
187 | 216 | } |
188 | | - } |
189 | 217 |
|
190 | | - await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout) |
191 | | - .ConfigureAwait(false); |
| 218 | + await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout) |
| 219 | + .ConfigureAwait(false); |
192 | 220 |
|
193 | | - return await _pendingRequests[correlationId].Task.ConfigureAwait(false); |
| 221 | + return await _pendingRequests[correlationId].Task.ConfigureAwait(false); |
| 222 | + } |
| 223 | + finally |
| 224 | + { |
| 225 | + _semaphore.Release(); |
| 226 | + } |
194 | 227 | } |
195 | 228 | } |
196 | 229 | } |
0 commit comments