Skip to content

Commit f57ae22

Browse files
authored
Merge pull request #97 from rabbitmq/lukebakken/test-flakes
Address GHA test flakes
2 parents b1c3a58 + 6bab211 commit f57ae22

30 files changed

+353
-283
lines changed

.ci/windows/versions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"erlang": "27.1.2",
3-
"rabbitmq": "4.0.3"
3+
"rabbitmq": "4.0.4"
44
}

RabbitMQ.AMQP.Client/Extensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static Task<T> WaitAsync<T>(this Task<T> task, CancellationToken cancella
5656

5757
private static async Task DoWaitAsync(this Task task, CancellationToken cancellationToken)
5858
{
59-
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
59+
TaskCompletionSource<bool> cancellationTokenTcs = Utils.CreateTaskCompletionSource<bool>();
6060

6161
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
6262
state: cancellationTokenTcs, useSynchronizationContext: false))
@@ -73,7 +73,7 @@ private static async Task DoWaitAsync(this Task task, CancellationToken cancella
7373

7474
private static async Task<T0> DoWaitGenericAsync<T0>(this Task<T0> task, CancellationToken cancellationToken)
7575
{
76-
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
76+
TaskCompletionSource<bool> cancellationTokenTcs = Utils.CreateTaskCompletionSource<bool>();
7777

7878
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
7979
state: cancellationTokenTcs, useSynchronizationContext: false))

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ internal void RemoveConsumer(Guid id)
9696
}
9797

9898
private readonly TaskCompletionSource<bool> _connectionClosedTcs =
99-
new(TaskCreationOptions.RunContinuationsAsynchronously);
99+
Utils.CreateTaskCompletionSource<bool>();
100100

101101
public IRpcServerBuilder RpcServerBuilder()
102102
{

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public override async Task OpenAsync()
4646
try
4747
{
4848
TaskCompletionSource<ReceiverLink> attachCompletedTcs =
49-
new(TaskCreationOptions.RunContinuationsAsynchronously);
49+
Utils.CreateTaskCompletionSource<ReceiverLink>();
5050

5151
// this is an event to get the filters to the listener context
5252
// it _must_ be here because in case of reconnect the original filters could be not valid anymore

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class AmqpManagement : AbstractLifeCycle, IManagement, IManagementTopolog
4747
private const string ReplyTo = "$me";
4848

4949
protected readonly TaskCompletionSource<bool> _managementSessionClosedTcs =
50-
new(TaskCreationOptions.RunContinuationsAsynchronously);
50+
Utils.CreateTaskCompletionSource<bool>();
5151

5252
internal AmqpManagement(AmqpManagementParameters amqpManagementParameters)
5353
{
@@ -269,7 +269,7 @@ private async Task EnsureReceiverLinkAsync()
269269
new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), },
270270
};
271271

272-
var tcs = new TaskCompletionSource<ReceiverLink>(TaskCreationOptions.RunContinuationsAsynchronously);
272+
TaskCompletionSource<ReceiverLink> tcs = Utils.CreateTaskCompletionSource<ReceiverLink>();
273273
var tmpReceiverLink = new ReceiverLink(
274274
_managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) =>
275275
{
@@ -328,7 +328,7 @@ private async Task EnsureSenderLinkAsync()
328328
},
329329
};
330330

331-
var tcs = new TaskCompletionSource<SenderLink>(TaskCreationOptions.RunContinuationsAsynchronously);
331+
TaskCompletionSource<SenderLink> tcs = Utils.CreateTaskCompletionSource<SenderLink>();
332332
var tmpSenderLink = new SenderLink(
333333
_managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) =>
334334
{
@@ -425,10 +425,10 @@ internal async Task<Message> RequestAsync(Message message, int[] expectedRespons
425425
// TODO: make the timeout configurable
426426
TimeSpan timeout = argTimeout ?? TimeSpan.FromSeconds(30);
427427

428-
TaskCompletionSource<Message> mre = new(TaskCreationOptions.RunContinuationsAsynchronously);
428+
TaskCompletionSource<Message> tcs = Utils.CreateTaskCompletionSource<Message>();
429429

430430
// Add TaskCompletionSource to the dictionary it will be used to set the result of the request
431-
if (false == _requests.TryAdd(message.Properties.MessageId, mre))
431+
if (false == _requests.TryAdd(message.Properties.MessageId, tcs))
432432
{
433433
// TODO what to do in this error case?
434434
}
@@ -461,7 +461,7 @@ void RequestTimeoutAction()
461461

462462
// The response is handled in a separate thread, see ProcessResponses method in the Init method
463463
// TODO timeout & token
464-
Message result = await mre.Task.WaitAsync(linkedCts.Token)
464+
Message result = await tcs.Task.WaitAsync(linkedCts.Token)
465465
.ConfigureAwait(false);
466466

467467
await sendTask.WaitAsync(linkedCts.Token)

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public override async Task OpenAsync()
3535
try
3636
{
3737
TaskCompletionSource<SenderLink> attachCompletedTcs =
38-
new(TaskCreationOptions.RunContinuationsAsynchronously);
38+
Utils.CreateTaskCompletionSource<SenderLink>();
3939

4040
Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id);
4141

@@ -122,7 +122,8 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
122122
try
123123
{
124124
TaskCompletionSource<PublishOutcome> messagePublishedTcs =
125-
new(TaskCreationOptions.RunContinuationsAsynchronously);
125+
Utils.CreateTaskCompletionSource<PublishOutcome>();
126+
126127
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
127128

128129
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)

RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,16 @@ internal DefaultQueueInfo(string queueName)
3535

3636
internal DefaultQueueInfo(Map response)
3737
{
38-
_name = (string)response["name"];
38+
if (response["name"] is string name)
39+
{
40+
_name = name;
41+
}
42+
else
43+
{
44+
// TODO error?
45+
_name = string.Empty;
46+
}
47+
3948
_durable = (bool)response["durable"];
4049
_autoDelete = (bool)response["auto_delete"];
4150
_exclusive = (bool)response["exclusive"];
@@ -48,10 +57,12 @@ internal DefaultQueueInfo(Map response)
4857

4958
_leader = (string)response["leader"];
5059

51-
string[]? replicas = (string[])response["replicas"];
52-
if (replicas.Length > 0)
60+
if (response["replicas"] is string[] queueReplicas)
5361
{
54-
_replicas.AddRange(replicas);
62+
if (queueReplicas.Length > 0)
63+
{
64+
_replicas.AddRange(queueReplicas);
65+
}
5566
}
5667

5768
_messageCount = (ulong)response["message_count"];

RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ public override async Task OpenAsync()
158158
.Queue(_configuration.ReplyToQueue)
159159
.MessageHandler((context, message) =>
160160
{
161+
// TODO MessageHandler funcs should catch all exceptions
161162
context.Accept();
162163
object correlationId = ExtractCorrelationId(message);
163164
if (_pendingRequests.TryGetValue(correlationId, out TaskCompletionSource<IMessage>? request))
@@ -198,8 +199,7 @@ public async Task<IMessage> PublishAsync(IMessage message, CancellationToken can
198199
{
199200
object correlationId = CorrelationIdSupplier();
200201
message = RequestPostProcess(message, correlationId);
201-
_pendingRequests.TryAdd(correlationId,
202-
new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously));
202+
_pendingRequests.TryAdd(correlationId, Utils.CreateTaskCompletionSource<IMessage>());
203203
if (_publisher != null)
204204
{
205205
PublishResult pr = await _publisher.PublishAsync(

RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ internal async Task<Session> GetOrCreateSessionAsync()
3434
}
3535
else
3636
{
37-
TaskCompletionSource<ISession> sessionBeginTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
37+
TaskCompletionSource<ISession> sessionBeginTcs = Utils.CreateTaskCompletionSource<ISession>();
3838
void OnBegin(ISession session, Begin peerBegin)
3939
{
4040
sessionBeginTcs.SetResult(session);

RabbitMQ.AMQP.Client/Utils.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
using System;
66
using System.Collections.Generic;
7+
using System.Runtime.CompilerServices;
78
using System.Security.Cryptography;
89
using System.Text;
910
using System.Threading.Tasks;
@@ -39,6 +40,12 @@ internal static int RandomNext(int minValue = 0, int maxValue = 1024)
3940
#endif
4041
}
4142

43+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
44+
internal static TaskCompletionSource<T> CreateTaskCompletionSource<T>()
45+
{
46+
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
47+
}
48+
4249
internal static string GenerateQueueName()
4350
{
4451
return GenerateName(DefaultPrefix);

0 commit comments

Comments
 (0)