Skip to content

Commit 7959318

Browse files
committed
* Refactor how a couple links are created and the use of TaskCompletionSource
1 parent 7a92489 commit 7959318

File tree

2 files changed

+46
-21
lines changed

2 files changed

+46
-21
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ void OnAttached(ILink argLink, Attach argAttach)
6969
{
7070
// TODO create "internal bug" exception type?
7171
var ex = new InvalidOperationException(
72-
"invalid link in onAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
72+
"invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
7373
attachCompletedTcs.SetException(ex);
7474
}
7575
}
@@ -84,8 +84,10 @@ void OnAttached(ILink argLink, Attach argAttach)
8484
_receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
8585
.ConfigureAwait(false);
8686

87-
// TODO this assertion may not be valid
88-
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_receiverLink, tmpReceiverLink));
87+
if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink))
88+
{
89+
// TODO log this case?
90+
}
8991

9092
if (_receiverLink is null)
9193
{

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
using System;
66
using System.Collections.Concurrent;
7-
using System.Diagnostics;
87
using System.Linq;
98
using System.Threading;
109
using System.Threading.Tasks;
@@ -270,25 +269,38 @@ private async Task EnsureReceiverLinkAsync()
270269
new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), },
271270
};
272271

273-
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
274-
_receiverLink = new ReceiverLink(
272+
var tcs = new TaskCompletionSource<ReceiverLink>(TaskCreationOptions.RunContinuationsAsynchronously);
273+
var tmpReceiverLink = new ReceiverLink(
275274
_managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) =>
276275
{
277-
// TODO this may not be a valid assertion
278-
// Debug.Assert(Object.ReferenceEquals(_receiverLink, link));
279-
tcs.SetResult(true);
276+
if (link is ReceiverLink receiverLink)
277+
{
278+
tcs.SetResult(receiverLink);
279+
}
280+
else
281+
{
282+
// TODO create "internal bug" exception type?
283+
var ex = new InvalidOperationException(
284+
"invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
285+
tcs.SetException(ex);
286+
}
280287
});
281288

282-
await tcs.Task
289+
_receiverLink = await tcs.Task
283290
.ConfigureAwait(false);
284291

292+
if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink))
293+
{
294+
// TODO log this case?
295+
}
296+
285297
// TODO
286298
// using a credit of 1 can result in AmqpExceptions in ProcessResponses
287299
_receiverLink.SetCredit(100);
288300
}
289301
}
290302

291-
private Task EnsureSenderLinkAsync()
303+
private async Task EnsureSenderLinkAsync()
292304
{
293305
if (_senderLink == null || _senderLink.IsClosed)
294306
{
@@ -316,19 +328,30 @@ private Task EnsureSenderLinkAsync()
316328
},
317329
};
318330

319-
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
320-
_senderLink = new SenderLink(
331+
var tcs = new TaskCompletionSource<SenderLink>(TaskCreationOptions.RunContinuationsAsynchronously);
332+
var tmpSenderLink = new SenderLink(
321333
_managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) =>
322334
{
323-
// TODO this assertion may not be valid
324-
Debug.Assert(Object.ReferenceEquals(_senderLink, link));
325-
tcs.SetResult(true);
335+
if (link is SenderLink senderLink)
336+
{
337+
tcs.SetResult(senderLink);
338+
}
339+
else
340+
{
341+
// TODO create "internal bug" exception type?
342+
var ex = new InvalidOperationException(
343+
"invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
344+
tcs.SetException(ex);
345+
}
326346
});
327-
return tcs.Task;
328-
}
329-
else
330-
{
331-
return Task.CompletedTask;
347+
348+
_senderLink = await tcs.Task
349+
.ConfigureAwait(false);
350+
351+
if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink))
352+
{
353+
// TODO log this case?
354+
}
332355
}
333356
}
334357

0 commit comments

Comments
 (0)