|
4 | 4 |
|
5 | 5 | using System; |
6 | 6 | using System.Collections.Concurrent; |
7 | | -using System.Diagnostics; |
8 | 7 | using System.Linq; |
9 | 8 | using System.Threading; |
10 | 9 | using System.Threading.Tasks; |
@@ -270,24 +269,38 @@ private async Task EnsureReceiverLinkAsync() |
270 | 269 | new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), }, |
271 | 270 | }; |
272 | 271 |
|
273 | | - var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); |
274 | | - _receiverLink = new ReceiverLink( |
| 272 | + var tcs = new TaskCompletionSource<ReceiverLink>(TaskCreationOptions.RunContinuationsAsynchronously); |
| 273 | + var tmpReceiverLink = new ReceiverLink( |
275 | 274 | _managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) => |
276 | 275 | { |
277 | | - Debug.Assert(Object.ReferenceEquals(_receiverLink, link)); |
278 | | - tcs.SetResult(true); |
| 276 | + if (link is ReceiverLink receiverLink) |
| 277 | + { |
| 278 | + tcs.SetResult(receiverLink); |
| 279 | + } |
| 280 | + else |
| 281 | + { |
| 282 | + // TODO create "internal bug" exception type? |
| 283 | + var ex = new InvalidOperationException( |
| 284 | + "invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); |
| 285 | + tcs.SetException(ex); |
| 286 | + } |
279 | 287 | }); |
280 | 288 |
|
281 | | - await tcs.Task |
| 289 | + _receiverLink = await tcs.Task |
282 | 290 | .ConfigureAwait(false); |
283 | 291 |
|
| 292 | + if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink)) |
| 293 | + { |
| 294 | + // TODO log this case? |
| 295 | + } |
| 296 | + |
284 | 297 | // TODO |
285 | 298 | // using a credit of 1 can result in AmqpExceptions in ProcessResponses |
286 | 299 | _receiverLink.SetCredit(100); |
287 | 300 | } |
288 | 301 | } |
289 | 302 |
|
290 | | - private Task EnsureSenderLinkAsync() |
| 303 | + private async Task EnsureSenderLinkAsync() |
291 | 304 | { |
292 | 305 | if (_senderLink == null || _senderLink.IsClosed) |
293 | 306 | { |
@@ -315,18 +328,30 @@ private Task EnsureSenderLinkAsync() |
315 | 328 | }, |
316 | 329 | }; |
317 | 330 |
|
318 | | - var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); |
319 | | - _senderLink = new SenderLink( |
| 331 | + var tcs = new TaskCompletionSource<SenderLink>(TaskCreationOptions.RunContinuationsAsynchronously); |
| 332 | + var tmpSenderLink = new SenderLink( |
320 | 333 | _managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) => |
321 | 334 | { |
322 | | - Debug.Assert(Object.ReferenceEquals(_senderLink, link)); |
323 | | - tcs.SetResult(true); |
| 335 | + if (link is SenderLink senderLink) |
| 336 | + { |
| 337 | + tcs.SetResult(senderLink); |
| 338 | + } |
| 339 | + else |
| 340 | + { |
| 341 | + // TODO create "internal bug" exception type? |
| 342 | + var ex = new InvalidOperationException( |
| 343 | + "invalid link in OnAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues"); |
| 344 | + tcs.SetException(ex); |
| 345 | + } |
324 | 346 | }); |
325 | | - return tcs.Task; |
326 | | - } |
327 | | - else |
328 | | - { |
329 | | - return Task.CompletedTask; |
| 347 | + |
| 348 | + _senderLink = await tcs.Task |
| 349 | + .ConfigureAwait(false); |
| 350 | + |
| 351 | + if (false == Object.ReferenceEquals(_senderLink, tmpSenderLink)) |
| 352 | + { |
| 353 | + // TODO log this case? |
| 354 | + } |
330 | 355 | } |
331 | 356 | } |
332 | 357 |
|
|
0 commit comments