Skip to content

Commit bd92830

Browse files
authored
Merge pull request #921 from dotnet/fix918
Auto-flush support for SimplexStream at 4KB threshold
2 parents 44c87b7 + 5a80d7e commit bd92830

File tree

2 files changed

+270
-4
lines changed

2 files changed

+270
-4
lines changed

src/Nerdbank.Streams/SimplexStream.cs

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,26 @@ namespace Nerdbank.Streams
1717
/// A <see cref="Stream"/> that acts as a queue for bytes, in that what gets written to it
1818
/// can then be read from it, in order.
1919
/// </summary>
20+
/// <remarks>
21+
/// <para>
22+
/// This stream buffers written content, as any other .NET stream might do.
23+
/// To ensure bytes written are available to be read, callers should call <see cref="FlushAsync"/> or <see cref="Flush"/> after writing, and before reading.
24+
/// Flushing automatically happens when the buffer is filled, so writes will block until the reader reads enough data to make room for the new data.
25+
/// No flushing occurs when using the <see cref="IBufferWriter{T}"/> interface, so callers must call <see cref="FlushAsync"/> or <see cref="Flush"/> after calling <see cref="IBufferWriter{T}.Advance(int)"/>.
26+
/// </para>
27+
/// <para>
28+
/// This class is thread safe for one concurrent reader and writer.
29+
/// It is <em>not</em> thread safe for multiple concurrent readers or writers.
30+
/// Disposal is not thread safe and must be executed exclusively of any concurrent reader or writer.
31+
/// </para>
32+
/// </remarks>
2033
public class SimplexStream : Stream, IBufferWriter<byte>, IDisposableObservable
2134
{
35+
/// <summary>
36+
/// The number of bytes to write before automatically flushing.
37+
/// </summary>
38+
private const int AutoFlushThreshold = 4096;
39+
2240
/// <summary>
2341
/// The pipe that does all the hard work.
2442
/// </summary>
@@ -34,9 +52,17 @@ public class SimplexStream : Stream, IBufferWriter<byte>, IDisposableObservable
3452
/// </summary>
3553
private bool completed;
3654

55+
/// <summary>
56+
/// The number of bytes written since the last flush.
57+
/// </summary>
58+
private int bytesSinceLastFlush;
59+
3760
/// <summary>
3861
/// Initializes a new instance of the <see cref="SimplexStream"/> class.
3962
/// </summary>
63+
/// <remarks>
64+
/// The default thresholds for pausing and resuming the writer are 32KB and 16KB, respectively.
65+
/// </remarks>
4066
public SimplexStream()
4167
: this(16 * 1024, 32 * 1024)
4268
{
@@ -78,6 +104,8 @@ public override long Position
78104
set => throw this.ThrowDisposedOr(new NotSupportedException());
79105
}
80106

107+
private long UnflushedBytes => this.pipe.Writer.CanGetUnflushedBytes ? this.pipe.Writer.UnflushedBytes : this.bytesSinceLastFlush;
108+
81109
/// <summary>
82110
/// Signals that no more writing will take place, causing readers to receive 0 bytes when asking for any more data.
83111
/// </summary>
@@ -141,15 +169,33 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
141169
}
142170

143171
/// <inheritdoc />
144-
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
172+
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
145173
{
174+
Requires.NotNull(buffer, nameof(buffer));
175+
Requires.Range(offset + count <= buffer.Length, nameof(count));
176+
Requires.Range(offset >= 0, nameof(offset));
177+
Requires.Range(count >= 0, nameof(count));
178+
Verify.NotDisposed(this);
179+
146180
cancellationToken.ThrowIfCancellationRequested();
147-
this.Write(buffer, offset, count);
148-
return Task.CompletedTask;
181+
Memory<byte> memory = this.pipe.Writer.GetMemory(count);
182+
buffer.AsMemory(offset, count).CopyTo(memory);
183+
this.pipe.Writer.Advance(count);
184+
this.RecordBytesWritten(count);
185+
186+
// Auto-flush if we've written enough data
187+
if (this.UnflushedBytes >= AutoFlushThreshold)
188+
{
189+
await this.FlushAsync(cancellationToken).ConfigureAwait(false);
190+
}
149191
}
150192

151193
/// <inheritdoc />
152-
void IBufferWriter<byte>.Advance(int count) => this.pipe.Writer.Advance(count);
194+
void IBufferWriter<byte>.Advance(int count)
195+
{
196+
this.pipe.Writer.Advance(count);
197+
this.RecordBytesWritten(count);
198+
}
153199

154200
/// <inheritdoc />
155201
Memory<byte> IBufferWriter<byte>.GetMemory(int sizeHint) => this.pipe.Writer.GetMemory(sizeHint);
@@ -174,6 +220,13 @@ public override void Write(byte[] buffer, int offset, int count)
174220
Memory<byte> memory = this.pipe.Writer.GetMemory(count);
175221
buffer.AsMemory(offset, count).CopyTo(memory);
176222
this.pipe.Writer.Advance(count);
223+
this.RecordBytesWritten(count);
224+
225+
// Auto-flush if we've written enough data
226+
if (this.UnflushedBytes >= AutoFlushThreshold)
227+
{
228+
this.Flush();
229+
}
177230
}
178231

179232
/// <inheritdoc />
@@ -195,5 +248,18 @@ private Exception ThrowDisposedOr(Exception ex)
195248
Verify.NotDisposed(this);
196249
throw ex;
197250
}
251+
252+
private void RecordBytesWritten(int count)
253+
{
254+
if (this.pipe.Writer.CanGetUnflushedBytes)
255+
{
256+
// The PipeWriter is tracking unflushed bytes for us, so we don't need to.
257+
return;
258+
}
259+
260+
this.bytesSinceLastFlush += count;
261+
}
262+
263+
private void ResetBytesSinceLastFlush() => this.bytesSinceLastFlush = 0;
198264
}
199265
}

test/Nerdbank.Streams.Tests/SimplexStreamTests.cs

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ public class SimplexStreamTests : TestBase
1313

1414
private const int PauseThreshold = 40;
1515

16+
// Test-local constant for auto-flush threshold. Keep in sync with production value in SimplexStream.
17+
private const int AutoFlushThreshold = 4096;
18+
1619
private readonly Random random = new Random();
1720

1821
private SimplexStream stream = new SimplexStream(ResumeThreshold, PauseThreshold);
@@ -344,6 +347,203 @@ public void CompleteWriting_CompleteSuccessfullyThenWithError()
344347
Assert.Equal(0, this.stream.Read(buffer, 0, buffer.Length));
345348
}
346349

350+
[Theory]
351+
[CombinatorialData]
352+
public async Task AutoFlush_OccursAfter4KB(bool useAsync)
353+
{
354+
// Use a stream with larger thresholds to avoid blocking
355+
using var largeStream = new SimplexStream(8192, 16384);
356+
357+
// Write exactly 4KB (4096 bytes) which should trigger auto-flush
358+
byte[] sendBuffer = this.GetRandomBuffer(AutoFlushThreshold);
359+
if (useAsync)
360+
{
361+
await largeStream.WriteAsync(sendBuffer, 0, sendBuffer.Length, this.TimeoutToken);
362+
}
363+
else
364+
{
365+
largeStream.Write(sendBuffer, 0, sendBuffer.Length);
366+
}
367+
368+
// Data should be available for reading without explicit flush
369+
byte[] recvBuffer = new byte[sendBuffer.Length];
370+
await this.ReadAsync(largeStream, recvBuffer, isAsync: useAsync);
371+
Assert.Equal(sendBuffer, recvBuffer);
372+
}
373+
374+
[Theory]
375+
[CombinatorialData]
376+
public async Task AutoFlush_DoesNotOccurBelow4KB(bool useAsync)
377+
{
378+
// Use a stream with larger thresholds
379+
using var largeStream = new SimplexStream(8192, 16384);
380+
381+
// Write less than 4KB
382+
byte[] sendBuffer = this.GetRandomBuffer(4095);
383+
if (useAsync)
384+
{
385+
await largeStream.WriteAsync(sendBuffer, 0, sendBuffer.Length, this.TimeoutToken);
386+
}
387+
else
388+
{
389+
largeStream.Write(sendBuffer, 0, sendBuffer.Length);
390+
}
391+
392+
// Data should NOT be available without explicit flush - read should timeout
393+
byte[] recvBuffer = new byte[1];
394+
Task<int> readTask = largeStream.ReadAsync(recvBuffer, 0, 1, ExpectedTimeoutToken);
395+
396+
// This should timeout because data hasn't been flushed yet
397+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await readTask);
398+
399+
// After explicit flush, data should be available
400+
await largeStream.FlushAsync();
401+
recvBuffer = new byte[sendBuffer.Length];
402+
await this.ReadAsync(largeStream, recvBuffer, isAsync: useAsync);
403+
Assert.Equal(sendBuffer, recvBuffer);
404+
}
405+
406+
[Theory]
407+
[CombinatorialData]
408+
public async Task AutoFlush_AccumulatesAcrossMultipleWrites(bool useAsync)
409+
{
410+
// Use a stream with larger thresholds to avoid blocking
411+
using var largeStream = new SimplexStream(8192, 16384);
412+
413+
// Write 2KB three times (total 6KB) - should auto-flush after second write
414+
byte[] sendBuffer = this.GetRandomBuffer(6144);
415+
416+
// First write (2KB) - no flush yet
417+
if (useAsync)
418+
{
419+
await largeStream.WriteAsync(sendBuffer, 0, 2048, this.TimeoutToken);
420+
}
421+
else
422+
{
423+
largeStream.Write(sendBuffer, 0, 2048);
424+
}
425+
426+
// Second write (2KB, total 4KB) - should auto-flush
427+
if (useAsync)
428+
{
429+
await largeStream.WriteAsync(sendBuffer, 2048, 2048, this.TimeoutToken);
430+
}
431+
else
432+
{
433+
largeStream.Write(sendBuffer, 2048, 2048);
434+
}
435+
436+
// Data should be available for reading (4KB)
437+
byte[] recvBuffer = new byte[AutoFlushThreshold];
438+
await this.ReadAsync(largeStream, recvBuffer, isAsync: useAsync);
439+
Assert.Equal(sendBuffer.Take(AutoFlushThreshold), recvBuffer);
440+
441+
// Third write (2KB) - not flushed yet
442+
if (useAsync)
443+
{
444+
await largeStream.WriteAsync(sendBuffer, AutoFlushThreshold, 2048, this.TimeoutToken);
445+
}
446+
else
447+
{
448+
largeStream.Write(sendBuffer, AutoFlushThreshold, 2048);
449+
}
450+
451+
// Explicitly flush to make remaining data available
452+
await largeStream.FlushAsync();
453+
recvBuffer = new byte[2048];
454+
await this.ReadAsync(largeStream, recvBuffer, isAsync: useAsync);
455+
Assert.Equal(sendBuffer.Skip(AutoFlushThreshold).Take(2048), recvBuffer);
456+
}
457+
458+
[Fact]
459+
public async Task BackpressureWorks_WithAutoFlush()
460+
{
461+
// This test verifies that the pauseWriterThreshold works correctly with auto-flush
462+
// by having concurrent reading and writing
463+
var simplex = new SimplexStream(2048, AutoFlushThreshold);
464+
465+
try
466+
{
467+
byte[] sendBuffer = this.GetRandomBuffer(8192);
468+
byte[] recvBuffer = new byte[8192];
469+
int bytesRead = 0;
470+
471+
// Start concurrent reader
472+
var readTask = Task.Run(async () =>
473+
{
474+
while (bytesRead < 8192)
475+
{
476+
int count = await simplex.ReadAsync(recvBuffer, bytesRead, 1024, this.TimeoutToken);
477+
if (count == 0)
478+
{
479+
break;
480+
}
481+
482+
bytesRead += count;
483+
await Task.Delay(10); // Simulate slow reader
484+
}
485+
});
486+
487+
// Write 8KB in 1KB chunks (should auto-flush twice at 4KB and 8KB)
488+
for (int i = 0; i < 8; i++)
489+
{
490+
await simplex.WriteAsync(sendBuffer, i * 1024, 1024, this.TimeoutToken);
491+
}
492+
493+
simplex.CompleteWriting();
494+
await readTask.WithCancellation(this.TimeoutToken);
495+
496+
Assert.Equal(8192, bytesRead);
497+
Assert.Equal(sendBuffer, recvBuffer);
498+
}
499+
finally
500+
{
501+
simplex.Dispose();
502+
}
503+
}
504+
505+
[Fact]
506+
public async Task Issue918_LargeWriteSmallReadWithDispose()
507+
{
508+
// This is the scenario from issue #918
509+
var simplex = new SimplexStream(0, 4096);
510+
511+
try
512+
{
513+
int written = 0;
514+
var writeTask = Task.Run(async () =>
515+
{
516+
byte[] buffer = new byte[1024];
517+
int totalToWrite = 10 * 1024 * 1024; // 10 MB
518+
519+
while (written < totalToWrite)
520+
{
521+
await simplex.WriteAsync(buffer, 0, buffer.Length, this.TimeoutToken);
522+
written += buffer.Length;
523+
}
524+
525+
simplex.CompleteWriting();
526+
});
527+
528+
// Read only 1KB
529+
byte[] readBuffer = new byte[1024];
530+
int bytesRead = await simplex.ReadAsync(readBuffer, 0, readBuffer.Length, this.TimeoutToken);
531+
Assert.Equal(1024, bytesRead);
532+
533+
// Dispose the stream - this should cause the writer to fail
534+
simplex.Dispose();
535+
536+
// Wait for writer to complete (it should fail with ObjectDisposedException or similar)
537+
Exception ex = await Assert.ThrowsAnyAsync<Exception>(() => writeTask.WithCancellation(this.TimeoutToken));
538+
this.Logger.WriteLine($"Writer stopped after {written} bytes with: {ex.GetType().Name}: {ex.Message}");
539+
simplex.CompleteWriting(ex);
540+
}
541+
finally
542+
{
543+
simplex.Dispose();
544+
}
545+
}
546+
347547
protected override void Dispose(bool disposing)
348548
{
349549
this.stream.Dispose();

0 commit comments

Comments
 (0)