@@ -363,8 +363,16 @@ private IByteBuffer ReceiveBuffer(int responseTo, CancellationToken cancellation
363
363
receiveLockRequest . Task . GetAwaiter ( ) . GetResult ( ) ; // propagate exceptions
364
364
while ( true )
365
365
{
366
- var buffer = ReceiveBuffer ( cancellationToken ) ;
367
- _dropbox . AddMessage ( buffer ) ;
366
+ try
367
+ {
368
+ var buffer = ReceiveBuffer ( cancellationToken ) ;
369
+ _dropbox . AddMessage ( buffer ) ;
370
+ }
371
+ catch ( Exception ex )
372
+ {
373
+ _dropbox . AddException ( ex ) ;
374
+ throw ;
375
+ }
368
376
369
377
if ( messageTask . IsCompleted )
370
378
{
@@ -426,8 +434,16 @@ private async Task<IByteBuffer> ReceiveBufferAsync(int responseTo, CancellationT
426
434
receiveLockRequest . Task . GetAwaiter ( ) . GetResult ( ) ; // propagate exceptions
427
435
while ( true )
428
436
{
429
- var buffer = await ReceiveBufferAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
430
- _dropbox . AddMessage ( buffer ) ;
437
+ try
438
+ {
439
+ var buffer = await ReceiveBufferAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
440
+ _dropbox . AddMessage ( buffer ) ;
441
+ }
442
+ catch ( Exception ex )
443
+ {
444
+ _dropbox . AddException ( ex ) ;
445
+ throw ;
446
+ }
431
447
432
448
if ( messageTask . IsCompleted )
433
449
{
@@ -766,6 +782,14 @@ private class Dropbox
766
782
private readonly ConcurrentDictionary< int , TaskCompletionSource< IByteBuffer>> _messages = new ConcurrentDictionary < int , TaskCompletionSource < IByteBuffer > > ( ) ;
767
783
768
784
// public methods
785
+ public void AddException ( Exception exception )
786
+ {
787
+ foreach ( var taskCompletionSource in _messages . Values )
788
+ {
789
+ taskCompletionSource . TrySetException ( exception ) ; // has no effect on already completed tasks
790
+ }
791
+ }
792
+
769
793
public void AddMessage ( IByteBuffer message )
770
794
{
771
795
var responseTo = GetResponseTo( message ) ;
0 commit comments