Skip to content

Commit fa4d9b9

Browse files
authored
Merge pull request #330 from AArnott/fix252
Avoid 1st chance exceptions in one-way mxstream channels
2 parents 8a60223 + 56ec094 commit fa4d9b9

File tree

3 files changed

+33
-0
lines changed

3 files changed

+33
-0
lines changed

src/Nerdbank.Streams.Tests/MultiplexingStreamTests.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,23 @@ public async Task CommunicateOverOneChannel()
388388
await this.TransmitAndVerifyAsync(b, a, Guid.NewGuid().ToByteArray());
389389
}
390390

391+
[Fact]
392+
public async Task ChannelWithExistingSimplexChannel()
393+
{
394+
var transmittingPipe = new Pipe();
395+
var receivingPipe = new Pipe();
396+
var mx1ChannelTask = this.mx1.OfferChannelAsync(string.Empty, new MultiplexingStream.ChannelOptions { ExistingPipe = new DuplexPipe(transmittingPipe.Reader) }, this.TimeoutToken);
397+
var mx2ChannelTask = this.mx2.AcceptChannelAsync(string.Empty, new MultiplexingStream.ChannelOptions { ExistingPipe = new DuplexPipe(receivingPipe.Writer) }, this.TimeoutToken);
398+
var channels = await WhenAllSucceedOrAnyFail(mx1ChannelTask, mx2ChannelTask).WithCancellation(this.TimeoutToken);
399+
400+
var buffer = this.GetBuffer(3);
401+
await transmittingPipe.Writer.WriteAsync(buffer, this.TimeoutToken);
402+
403+
ReadOnlySequence<byte> readBytes = await this.ReadAtLeastAsync(receivingPipe.Reader, buffer.Length);
404+
Assert.Equal(buffer.Length, readBytes.Length);
405+
Assert.Equal(buffer, readBytes.ToArray());
406+
}
407+
391408
[Fact]
392409
[Trait("SkipInCodeCoverage", "true")] // far too slow and times out
393410
[Trait("Stress", "true")]

src/Nerdbank.Streams/DuplexPipe.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ public DuplexPipe(PipeWriter output)
5050
/// <inheritdoc />
5151
public PipeWriter Output { get; }
5252

53+
/// <summary>
54+
/// Checks whether the given <see cref="PipeReader"/> is known to be already completed.
55+
/// </summary>
56+
/// <param name="reader">The reader to check.</param>
57+
/// <returns><see langword="true" /> if the <paramref name="reader"/> is known to be completed; <see langword="false"/> if the <paramref name="reader"/> is not known to be completed.</returns>
58+
/// <remarks>
59+
/// This method <em>may</em> return <see langword="false"/> for a completed <see cref="PipeReader"/>.
60+
/// </remarks>
61+
internal static bool IsDefinitelyCompleted(PipeReader reader) => reader is CompletedPipeReader;
62+
5363
private class CompletedPipeWriter : PipeWriter
5464
{
5565
internal static readonly PipeWriter Singleton = new CompletedPipeWriter();

src/Nerdbank.Streams/PipeExtensions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,12 @@ internal static Task LinkToAsync(this PipeReader reader, PipeWriter writer, Canc
435435
{
436436
try
437437
{
438+
if (DuplexPipe.IsDefinitelyCompleted(reader))
439+
{
440+
await writer.CompleteAsync().ConfigureAwait(false);
441+
return;
442+
}
443+
438444
while (true)
439445
{
440446
cancellationToken.ThrowIfCancellationRequested();

0 commit comments

Comments
 (0)