Skip to content

Commit 6a14b44

Browse files
committed
Only cancel LinkToAsync in one direction
This avoids data loss when pipes complete while others are still propagating received data.
1 parent 665e18a commit 6a14b44

File tree

1 file changed

+15
-3
lines changed

1 file changed

+15
-3
lines changed

src/Nerdbank.Streams/MultiplexingStream.Channel.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,17 @@ public class Channel : IDisposableObservable, IDuplexPipe
112112
private PipeWriter? mxStreamIOWriter;
113113

114114
/// <summary>
115-
/// The I/O to expose on this channel. Will be <c>null</c> if <see cref="ChannelOptions.ExistingPipe"/>
116-
/// was set to a non-null value when this channel was created.
115+
/// The I/O to expose on this channel if <see cref="ChannelOptions.ExistingPipe"/> was not specified;
116+
/// otherwise it is the buffering pipe we use as an intermediary with the specified <see cref="ChannelOptions.ExistingPipe"/>.
117117
/// </summary>
118118
private IDuplexPipe? channelIO;
119119

120+
/// <summary>
121+
/// The value of <see cref="ChannelOptions.ExistingPipe"/> as it was when we received it.
122+
/// We don't use this field, but we set it for diagnostic purposes later.
123+
/// </summary>
124+
private IDuplexPipe? existingPipe;
125+
120126
/// <summary>
121127
/// A value indicating whether this <see cref="Channel"/> was created or accepted with a non-null value for <see cref="ChannelOptions.ExistingPipe"/>.
122128
/// </summary>
@@ -585,8 +591,14 @@ private void ApplyChannelOptions(ChannelOptions channelOptions)
585591
if (channelOptions.ExistingPipe is object)
586592
{
587593
Assumes.NotNull(this.channelIO);
588-
this.DisposeSelfOnFailure(this.channelIO.LinkToAsync(channelOptions.ExistingPipe, this.DisposalToken));
594+
this.existingPipe = channelOptions.ExistingPipe;
589595
this.existingPipeGiven = true;
596+
597+
// We always want to write ALL received data to the user's ExistingPipe, rather than truncating it on disposal, so don't use a cancellation token in that direction.
598+
this.DisposeSelfOnFailure(this.channelIO.Input.LinkToAsync(channelOptions.ExistingPipe.Output));
599+
600+
// Upon disposal, we no longer want to continue reading from the user's ExistingPipe into our buffer since we won't be propagating it any further, so use our DisposalToken.
601+
this.DisposeSelfOnFailure(channelOptions.ExistingPipe.Input.LinkToAsync(this.channelIO.Output, this.DisposalToken));
590602
}
591603
else
592604
{

0 commit comments

Comments
 (0)