Skip to content

Commit 665e18a

Browse files
committed
Cancel LinkToAsync when Channel shuts down
This will add assurance that streams/pipes that are connected to the Channel will shutdown because their reader/writer has completed.
1 parent 3854a8d commit 665e18a

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

src/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public async Task Dispose_DisposesChannels()
208208
[Fact]
209209
public async Task ChannelDispose_ClosesExistingStream()
210210
{
211-
var ms = new MonitoringStream(new MemoryStream());
211+
var ms = new MonitoringStream(FullDuplexStream.CreatePair().Item1);
212212
var disposal = new AsyncManualResetEvent();
213213
ms.Disposed += (s, e) => disposal.Set();
214214

@@ -220,7 +220,7 @@ public async Task ChannelDispose_ClosesExistingStream()
220220
[Fact]
221221
public async Task RemoteChannelClose_ClosesExistingStream()
222222
{
223-
var ms = new MonitoringStream(new MemoryStream());
223+
var ms = new MonitoringStream(FullDuplexStream.CreatePair().Item1);
224224
var disposal = new AsyncManualResetEvent();
225225
ms.Disposed += (s, e) => disposal.Set();
226226

src/Nerdbank.Streams/MultiplexingStream.Channel.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public class Channel : IDisposableObservable, IDuplexPipe
3838
/// </summary>
3939
private readonly TaskCompletionSource<object?> completionSource = new TaskCompletionSource<object?>();
4040

41+
/// <summary>
42+
/// The source for a token that will be canceled when this channel has completed.
43+
/// </summary>
44+
private readonly CancellationTokenSource disposalTokenSource = new CancellationTokenSource();
45+
4146
/// <summary>
4247
/// The source for the <see cref="OptionsApplied"/> property. May be null if options were provided in ctor.
4348
/// </summary>
@@ -229,6 +234,11 @@ public PipeWriter Output
229234
/// </summary>
230235
public MultiplexingStream MultiplexingStream { get; }
231236

237+
/// <summary>
238+
/// Gets a token that is canceled just before <see cref="Completion" /> has transitioned to its final state.
239+
/// </summary>
240+
internal CancellationToken DisposalToken => this.disposalTokenSource.Token;
241+
232242
internal OfferParameters OfferParams { get; }
233243

234244
internal string Name => this.OfferParams.Name;
@@ -297,6 +307,7 @@ public void Dispose()
297307
Action<object?, object> finalDisposalAction = (exOrAntecedent, state) =>
298308
{
299309
var self = (Channel)state;
310+
self.disposalTokenSource.Cancel();
300311
self.completionSource.TrySetResult(null);
301312
self.MultiplexingStream.OnChannelDisposed(self);
302313
};
@@ -574,7 +585,7 @@ private void ApplyChannelOptions(ChannelOptions channelOptions)
574585
if (channelOptions.ExistingPipe is object)
575586
{
576587
Assumes.NotNull(this.channelIO);
577-
this.DisposeSelfOnFailure(this.channelIO.LinkToAsync(channelOptions.ExistingPipe));
588+
this.DisposeSelfOnFailure(this.channelIO.LinkToAsync(channelOptions.ExistingPipe, this.DisposalToken));
578589
this.existingPipeGiven = true;
579590
}
580591
else

0 commit comments

Comments
 (0)