Skip to content

Commit cdd38d7

Browse files
authored
Implement Stream Adapters (and minor nitpics) (#6583)
1 parent 6a44aca commit cdd38d7

25 files changed

+1219
-47
lines changed

src/Http/Http.Abstractions/src/Microsoft.AspNetCore.Http.Abstractions.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ Microsoft.AspNetCore.Http.HttpResponse</Description>
2222
<Reference Include="Microsoft.AspNetCore.Http.Features" />
2323
<Reference Include="Microsoft.Extensions.ActivatorUtilities.Sources" PrivateAssets="All" />
2424
<Reference Include="System.Text.Encodings.Web" />
25-
<Reference Include="System.IO.Pipelines" />
2625
</ItemGroup>
2726

2827
</Project>

src/Http/Http.Features/src/IRequestBodyPipeFeature.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace Microsoft.AspNetCore.Http.Features
1111
public interface IRequestBodyPipeFeature
1212
{
1313
/// <summary>
14-
/// A <see cref="PipeWriter"/> representing the request body, if any.
14+
/// A <see cref="PipeReader"/> representing the request body, if any.
1515
/// </summary>
1616
PipeReader RequestBodyPipe { get; set; }
1717
}

src/Http/Http.Features/test/Microsoft.AspNetCore.Http.Features.Tests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<TargetFrameworks>netcoreapp3.0;net461</TargetFrameworks>
4+
<TargetFramework>netcoreapp3.0</TargetFramework>
55
</PropertyGroup>
66

77
<ItemGroup>

src/Http/Http/src/BufferSegment.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace System.IO.Pipelines
99
{
10-
public sealed class BufferSegment : ReadOnlySequenceSegment<byte>
10+
internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
1111
{
1212
private IMemoryOwner<byte> _memoryOwner;
1313
private BufferSegment _next;

src/Http/Http/src/Microsoft.AspNetCore.Http.csproj

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<Description>ASP.NET Core default HTTP feature implementations.</Description>
@@ -19,7 +19,6 @@
1919
<Reference Include="Microsoft.Extensions.ObjectPool" />
2020
<Reference Include="Microsoft.Extensions.Options" />
2121
<Reference Include="Microsoft.Net.Http.Headers" />
22-
<Reference Include="System.IO.Pipelines" />
2322
</ItemGroup>
2423

2524
</Project>
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
using System.Runtime.CompilerServices;
2+
3+
[assembly: InternalsVisibleTo("Microsoft.AspNetCore.Http.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100f33a29044fa9d740c9b3213a93e57c84b472c84e0b8a0e1ae48e67a9f8f6de9d5f7f3d52ac23e48ac51801f1dc950abe901da34d2a9e3baadb141a17c77ef3c565dd5ee5054b91cf63bb3c6ab83f72ab3aafe93d0fc3c2348b764fafb0b1c0733de51459aeab46580384bf9d74c4e28164b7cde247f891ba07891c9d872ad2bb")]
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Buffers;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace System.IO.Pipelines
10+
{
11+
/// <summary>
12+
/// Represents a read-only Stream backed by a PipeReader
13+
/// </summary>
14+
public class ReadOnlyPipeStream : Stream
15+
{
16+
private readonly PipeReader _pipeReader;
17+
private bool _allowSynchronousIO = true;
18+
19+
/// <summary>
20+
/// Creates a new ReadOnlyPipeStream
21+
/// </summary>
22+
/// <param name="pipeReader">The PipeReader to read from.</param>
23+
public ReadOnlyPipeStream(PipeReader pipeReader) :
24+
this(pipeReader, allowSynchronousIO: true)
25+
{
26+
}
27+
28+
/// <summary>
29+
/// Creates a new ReadOnlyPipeStream
30+
/// </summary>
31+
/// <param name="pipeReader">The PipeReader to read from.</param>
32+
/// <param name="allowSynchronousIO">Whether synchronous IO is allowed.</param>
33+
public ReadOnlyPipeStream(PipeReader pipeReader, bool allowSynchronousIO)
34+
{
35+
_allowSynchronousIO = allowSynchronousIO;
36+
_pipeReader = pipeReader;
37+
}
38+
39+
/// <inheritdoc />
40+
public override bool CanSeek => false;
41+
42+
/// <inheritdoc />
43+
public override bool CanRead => true;
44+
45+
/// <inheritdoc />
46+
public override bool CanWrite => false;
47+
48+
/// <inheritdoc />
49+
public override long Length => throw new NotSupportedException();
50+
51+
/// <inheritdoc />
52+
public override long Position
53+
{
54+
get => throw new NotSupportedException();
55+
set => throw new NotSupportedException();
56+
}
57+
58+
/// <inheritdoc />
59+
public override int WriteTimeout
60+
{
61+
get => throw new NotSupportedException();
62+
set => throw new NotSupportedException();
63+
}
64+
65+
/// <inheritdoc />
66+
public override void Write(byte[] buffer, int offset, int count)
67+
=> throw new NotSupportedException();
68+
69+
/// <inheritdoc />
70+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
71+
=> throw new NotSupportedException();
72+
73+
/// <inheritdoc />
74+
public override void Flush()
75+
{
76+
throw new NotSupportedException();
77+
}
78+
79+
/// <inheritdoc />
80+
public override Task FlushAsync(CancellationToken cancellationToken)
81+
{
82+
throw new NotSupportedException();
83+
}
84+
85+
/// <inheritdoc />
86+
public override long Seek(long offset, SeekOrigin origin)
87+
{
88+
throw new NotSupportedException();
89+
}
90+
91+
/// <inheritdoc />
92+
public override void SetLength(long value)
93+
{
94+
throw new NotSupportedException();
95+
}
96+
97+
/// <inheritdoc />
98+
public override int Read(byte[] buffer, int offset, int count)
99+
{
100+
if (!_allowSynchronousIO)
101+
{
102+
ThrowHelper.ThrowInvalidOperationException_SynchronousReadsDisallowed();
103+
}
104+
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
105+
}
106+
107+
/// <inheritdoc />
108+
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
109+
{
110+
var task = ReadAsync(buffer, offset, count, default, state);
111+
if (callback != null)
112+
{
113+
task.ContinueWith(t => callback.Invoke(t));
114+
}
115+
return task;
116+
}
117+
118+
/// <inheritdoc />
119+
public override int EndRead(IAsyncResult asyncResult)
120+
{
121+
return ((Task<int>)asyncResult).GetAwaiter().GetResult();
122+
}
123+
124+
private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
125+
{
126+
var tcs = new TaskCompletionSource<int>(state);
127+
var task = ReadAsync(buffer, offset, count, cancellationToken);
128+
task.ContinueWith((task2, state2) =>
129+
{
130+
var tcs2 = (TaskCompletionSource<int>)state2;
131+
if (task2.IsCanceled)
132+
{
133+
tcs2.SetCanceled();
134+
}
135+
else if (task2.IsFaulted)
136+
{
137+
tcs2.SetException(task2.Exception);
138+
}
139+
else
140+
{
141+
tcs2.SetResult(task2.Result);
142+
}
143+
}, tcs, cancellationToken);
144+
return tcs.Task;
145+
}
146+
147+
/// <inheritdoc />
148+
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
149+
{
150+
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
151+
}
152+
153+
/// <inheritdoc />
154+
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
155+
{
156+
return ReadAsyncInternal(destination, cancellationToken);
157+
}
158+
159+
private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
160+
{
161+
while (true)
162+
{
163+
var result = await _pipeReader.ReadAsync(cancellationToken);
164+
var readableBuffer = result.Buffer;
165+
var readableBufferLength = readableBuffer.Length;
166+
167+
var consumed = readableBuffer.End;
168+
var actual = 0;
169+
try
170+
{
171+
if (readableBufferLength != 0)
172+
{
173+
actual = (int)Math.Min(readableBufferLength, buffer.Length);
174+
175+
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
176+
consumed = slice.End;
177+
slice.CopyTo(buffer.Span);
178+
179+
return actual;
180+
}
181+
182+
if (result.IsCompleted)
183+
{
184+
return 0;
185+
}
186+
}
187+
finally
188+
{
189+
_pipeReader.AdvanceTo(consumed);
190+
}
191+
}
192+
}
193+
194+
/// <inheritdoc />
195+
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
196+
{
197+
if (destination == null)
198+
{
199+
throw new ArgumentNullException(nameof(destination));
200+
}
201+
202+
if (bufferSize <= 0)
203+
{
204+
throw new ArgumentOutOfRangeException(nameof(bufferSize));
205+
}
206+
207+
return CopyToAsyncInternal(destination, cancellationToken);
208+
}
209+
210+
private async Task CopyToAsyncInternal(Stream destination, CancellationToken cancellationToken)
211+
{
212+
while (true)
213+
{
214+
var result = await _pipeReader.ReadAsync(cancellationToken);
215+
var readableBuffer = result.Buffer;
216+
var readableBufferLength = readableBuffer.Length;
217+
218+
try
219+
{
220+
if (readableBufferLength != 0)
221+
{
222+
foreach (var memory in readableBuffer)
223+
{
224+
await destination.WriteAsync(memory, cancellationToken);
225+
}
226+
}
227+
228+
if (result.IsCompleted)
229+
{
230+
return;
231+
}
232+
}
233+
finally
234+
{
235+
_pipeReader.AdvanceTo(readableBuffer.End);
236+
}
237+
}
238+
}
239+
}
240+
}

src/Http/Http/src/StreamPipeReader.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public class StreamPipeReader : PipeReader, IDisposable
2525
private readonly MemoryPool<byte> _pool;
2626

2727
private CancellationTokenSource _internalTokenSource;
28-
private bool _isCompleted;
28+
private bool _isReaderCompleted;
29+
private bool _isWriterCompleted;
2930
private ExceptionDispatchInfo _exceptionInfo;
3031

3132
private BufferSegment _readHead;
@@ -182,12 +183,12 @@ public override void CancelPendingRead()
182183
/// <inheritdoc />
183184
public override void Complete(Exception exception = null)
184185
{
185-
if (_isCompleted)
186+
if (_isReaderCompleted)
186187
{
187188
return;
188189
}
189190

190-
_isCompleted = true;
191+
_isReaderCompleted = true;
191192
if (exception != null)
192193
{
193194
_exceptionInfo = ExceptionDispatchInfo.Capture(exception);
@@ -248,6 +249,11 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
248249

249250
_readTail.End += length;
250251
_bufferedBytes += length;
252+
253+
if (length == 0)
254+
{
255+
_isWriterCompleted = true;
256+
}
251257
}
252258
catch (OperationCanceledException)
253259
{
@@ -275,7 +281,7 @@ private void ClearCancellationToken()
275281

276282
private void ThrowIfCompleted()
277283
{
278-
if (_isCompleted)
284+
if (_isReaderCompleted)
279285
{
280286
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
281287
}
@@ -357,7 +363,7 @@ private void Cancel()
357363
[MethodImpl(MethodImplOptions.AggressiveInlining)]
358364
private bool IsCompletedOrThrow()
359365
{
360-
if (!_isCompleted)
366+
if (!_isWriterCompleted)
361367
{
362368
return false;
363369
}

src/Http/Http/src/StreamPipeWriter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public StreamPipeWriter(Stream writingStream, int minimumSegmentSize, MemoryPool
6565
}
6666

6767
/// <summary>
68-
/// Gets the inner stream that is being read from.
68+
/// Gets the inner stream that is being written to.
6969
/// </summary>
7070
public Stream InnerStream => _writingStream;
7171

src/Http/Http/src/ThrowHelper.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,17 @@ internal static class ThrowHelper
1919
public static void ThrowInvalidOperationException_NoDataRead() => throw CreateInvalidOperationException_NoDataRead();
2020
[MethodImpl(MethodImplOptions.NoInlining)]
2121
public static Exception CreateInvalidOperationException_NoDataRead() => new InvalidOperationException("No data has been read into the StreamPipeReader.");
22+
23+
public static void ThrowInvalidOperationException_SynchronousReadsDisallowed() => throw CreateInvalidOperationException_SynchronousReadsDisallowed();
24+
[MethodImpl(MethodImplOptions.NoInlining)]
25+
public static Exception CreateInvalidOperationException_SynchronousReadsDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call ReadAsync or set allowSynchronousIO to true instead.");
26+
27+
public static void ThrowInvalidOperationException_SynchronousWritesDisallowed() => throw CreateInvalidOperationException_SynchronousWritesDisallowed();
28+
[MethodImpl(MethodImplOptions.NoInlining)]
29+
public static Exception CreateInvalidOperationException_SynchronousWritesDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call WriteAsync or set allowSynchronousIO to true instead.");
30+
31+
public static void ThrowInvalidOperationException_SynchronousFlushesDisallowed() => throw CreateInvalidOperationException_SynchronousFlushesDisallowed();
32+
[MethodImpl(MethodImplOptions.NoInlining)]
33+
public static Exception CreateInvalidOperationException_SynchronousFlushesDisallowed() => new InvalidOperationException("Synchronous operations are disallowed. Call FlushAsync or set allowSynchronousIO to true instead.");
2234
}
2335
}

0 commit comments

Comments
 (0)