Skip to content

Commit 9d6dfab

Browse files
authored
Merge pull request #205 from AArnott/fixNestedPipeReader
Fix NestedPipeReader.AdvanceTo after ReadAsync encounters end of underyling PipeReader
2 parents 4aa7c98 + 59c30f3 commit 9d6dfab

File tree

8 files changed

+303
-136
lines changed

8 files changed

+303
-136
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright (c) Andrew Arnott. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using System.IO;
5+
using System.IO.Pipelines;
6+
using Xunit.Abstractions;
7+
8+
public class IOPipelinesStreamPipeReaderTests : StreamPipeReaderTestBase
9+
{
10+
public IOPipelinesStreamPipeReaderTests(ITestOutputHelper logger)
11+
: base(logger)
12+
{
13+
}
14+
15+
protected override PipeReader CreatePipeReader(Stream stream, int hintSize = 0) => PipeReader.Create(stream, new StreamPipeReaderOptions(bufferSize: hintSize == 0 ? -1 : hintSize));
16+
}

src/Nerdbank.Streams.Tests/NestedPipeReaderTests.cs

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public void TryRead_AllAtOnce_ExamineEverything()
4646
Assert.True(sliceReader.TryRead(out readResult));
4747
Assert.True(readResult.IsCompleted);
4848
Assert.True(readResult.Buffer.IsEmpty);
49+
sliceReader.AdvanceTo(readResult.Buffer.End);
4950

5051
// Verify that the original PipeReader can still produce bytes.
5152
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -65,6 +66,7 @@ public void TryRead_AllAtOnce()
6566
Assert.True(sliceReader.TryRead(out readResult));
6667
Assert.True(readResult.IsCompleted);
6768
Assert.Equal(0, readResult.Buffer.Length);
69+
sliceReader.AdvanceTo(readResult.Buffer.End);
6870

6971
// Verify that the original PipeReader can still produce bytes.
7072
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -113,6 +115,7 @@ public async Task ReadAsync_AllAtOnce()
113115
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
114116
Assert.True(readResult.IsCompleted);
115117
Assert.True(readResult.Buffer.IsEmpty);
118+
sliceReader.AdvanceTo(readResult.Buffer.End);
116119

117120
// Verify that the original PipeReader can still produce bytes.
118121
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -138,6 +141,7 @@ public async Task ReadAsync_ExamineEverything()
138141
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
139142
Assert.True(readResult.IsCompleted);
140143
Assert.True(readResult.Buffer.IsEmpty);
144+
sliceReader.AdvanceTo(readResult.Buffer.End);
141145

142146
// Verify that the original PipeReader can still produce bytes.
143147
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -164,6 +168,7 @@ public async Task ReadAsync_MultipleReads()
164168
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
165169
Assert.True(readResult.IsCompleted);
166170
Assert.Equal(0, readResult.Buffer.Length);
171+
sliceReader.AdvanceTo(readResult.Buffer.End);
167172

168173
// Verify that the original PipeReader can still produce bytes.
169174
Assert.True(this.pipe.Reader.TryRead(out readResult));
@@ -186,6 +191,61 @@ public async Task ReadAsync_SliceExceedsUnderlyingLength()
186191
Assert.True(readResult.IsCompleted);
187192
}
188193

194+
[Fact]
195+
public async Task ReadAsync_TwiceOnCompletion_Throws()
196+
{
197+
this.pipe.Writer.Complete();
198+
199+
var sliceReader = this.pipe.Reader.ReadSlice(OriginalBuffer.Length);
200+
var readResult = await sliceReader.ReadAsync(this.TimeoutToken);
201+
sliceReader.AdvanceTo(readResult.Buffer.End);
202+
await sliceReader.ReadAsync(this.TimeoutToken);
203+
await Assert.ThrowsAsync<InvalidOperationException>(async () => await sliceReader.ReadAsync(this.TimeoutToken));
204+
}
205+
206+
[Fact]
207+
public async Task ReadAsync_ToEnd_AdvanceTo_Partial_ThenReadAsyncAgain()
208+
{
209+
this.pipe.Writer.Complete();
210+
211+
var sliceReader = this.pipe.Reader.ReadSlice(OriginalBuffer.Length);
212+
var readResult = await sliceReader.ReadAsync(this.TimeoutToken);
213+
Assert.True(readResult.IsCompleted);
214+
sliceReader.AdvanceTo(readResult.Buffer.GetPosition(3));
215+
216+
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
217+
Assert.True(readResult.IsCompleted);
218+
sliceReader.AdvanceTo(readResult.Buffer.End);
219+
220+
readResult = await sliceReader.ReadAsync(this.TimeoutToken);
221+
sliceReader.AdvanceTo(readResult.Buffer.End);
222+
223+
readResult = await this.pipe.Reader.ReadAsync(this.TimeoutToken);
224+
Assert.True(readResult.IsCompleted);
225+
this.pipe.Reader.AdvanceTo(readResult.Buffer.End);
226+
}
227+
228+
[Fact]
229+
public async Task ReadAsync_AfterReadingExactBytes()
230+
{
231+
var slice = this.pipe.Reader.ReadSlice(OriginalBuffer.Length);
232+
var readResult = await slice.ReadAsync(this.TimeoutToken);
233+
Assert.True(readResult.IsCompleted);
234+
slice.AdvanceTo(readResult.Buffer.End);
235+
236+
// Try to read again...
237+
readResult = await slice.ReadAsync(this.TimeoutToken);
238+
Assert.True(readResult.Buffer.IsEmpty);
239+
Assert.True(readResult.IsCompleted);
240+
slice.AdvanceTo(readResult.Buffer.End);
241+
242+
// And again...
243+
readResult = await slice.ReadAsync(this.TimeoutToken);
244+
Assert.True(readResult.Buffer.IsEmpty);
245+
Assert.True(readResult.IsCompleted);
246+
slice.AdvanceTo(readResult.Buffer.End);
247+
}
248+
189249
[Fact]
190250
public void OnWriterCompleted_NoOps()
191251
{
@@ -199,30 +259,51 @@ public void OnWriterCompleted_NoOps()
199259
}
200260

201261
[Fact]
202-
public void TryRead_ThrowsAfterCompleting()
262+
public void TryRead_ThrowsAfterCompleting_Prematurely()
203263
{
204264
var sliceReader = this.pipe.Reader.ReadSlice(5);
205265
sliceReader.Complete();
206266
Assert.Throws<InvalidOperationException>(() => sliceReader.TryRead(out ReadResult result));
207267
}
208268

209269
[Fact]
210-
public async Task ReadAsync_ThrowsAfterCompleting()
270+
public async Task ReadAsync_ThrowsAfterCompleting_Prematurely()
211271
{
212272
var sliceReader = this.pipe.Reader.ReadSlice(5);
213273
sliceReader.Complete();
214274
await Assert.ThrowsAsync<InvalidOperationException>(() => sliceReader.ReadAsync(this.TimeoutToken).AsTask());
215275
}
216276

217277
[Fact]
218-
public void Complete_DoesNotCompleteUnderlyingReader()
278+
public void TryRead_ThrowsAfterCompleting_AfterFullRead()
219279
{
220280
var sliceReader = this.pipe.Reader.ReadSlice(5);
281+
Assert.True(sliceReader.TryRead(out ReadResult readResult));
282+
Assert.True(readResult.IsCompleted);
283+
sliceReader.AdvanceTo(readResult.Buffer.End);
221284
sliceReader.Complete();
222-
Assert.True(this.pipe.Reader.TryRead(out ReadResult result));
223-
Assert.Equal(OriginalBuffer.Length, result.Buffer.Length);
285+
Assert.Throws<InvalidOperationException>(() => sliceReader.TryRead(out ReadResult result));
286+
287+
Assert.True(this.pipe.Reader.TryRead(out readResult));
288+
Assert.False(readResult.Buffer.IsEmpty);
224289
this.pipe.Reader.Complete();
225-
Assert.Throws<InvalidOperationException>(() => this.pipe.Reader.TryRead(out result));
290+
Assert.Throws<InvalidOperationException>(() => this.pipe.Reader.TryRead(out ReadResult result));
291+
}
292+
293+
[Fact]
294+
public async Task ReadAsync_ThrowsAfterCompleting_AfterFullRead()
295+
{
296+
var sliceReader = this.pipe.Reader.ReadSlice(5);
297+
var readResult = await sliceReader.ReadAsync(this.TimeoutToken);
298+
Assert.True(readResult.IsCompleted);
299+
sliceReader.AdvanceTo(readResult.Buffer.End);
300+
sliceReader.Complete();
301+
await Assert.ThrowsAsync<InvalidOperationException>(() => sliceReader.ReadAsync(this.TimeoutToken).AsTask());
302+
303+
readResult = await this.pipe.Reader.ReadAsync(this.TimeoutToken);
304+
Assert.False(readResult.Buffer.IsEmpty);
305+
this.pipe.Reader.Complete();
306+
await Assert.ThrowsAsync<InvalidOperationException>(() => this.pipe.Reader.ReadAsync(this.TimeoutToken).AsTask());
226307
}
227308

228309
[Fact]
@@ -233,6 +314,25 @@ public void Complete_WithException_DoesCompleteUnderlyingReader()
233314
Assert.Throws<InvalidOperationException>(() => this.pipe.Reader.TryRead(out ReadResult result));
234315
}
235316

317+
[Fact]
318+
public void Complete_Twice_WithoutReading()
319+
{
320+
var sliceReader = this.pipe.Reader.ReadSlice(5);
321+
sliceReader.Complete();
322+
sliceReader.Complete();
323+
}
324+
325+
[Fact]
326+
public void Complete_Twice_AfterReading()
327+
{
328+
var sliceReader = this.pipe.Reader.ReadSlice(5);
329+
Assert.True(sliceReader.TryRead(out ReadResult result));
330+
Assert.True(result.IsCompleted);
331+
sliceReader.AdvanceTo(result.Buffer.End);
332+
sliceReader.Complete();
333+
sliceReader.Complete();
334+
}
335+
236336
[Fact]
237337
public void CancelPendingRead_UnderlyingReader_TryRead()
238338
{

0 commit comments

Comments
 (0)