Skip to content

Commit 098d126

Browse files
sungam3rPliner
authored andcommitted
Dispose consumer cancellation for RPC response queue (#969)
* fixes #968 * relates to #967
1 parent 5a3e1b9 commit 098d126

File tree

1 file changed

+32
-11
lines changed
  • Source/EasyNetQ/Producer

1 file changed

+32
-11
lines changed

Source/EasyNetQ/Producer/Rpc.cs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class Rpc : IRpc
2323
private readonly ITypeNameSerializer typeNameSerializer;
2424

2525
private readonly object responseQueuesAddLock = new object();
26-
private readonly ConcurrentDictionary<RpcKey, string> responseQueues = new ConcurrentDictionary<RpcKey, string>();
26+
private readonly ConcurrentDictionary<RpcKey, ResponseQueueWithCancellation> responseQueues = new ConcurrentDictionary<RpcKey, ResponseQueueWithCancellation>();
2727
private readonly ConcurrentDictionary<string, ResponseAction> responseActions = new ConcurrentDictionary<string, ResponseAction>();
2828

2929
protected readonly TimeSpan disablePeriodicSignaling = TimeSpan.FromMilliseconds(-1);
@@ -63,11 +63,19 @@ public Rpc(
6363

6464
private void OnConnectionCreated(ConnectionCreatedEvent @event)
6565
{
66+
var copyOfResponseQueues = responseQueues.Values;
67+
responseQueues.Clear();
68+
6669
var copyOfResponseActions = responseActions.Values;
6770
responseActions.Clear();
68-
responseQueues.Clear();
6971

70-
// retry in-flight requests.
72+
// call consumer cancellations
73+
foreach (var queueWithCancellation in copyOfResponseQueues)
74+
{
75+
queueWithCancellation.Cancellation.Dispose();
76+
}
77+
78+
// finish in-flight requests
7179
foreach (var responseAction in copyOfResponseActions)
7280
{
7381
responseAction.OnFailure();
@@ -85,7 +93,11 @@ public virtual Task<TResponse> Request<TRequest, TResponse>(TRequest request, Ac
8593
var configuration = new RequestConfiguration();
8694
configure(configuration);
8795

96+
#if NETFX && !NET46
8897
var tcs = new TaskCompletionSource<TResponse>();
98+
#else
99+
var tcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
100+
#endif
89101

90102
var timeout = timeoutStrategy.GetTimeoutSeconds(requestType);
91103
Timer timer = null;
@@ -154,13 +166,12 @@ protected virtual string SubscribeToResponse<TRequest, TResponse>()
154166
{
155167
var responseType = typeof(TResponse);
156168
var rpcKey = new RpcKey { Request = typeof(TRequest), Response = responseType };
157-
string queueName;
158-
if (responseQueues.TryGetValue(rpcKey, out queueName))
159-
return queueName;
169+
if (responseQueues.TryGetValue(rpcKey, out ResponseQueueWithCancellation queueWithCancellation))
170+
return queueWithCancellation.QueueName;
160171
lock (responseQueuesAddLock)
161172
{
162-
if (responseQueues.TryGetValue(rpcKey, out queueName))
163-
return queueName;
173+
if (responseQueues.TryGetValue(rpcKey, out queueWithCancellation))
174+
return queueWithCancellation.QueueName;
164175

165176
var queue = advancedBus.QueueDeclare(
166177
conventions.RpcReturnQueueNamingConvention(),
@@ -174,15 +185,15 @@ protected virtual string SubscribeToResponse<TRequest, TResponse>()
174185
queue,
175186
queue.Name);
176187

177-
advancedBus.Consume<TResponse>(queue, (message, messageReceivedInfo) => Task.Factory.StartNew(() =>
188+
var cancellation = advancedBus.Consume<TResponse>(queue, (message, messageReceivedInfo) => Task.Factory.StartNew(() =>
178189
{
179190
ResponseAction responseAction;
180191
if (responseActions.TryRemove(message.Properties.CorrelationId, out responseAction))
181192
{
182193
responseAction.OnSuccess(message);
183194
}
184195
}));
185-
responseQueues.TryAdd(rpcKey, queue.Name);
196+
responseQueues.TryAdd(rpcKey, new ResponseQueueWithCancellation { QueueName = queue.Name, Cancellation = cancellation });
186197
return queue.Name;
187198
}
188199
}
@@ -193,6 +204,12 @@ protected struct RpcKey
193204
public Type Response;
194205
}
195206

207+
protected struct ResponseQueueWithCancellation
208+
{
209+
public string QueueName;
210+
public IDisposable Cancellation;
211+
}
212+
196213
protected class ResponseAction
197214
{
198215
public Action<object> OnSuccess { get; set; }
@@ -257,7 +274,11 @@ protected Task ExecuteResponder<TRequest, TResponse>(Func<TRequest, Task<TRespon
257274
where TRequest : class
258275
where TResponse : class
259276
{
277+
#if NETFX && !NET46
260278
var tcs = new TaskCompletionSource<object>();
279+
#else
280+
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
281+
#endif
261282

262283
try
263284
{
@@ -344,4 +365,4 @@ private IExchange DeclareAndBindRpcExchange(string exchangeName, IQueue queue, s
344365
return exchange;
345366
}
346367
}
347-
}
368+
}

0 commit comments

Comments
 (0)