Skip to content

Commit 9682645

Browse files
committed
Fix channel shutdown when the entire mxstream is disposed
Fixes #95
1 parent 8e68048 commit 9682645

File tree

3 files changed

+162
-105
lines changed

3 files changed

+162
-105
lines changed

src/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,11 @@ public void Disposal_DisposesTransportStream()
8989
public async Task Dispose_DisposesChannels()
9090
{
9191
var (channel1, channel2) = await this.EstablishChannelsAsync("A");
92-
Task channel1WriterCompleted = channel1.Input.WaitForWriterCompletionAsync();
93-
Task channel1ReaderCompleted = channel1.Output.WaitForReaderCompletionAsync();
9492
this.mx1.Dispose();
9593
Assert.True(channel1.IsDisposed);
9694
await channel1.Completion.WithCancellation(this.TimeoutToken);
97-
await channel1WriterCompleted.WithCancellation(this.TimeoutToken);
98-
await channel1ReaderCompleted.WithCancellation(this.TimeoutToken);
95+
await channel1.Input.WaitForWriterCompletionAsync().WithCancellation(this.TimeoutToken);
96+
await channel1.Output.WaitForReaderCompletionAsync().WithCancellation(this.TimeoutToken);
9997
}
10098

10199
[Fact]

src/Nerdbank.Streams/MultiplexingStream.Channel.cs

Lines changed: 91 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public class Channel : IDisposableObservable, IDuplexPipe
4242
/// </summary>
4343
private readonly bool offeredLocally;
4444

45+
/// <summary>
46+
/// Tracks the end of any copying from the mxstream to this channel.
47+
/// </summary>
48+
private readonly AsyncManualResetEvent mxStreamIOWriterCompleted = new AsyncManualResetEvent();
49+
4550
/// <summary>
4651
/// Indicates whether the <see cref="Dispose"/> method has been called.
4752
/// </summary>
@@ -53,7 +58,8 @@ public class Channel : IDisposableObservable, IDuplexPipe
5358
private PipeReader mxStreamIOReader;
5459

5560
/// <summary>
56-
/// A task that represents the completion of the <see cref="mxStreamIOReader"/>.
61+
/// A task that represents the completion of the <see cref="mxStreamIOReader"/>,
62+
/// signifying the point where we will stop relaying data from the channel to the <see cref="MultiplexingStream"/> for transmission to the remote party.
5763
/// </summary>
5864
private Task mxStreamIOReaderCompleted;
5965

@@ -206,40 +212,44 @@ public void Dispose()
206212
this.acceptanceSource.TrySetCanceled();
207213
this.optionsAppliedTaskSource?.TrySetCanceled();
208214

215+
PipeWriter mxStreamIOWriter;
209216
lock (this.SyncObject)
210217
{
211218
this.isDisposed = true;
219+
mxStreamIOWriter = this.mxStreamIOWriter;
220+
}
212221

213-
// We do NOT complete *our* writer (which reads data coming in from the remote) here because
214-
// another thread could be writing to it right now and we don't want to cause them to throw.
215-
// Instead, we'll just Cancel a pending flush to it as a signal to unblock if the mxstream is flushing now.
216-
// The mxstream itself will Complete writing.
217-
this.mxStreamIOWriter?.CancelPendingFlush();
222+
// Complete writing so that the mxstream cannot write to this channel any more.
223+
// We must also cancel a pending flush since no one is guaranteed to be reading this any more
224+
// and we don't want to deadlock on a full buffer in a disposed channel's pipe.
225+
mxStreamIOWriter?.Complete();
226+
mxStreamIOWriter?.CancelPendingFlush();
227+
this.mxStreamIOWriterCompleted.Set();
218228

219-
// If we're using our own Pipe to relay user messages, we can shutdown writing and allow for our reader to propagate what was already written
229+
if (this.channelIO != null)
230+
{
231+
// We're using our own Pipe to relay user messages, so we can shutdown writing and allow for our reader to propagate what was already written
220232
// before actually shutting down.
221-
if (this.channelIO != null)
222-
{
223-
this.channelIO.Output.Complete();
224-
this.channelIO.Output.OnReaderCompleted(finalDisposalAction, this);
225-
}
226-
else
227-
{
228-
// We don't own the user's PipeWriter to complete it (so they can't write anything more to this channel).
229-
// We can't know whether there is or will be more bytes written to the user's PipeWriter,
230-
// but we need to terminate our reader for their writer as part of reclaiming resources.
231-
// We want to Cancel any concurrent or subsequent read and let the reader respond and complete itself to avoid race conditions.
232-
this.mxStreamIOReader?.CancelPendingRead();
233+
this.channelIO.Output.Complete();
234+
}
235+
else
236+
{
237+
// We don't own the user's PipeWriter to complete it (so they can't write anything more to this channel).
238+
// We can't know whether there is or will be more bytes written to the user's PipeWriter,
239+
// but we need to terminate our reader for their writer as part of reclaiming resources.
240+
// We want to complete reading immediately and cancel any pending read.
241+
this.mxStreamIOReader?.Complete();
242+
this.mxStreamIOReader?.CancelPendingRead();
243+
}
233244

234-
if (this.mxStreamIOReaderCompleted?.IsCompleted ?? true)
235-
{
236-
finalDisposalAction(null, this);
237-
}
238-
else
239-
{
240-
this.mxStreamIOReaderCompleted.ContinueWith(finalDisposalAction, this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Forget();
241-
}
242-
}
245+
// As a minor perf optimization, avoid allocating a continuation task if the antecedent is already completed.
246+
if (this.mxStreamIOReaderCompleted?.IsCompleted ?? true)
247+
{
248+
finalDisposalAction(null, this);
249+
}
250+
else
251+
{
252+
this.mxStreamIOReaderCompleted.ContinueWith(finalDisposalAction, this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Forget();
243253
}
244254
}
245255
}
@@ -252,6 +262,7 @@ internal async ValueTask<PipeWriter> GetReceivedMessagePipeWriterAsync()
252262
{
253263
lock (this.SyncObject)
254264
{
265+
Verify.NotDisposed(this);
255266
if (this.switchingToExistingPipe == null)
256267
{
257268
PipeWriter result = this.mxStreamIOWriter;
@@ -275,6 +286,7 @@ internal async ValueTask<PipeWriter> GetReceivedMessagePipeWriterAsync()
275286
PipeWriter newWriter = await this.switchingToExistingPipe.ConfigureAwait(false);
276287
lock (this.SyncObject)
277288
{
289+
Verify.NotDisposed(this);
278290
this.mxStreamIOWriter = newWriter;
279291

280292
// Skip all this next time.
@@ -284,6 +296,19 @@ internal async ValueTask<PipeWriter> GetReceivedMessagePipeWriterAsync()
284296
return this.mxStreamIOWriter;
285297
}
286298

299+
/// <summary>
300+
/// Called by the <see cref="MultiplexingStream"/> when when it will not be writing any more data to the channel.
301+
/// </summary>
302+
internal void OnContentWritingCompleted()
303+
{
304+
this.DisposeSelfOnFailure(Task.Run(async delegate
305+
{
306+
var writer = await this.GetReceivedMessagePipeWriterAsync().ConfigureAwait(false);
307+
writer.Complete();
308+
this.mxStreamIOWriterCompleted.Set();
309+
}));
310+
}
311+
287312
/// <summary>
288313
/// Accepts an offer made by the remote party.
289314
/// </summary>
@@ -379,7 +404,7 @@ private void ApplyChannelOptions(ChannelOptions channelOptions)
379404

380405
this.mxStreamIOReaderCompleted = this.ProcessOutboundTransmissionsAsync();
381406
this.DisposeSelfOnFailure(this.mxStreamIOReaderCompleted);
382-
this.DisposeSelfOnFailure(this.AutoCloseOnPipesClosureAsync(this.mxStreamIOReaderCompleted));
407+
this.DisposeSelfOnFailure(this.AutoCloseOnPipesClosureAsync());
383408
}
384409
catch (Exception ex)
385410
{
@@ -420,7 +445,22 @@ private async Task ProcessOutboundTransmissionsAsync()
420445
{
421446
while (!this.Completion.IsCompleted)
422447
{
423-
var result = await this.mxStreamIOReader.ReadAsync().ConfigureAwait(false);
448+
ReadResult result;
449+
try
450+
{
451+
result = await this.mxStreamIOReader.ReadAsync().ConfigureAwait(false);
452+
}
453+
catch (InvalidOperationException ex)
454+
{
455+
// Someone completed the reader. The channel was probably disposed.
456+
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Verbose))
457+
{
458+
this.TraceSource.TraceEvent(TraceEventType.Verbose, 0, "Transmission terminated because the reader threw: {0}", ex);
459+
}
460+
461+
break;
462+
}
463+
424464
if (result.IsCanceled)
425465
{
426466
// We've been asked to cancel. Presumably the channel has been disposed.
@@ -451,8 +491,21 @@ private async Task ProcessOutboundTransmissionsAsync()
451491

452492
await this.MultiplexingStream.SendFrameAsync(header, bufferToRelay, CancellationToken.None).ConfigureAwait(false);
453493

454-
// Let the pipe know exactly how much we read, which might be less than we were given.
455-
this.mxStreamIOReader.AdvanceTo(bufferToRelay.End);
494+
try
495+
{
496+
// Let the pipe know exactly how much we read, which might be less than we were given.
497+
this.mxStreamIOReader.AdvanceTo(bufferToRelay.End);
498+
}
499+
catch (InvalidOperationException ex)
500+
{
501+
// Someone completed the reader. The channel was probably disposed.
502+
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Verbose))
503+
{
504+
this.TraceSource.TraceEvent(TraceEventType.Verbose, 0, "Transmission terminated because the reader threw: {0}", ex);
505+
}
506+
507+
break;
508+
}
456509
}
457510

458511
if (result.IsCompleted)
@@ -466,43 +519,23 @@ private async Task ProcessOutboundTransmissionsAsync()
466519
}
467520
}
468521

469-
this.MultiplexingStream.OnChannelWritingCompleted(this);
470522
this.mxStreamIOReader.Complete();
471523
}
472524
catch (Exception ex)
473525
{
474526
this.mxStreamIOReader.Complete(ex);
475527
throw;
476528
}
529+
finally
530+
{
531+
this.MultiplexingStream.OnChannelWritingCompleted(this);
532+
}
477533
}
478534

479-
private async Task AutoCloseOnPipesClosureAsync(Task transmissionCompletion)
535+
private async Task AutoCloseOnPipesClosureAsync()
480536
{
481-
Requires.NotNull(transmissionCompletion, nameof(transmissionCompletion));
482-
483537
PipeWriter initialWriter = this.mxStreamIOWriter;
484-
Task receivingCompletion = initialWriter.WaitForReaderCompletionAsync();
485-
await Task.WhenAll(receivingCompletion, transmissionCompletion).ConfigureAwait(false);
486-
487-
// Consider that this.mxStreamIOWriter can be reassigned.
488-
// Carefully await the new one to complete as well, if applicable.
489-
Task<PipeWriter> switchingToExistingPipe;
490-
PipeWriter currentWriter;
491-
lock (this.SyncObject)
492-
{
493-
switchingToExistingPipe = this.switchingToExistingPipe;
494-
currentWriter = this.mxStreamIOWriter;
495-
}
496-
497-
if (switchingToExistingPipe != null)
498-
{
499-
currentWriter = await switchingToExistingPipe.ConfigureAwait(false);
500-
}
501-
502-
if (currentWriter != initialWriter)
503-
{
504-
await currentWriter.WaitForReaderCompletionAsync().ConfigureAwait(false);
505-
}
538+
await Task.WhenAll(this.mxStreamIOWriterCompleted.WaitAsync(), this.mxStreamIOReaderCompleted).ConfigureAwait(false);
506539

507540
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
508541
{

0 commit comments

Comments
 (0)