Skip to content

Commit f1f4135

Browse files
committed
* Consolidate creation of TaskCompletionSource objects to ensure correct TaskCreationOptions are used.
1 parent c58f4fb commit f1f4135

File tree

11 files changed

+29
-24
lines changed

11 files changed

+29
-24
lines changed

RabbitMQ.AMQP.Client/Extensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static Task<T> WaitAsync<T>(this Task<T> task, CancellationToken cancella
5656

5757
private static async Task DoWaitAsync(this Task task, CancellationToken cancellationToken)
5858
{
59-
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
59+
TaskCompletionSource<bool> cancellationTokenTcs = Utils.CreateTaskCompletionSource<bool>();
6060

6161
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
6262
state: cancellationTokenTcs, useSynchronizationContext: false))
@@ -73,7 +73,7 @@ private static async Task DoWaitAsync(this Task task, CancellationToken cancella
7373

7474
private static async Task<T0> DoWaitGenericAsync<T0>(this Task<T0> task, CancellationToken cancellationToken)
7575
{
76-
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
76+
TaskCompletionSource<bool> cancellationTokenTcs = Utils.CreateTaskCompletionSource<bool>();
7777

7878
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
7979
state: cancellationTokenTcs, useSynchronizationContext: false))

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ internal void RemoveConsumer(Guid id)
9696
}
9797

9898
private readonly TaskCompletionSource<bool> _connectionClosedTcs =
99-
new(TaskCreationOptions.RunContinuationsAsynchronously);
99+
Utils.CreateTaskCompletionSource<bool>();
100100

101101
public IRpcServerBuilder RpcServerBuilder()
102102
{

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public override async Task OpenAsync()
4646
try
4747
{
4848
TaskCompletionSource<ReceiverLink> attachCompletedTcs =
49-
new(TaskCreationOptions.RunContinuationsAsynchronously);
49+
Utils.CreateTaskCompletionSource<ReceiverLink>();
5050

5151
// this is an event to get the filters to the listener context
5252
// it _must_ be here because in case of reconnect the original filters could be not valid anymore

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class AmqpManagement : AbstractLifeCycle, IManagement, IManagementTopolog
4747
private const string ReplyTo = "$me";
4848

4949
protected readonly TaskCompletionSource<bool> _managementSessionClosedTcs =
50-
new(TaskCreationOptions.RunContinuationsAsynchronously);
50+
Utils.CreateTaskCompletionSource<bool>();
5151

5252
internal AmqpManagement(AmqpManagementParameters amqpManagementParameters)
5353
{
@@ -269,7 +269,7 @@ private async Task EnsureReceiverLinkAsync()
269269
new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), },
270270
};
271271

272-
var tcs = new TaskCompletionSource<ReceiverLink>(TaskCreationOptions.RunContinuationsAsynchronously);
272+
TaskCompletionSource<ReceiverLink> tcs = Utils.CreateTaskCompletionSource<ReceiverLink>();
273273
var tmpReceiverLink = new ReceiverLink(
274274
_managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) =>
275275
{
@@ -328,7 +328,7 @@ private async Task EnsureSenderLinkAsync()
328328
},
329329
};
330330

331-
var tcs = new TaskCompletionSource<SenderLink>(TaskCreationOptions.RunContinuationsAsynchronously);
331+
TaskCompletionSource<SenderLink> tcs = Utils.CreateTaskCompletionSource<SenderLink>();
332332
var tmpSenderLink = new SenderLink(
333333
_managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) =>
334334
{
@@ -425,10 +425,10 @@ internal async Task<Message> RequestAsync(Message message, int[] expectedRespons
425425
// TODO: make the timeout configurable
426426
TimeSpan timeout = argTimeout ?? TimeSpan.FromSeconds(30);
427427

428-
TaskCompletionSource<Message> mre = new(TaskCreationOptions.RunContinuationsAsynchronously);
428+
TaskCompletionSource<Message> tcs = Utils.CreateTaskCompletionSource<Message>();
429429

430430
// Add TaskCompletionSource to the dictionary it will be used to set the result of the request
431-
if (false == _requests.TryAdd(message.Properties.MessageId, mre))
431+
if (false == _requests.TryAdd(message.Properties.MessageId, tcs))
432432
{
433433
// TODO what to do in this error case?
434434
}
@@ -461,7 +461,7 @@ void RequestTimeoutAction()
461461

462462
// The response is handled in a separate thread, see ProcessResponses method in the Init method
463463
// TODO timeout & token
464-
Message result = await mre.Task.WaitAsync(linkedCts.Token)
464+
Message result = await tcs.Task.WaitAsync(linkedCts.Token)
465465
.ConfigureAwait(false);
466466

467467
await sendTask.WaitAsync(linkedCts.Token)

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public override async Task OpenAsync()
3535
try
3636
{
3737
TaskCompletionSource<SenderLink> attachCompletedTcs =
38-
new(TaskCreationOptions.RunContinuationsAsynchronously);
38+
Utils.CreateTaskCompletionSource<SenderLink>();
3939

4040
Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id);
4141

@@ -122,7 +122,8 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
122122
try
123123
{
124124
TaskCompletionSource<PublishOutcome> messagePublishedTcs =
125-
new(TaskCreationOptions.RunContinuationsAsynchronously);
125+
Utils.CreateTaskCompletionSource<PublishOutcome>();
126+
126127
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
127128

128129
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)

RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,7 @@ public async Task<IMessage> PublishAsync(IMessage message, CancellationToken can
198198
{
199199
object correlationId = CorrelationIdSupplier();
200200
message = RequestPostProcess(message, correlationId);
201-
_pendingRequests.TryAdd(correlationId,
202-
new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously));
201+
_pendingRequests.TryAdd(correlationId, Utils.CreateTaskCompletionSource<IMessage>());
203202
if (_publisher != null)
204203
{
205204
PublishResult pr = await _publisher.PublishAsync(

RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ internal async Task<Session> GetOrCreateSessionAsync()
3434
}
3535
else
3636
{
37-
TaskCompletionSource<ISession> sessionBeginTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
37+
TaskCompletionSource<ISession> sessionBeginTcs = Utils.CreateTaskCompletionSource<ISession>();
3838
void OnBegin(ISession session, Begin peerBegin)
3939
{
4040
sessionBeginTcs.SetResult(session);

RabbitMQ.AMQP.Client/Utils.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
using System;
66
using System.Collections.Generic;
7+
using System.Runtime.CompilerServices;
78
using System.Security.Cryptography;
89
using System.Text;
910
using System.Threading.Tasks;
@@ -39,6 +40,12 @@ internal static int RandomNext(int minValue = 0, int maxValue = 1024)
3940
#endif
4041
}
4142

43+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
44+
internal static TaskCompletionSource<T> CreateTaskCompletionSource<T>()
45+
{
46+
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
47+
}
48+
4249
internal static string GenerateQueueName()
4350
{
4451
return GenerateName(DefaultPrefix);

Tests/Consumer/ConsumerOutcomeTests.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,7 @@ public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndCont
258258

259259
const string annotationKey = "x-opt-annotation-key";
260260
const string annotationValue = "annotation-value";
261-
TaskCompletionSource<bool> tcs =
262-
new(TaskCreationOptions.RunContinuationsAsynchronously);
261+
TaskCompletionSource<bool> tcs = CreateTaskCompletionSource();
263262
IPublisher publisher = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync();
264263
IConsumer consumer = await _connection.ConsumerBuilder()
265264
.MessageHandler((context, _) =>
@@ -279,8 +278,7 @@ public async Task DiscardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndCont
279278
await consumer.CloseAsync();
280279
consumer.Dispose();
281280

282-
TaskCompletionSource<IMessage> tcsDl =
283-
new(TaskCreationOptions.RunContinuationsAsynchronously);
281+
TaskCompletionSource<IMessage> tcsDl = CreateTaskCompletionSource<IMessage>();
284282
IConsumer dlConsumer = await _connection.ConsumerBuilder()
285283
.MessageHandler((context, message1) =>
286284
{

Tests/IntegrationTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ protected static TaskCompletionSource<bool> CreateTaskCompletionSource()
153153

154154
protected static TaskCompletionSource<T> CreateTaskCompletionSource<T>()
155155
{
156-
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
156+
return Utils.CreateTaskCompletionSource<T>();
157157
}
158158

159159
protected Task<T> WhenTcsCompletes<T>(TaskCompletionSource<T> tcs)

0 commit comments

Comments
 (0)