Skip to content

Commit 3f1760c

Browse files
authored
Reduce copies for WebSocket reads (#6656)
- Change the upgrade message body to use the transport pipe instead of the request body pipe. - Remove some logic from the base type (more can be removed but this is a conservative change) - Improve performance of the ReadAsync call
1 parent cd0eab8 commit 3f1760c

File tree

1 file changed

+86
-13
lines changed

1 file changed

+86
-13
lines changed

src/Servers/Kestrel/Core/src/Internal/Http/Http1MessageBody.cs

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
using System;
55
using System.Buffers;
6+
using System.Diagnostics;
67
using System.IO;
78
using System.IO.Pipelines;
9+
using System.Threading;
810
using System.Threading.Tasks;
911
using Microsoft.AspNetCore.Connections;
1012
using Microsoft.AspNetCore.Http;
@@ -27,6 +29,8 @@ protected Http1MessageBody(Http1Connection context)
2729

2830
private async Task PumpAsync()
2931
{
32+
Debug.Assert(!RequestUpgrade, "Upgraded connections should never use this code path!");
33+
3034
Exception error = null;
3135

3236
try
@@ -81,14 +85,6 @@ private async Task PumpAsync()
8185
// closing the connection without a response as expected.
8286
_context.OnInputOrOutputCompleted();
8387

84-
// Treat any FIN from an upgraded request as expected.
85-
// It's up to higher-level consumer (i.e. WebSocket middleware) to determine
86-
// if the end is actually expected based on higher-level framing.
87-
if (RequestUpgrade)
88-
{
89-
break;
90-
}
91-
9288
BadHttpRequestException.Throw(RequestRejectionReason.UnexpectedEndOfRequestContent);
9389
}
9490
}
@@ -291,6 +287,10 @@ public static MessageBody For(
291287
return keepAlive ? MessageBody.ZeroContentLengthKeepAlive : MessageBody.ZeroContentLengthClose;
292288
}
293289

290+
/// <summary>
291+
/// The upgrade stream uses the raw connection stream instead of going through the RequestBodyPipe. This
292+
/// removes the redundant copy from the transport pipe to the body pipe.
293+
/// </summary>
294294
private class ForUpgrade : Http1MessageBody
295295
{
296296
public ForUpgrade(Http1Connection context)
@@ -299,14 +299,87 @@ public ForUpgrade(Http1Connection context)
299299
RequestUpgrade = true;
300300
}
301301

302+
// This returns IsEmpty so we can avoid draining the body (since it's basically an endless stream)
302303
public override bool IsEmpty => true;
303304

304-
protected override bool Read(ReadOnlySequence<byte> readableBuffer, PipeWriter writableBuffer, out SequencePosition consumed, out SequencePosition examined)
305+
public override async Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default)
305306
{
306-
Copy(readableBuffer, writableBuffer);
307-
consumed = readableBuffer.End;
308-
examined = readableBuffer.End;
309-
return false;
307+
while (true)
308+
{
309+
var result = await _context.Input.ReadAsync(cancellationToken);
310+
var readableBuffer = result.Buffer;
311+
var readableBufferLength = readableBuffer.Length;
312+
313+
try
314+
{
315+
if (!readableBuffer.IsEmpty)
316+
{
317+
foreach (var memory in readableBuffer)
318+
{
319+
// REVIEW: This *could* be slower if 2 things are true
320+
// - The WriteAsync(ReadOnlyMemory<byte>) isn't overridden on the destination
321+
// - We change the Kestrel Memory Pool to not use pinned arrays but instead use native memory
322+
await destination.WriteAsync(memory, cancellationToken);
323+
}
324+
}
325+
326+
if (result.IsCompleted)
327+
{
328+
return;
329+
}
330+
}
331+
finally
332+
{
333+
_context.Input.AdvanceTo(readableBuffer.End);
334+
}
335+
}
336+
}
337+
338+
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
339+
{
340+
while (true)
341+
{
342+
var result = await _context.Input.ReadAsync(cancellationToken);
343+
var readableBuffer = result.Buffer;
344+
var readableBufferLength = readableBuffer.Length;
345+
346+
var consumed = readableBuffer.End;
347+
var actual = 0;
348+
349+
try
350+
{
351+
if (readableBufferLength != 0)
352+
{
353+
// buffer.Length is int
354+
actual = (int)Math.Min(readableBufferLength, buffer.Length);
355+
356+
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
357+
consumed = slice.End;
358+
slice.CopyTo(buffer.Span);
359+
360+
return actual;
361+
}
362+
363+
if (result.IsCompleted)
364+
{
365+
return 0;
366+
}
367+
}
368+
finally
369+
{
370+
_context.Input.AdvanceTo(consumed);
371+
}
372+
}
373+
}
374+
375+
public override Task ConsumeAsync()
376+
{
377+
return Task.CompletedTask;
378+
}
379+
380+
public override Task StopAsync()
381+
{
382+
return Task.CompletedTask;
310383
}
311384
}
312385

0 commit comments

Comments
 (0)