@@ -52,11 +52,6 @@ public class SimplexStream : Stream, IBufferWriter<byte>, IDisposableObservable
5252 /// </summary>
5353 private bool completed ;
5454
55- /// <summary>
56- /// The number of bytes written since the last flush.
57- /// </summary>
58- private int bytesSinceLastFlush ;
59-
6055 /// <summary>
6156 /// Initializes a new instance of the <see cref="SimplexStream"/> class.
6257 /// </summary>
@@ -80,6 +75,10 @@ public SimplexStream(int resumeWriterThreshold, int pauseWriterThreshold)
8075 resumeWriterThreshold : resumeWriterThreshold ,
8176 useSynchronizationContext : false ) ;
8277 this . pipe = new Pipe ( options ) ;
78+ if ( ! this . pipe . Writer . CanGetUnflushedBytes )
79+ {
80+ throw new NotSupportedException ( "Pipe writer does not support getting unflushed bytes." ) ;
81+ }
8382 }
8483
8584 /// <inheritdoc />
@@ -104,8 +103,6 @@ public override long Position
104103 set => throw this . ThrowDisposedOr ( new NotSupportedException ( ) ) ;
105104 }
106105
107- private long UnflushedBytes => this . pipe . Writer . CanGetUnflushedBytes ? this . pipe . Writer . UnflushedBytes : this . bytesSinceLastFlush ;
108-
109106 /// <summary>
110107 /// Signals that no more writing will take place, causing readers to receive 0 bytes when asking for any more data.
111108 /// </summary>
@@ -181,21 +178,16 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc
181178 Memory < byte > memory = this . pipe . Writer . GetMemory ( count ) ;
182179 buffer . AsMemory ( offset , count ) . CopyTo ( memory ) ;
183180 this . pipe . Writer . Advance ( count ) ;
184- this . RecordBytesWritten ( count ) ;
185181
186182 // Auto-flush if we've written enough data
187- if ( this . UnflushedBytes >= AutoFlushThreshold )
183+ if ( this . pipe . Writer . UnflushedBytes >= AutoFlushThreshold )
188184 {
189185 await this . FlushAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
190186 }
191187 }
192188
193189 /// <inheritdoc />
194- void IBufferWriter < byte > . Advance ( int count )
195- {
196- this . pipe . Writer . Advance ( count ) ;
197- this . RecordBytesWritten ( count ) ;
198- }
190+ void IBufferWriter < byte > . Advance ( int count ) => this . pipe . Writer . Advance ( count ) ;
199191
200192 /// <inheritdoc />
201193 Memory < byte > IBufferWriter < byte > . GetMemory ( int sizeHint ) => this . pipe . Writer . GetMemory ( sizeHint ) ;
@@ -220,10 +212,9 @@ public override void Write(byte[] buffer, int offset, int count)
220212 Memory < byte > memory = this . pipe . Writer . GetMemory ( count ) ;
221213 buffer . AsMemory ( offset , count ) . CopyTo ( memory ) ;
222214 this . pipe . Writer . Advance ( count ) ;
223- this . RecordBytesWritten ( count ) ;
224215
225216 // Auto-flush if we've written enough data
226- if ( this . UnflushedBytes >= AutoFlushThreshold )
217+ if ( this . pipe . Writer . UnflushedBytes >= AutoFlushThreshold )
227218 {
228219 this . Flush ( ) ;
229220 }
@@ -248,18 +239,5 @@ private Exception ThrowDisposedOr(Exception ex)
248239 Verify . NotDisposed ( this ) ;
249240 throw ex ;
250241 }
251-
252- private void RecordBytesWritten ( int count )
253- {
254- if ( this . pipe . Writer . CanGetUnflushedBytes )
255- {
256- // The PipeWriter is tracking unflushed bytes for us, so we don't need to.
257- return ;
258- }
259-
260- this . bytesSinceLastFlush += count ;
261- }
262-
263- private void ResetBytesSinceLastFlush ( ) => this . bytesSinceLastFlush = 0 ;
264242 }
265243}
0 commit comments