Skip to content

Commit 4aa7c98

Browse files
authored
Merge pull request #198 from AArnott/channelShutdownFix
Shutdown ExistingPipe streams on channel disposal
2 parents f0d43d1 + 6a14b44 commit 4aa7c98

File tree

2 files changed

+28
-5
lines changed

2 files changed

+28
-5
lines changed

src/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public async Task Dispose_DisposesChannels()
208208
[Fact]
209209
public async Task ChannelDispose_ClosesExistingStream()
210210
{
211-
var ms = new MonitoringStream(new MemoryStream());
211+
var ms = new MonitoringStream(FullDuplexStream.CreatePair().Item1);
212212
var disposal = new AsyncManualResetEvent();
213213
ms.Disposed += (s, e) => disposal.Set();
214214

@@ -220,7 +220,7 @@ public async Task ChannelDispose_ClosesExistingStream()
220220
[Fact]
221221
public async Task RemoteChannelClose_ClosesExistingStream()
222222
{
223-
var ms = new MonitoringStream(new MemoryStream());
223+
var ms = new MonitoringStream(FullDuplexStream.CreatePair().Item1);
224224
var disposal = new AsyncManualResetEvent();
225225
ms.Disposed += (s, e) => disposal.Set();
226226

src/Nerdbank.Streams/MultiplexingStream.Channel.cs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public class Channel : IDisposableObservable, IDuplexPipe
3838
/// </summary>
3939
private readonly TaskCompletionSource<object?> completionSource = new TaskCompletionSource<object?>();
4040

41+
/// <summary>
42+
/// The source for a token that will be canceled when this channel has completed.
43+
/// </summary>
44+
private readonly CancellationTokenSource disposalTokenSource = new CancellationTokenSource();
45+
4146
/// <summary>
4247
/// The source for the <see cref="OptionsApplied"/> property. May be null if options were provided in ctor.
4348
/// </summary>
@@ -107,11 +112,17 @@ public class Channel : IDisposableObservable, IDuplexPipe
107112
private PipeWriter? mxStreamIOWriter;
108113

109114
/// <summary>
110-
/// The I/O to expose on this channel. Will be <c>null</c> if <see cref="ChannelOptions.ExistingPipe"/>
111-
/// was set to a non-null value when this channel was created.
115+
/// The I/O to expose on this channel if <see cref="ChannelOptions.ExistingPipe"/> was not specified;
116+
/// otherwise it is the buffering pipe we use as an intermediary with the specified <see cref="ChannelOptions.ExistingPipe"/>.
112117
/// </summary>
113118
private IDuplexPipe? channelIO;
114119

120+
/// <summary>
121+
/// The value of <see cref="ChannelOptions.ExistingPipe"/> as it was when we received it.
122+
/// We don't use this field, but we set it for diagnostic purposes later.
123+
/// </summary>
124+
private IDuplexPipe? existingPipe;
125+
115126
/// <summary>
116127
/// A value indicating whether this <see cref="Channel"/> was created or accepted with a non-null value for <see cref="ChannelOptions.ExistingPipe"/>.
117128
/// </summary>
@@ -229,6 +240,11 @@ public PipeWriter Output
229240
/// </summary>
230241
public MultiplexingStream MultiplexingStream { get; }
231242

243+
/// <summary>
244+
/// Gets a token that is canceled just before <see cref="Completion" /> has transitioned to its final state.
245+
/// </summary>
246+
internal CancellationToken DisposalToken => this.disposalTokenSource.Token;
247+
232248
internal OfferParameters OfferParams { get; }
233249

234250
internal string Name => this.OfferParams.Name;
@@ -297,6 +313,7 @@ public void Dispose()
297313
Action<object?, object> finalDisposalAction = (exOrAntecedent, state) =>
298314
{
299315
var self = (Channel)state;
316+
self.disposalTokenSource.Cancel();
300317
self.completionSource.TrySetResult(null);
301318
self.MultiplexingStream.OnChannelDisposed(self);
302319
};
@@ -574,8 +591,14 @@ private void ApplyChannelOptions(ChannelOptions channelOptions)
574591
if (channelOptions.ExistingPipe is object)
575592
{
576593
Assumes.NotNull(this.channelIO);
577-
this.DisposeSelfOnFailure(this.channelIO.LinkToAsync(channelOptions.ExistingPipe));
594+
this.existingPipe = channelOptions.ExistingPipe;
578595
this.existingPipeGiven = true;
596+
597+
// We always want to write ALL received data to the user's ExistingPipe, rather than truncating it on disposal, so don't use a cancellation token in that direction.
598+
this.DisposeSelfOnFailure(this.channelIO.Input.LinkToAsync(channelOptions.ExistingPipe.Output));
599+
600+
// Upon disposal, we no longer want to continue reading from the user's ExistingPipe into our buffer since we won't be propagating it any further, so use our DisposalToken.
601+
this.DisposeSelfOnFailure(channelOptions.ExistingPipe.Input.LinkToAsync(this.channelIO.Output, this.DisposalToken));
579602
}
580603
else
581604
{

0 commit comments

Comments
 (0)