Skip to content

Commit 41618a4

Browse files
authored
Write unary content with single Stream.WriteAsync (#901)
1 parent 90609ad commit 41618a4

File tree

5 files changed

+57
-47
lines changed

5 files changed

+57
-47
lines changed

src/Grpc.Net.Client.Web/Internal/Base64RequestStream.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,16 +118,20 @@ private static void EnsureSuccess(OperationStatus status, OperationStatus expect
118118
}
119119

120120
public override async Task FlushAsync(CancellationToken cancellationToken)
121+
{
122+
await WriteRemainderAsync(cancellationToken).ConfigureAwait(false);
123+
await _inner.FlushAsync(cancellationToken).ConfigureAwait(false);
124+
}
125+
126+
internal async Task WriteRemainderAsync(CancellationToken cancellationToken)
121127
{
122128
if (_remainder > 0)
123129
{
124130
EnsureSuccess(Base64.EncodeToUtf8InPlace(_buffer, _remainder, out var bytesWritten));
125131

126-
await _inner.WriteAsync(_buffer.AsMemory(0, bytesWritten), cancellationToken);
132+
await _inner.WriteAsync(_buffer.AsMemory(0, bytesWritten), cancellationToken).ConfigureAwait(false);
127133
_remainder = 0;
128134
}
129-
130-
await _inner.FlushAsync(cancellationToken).ConfigureAwait(false);
131135
}
132136

133137
protected override void Dispose(bool disposing)

src/Grpc.Net.Client.Web/Internal/GrpcWebRequestContent.cs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,32 +35,28 @@ public GrpcWebRequestContent(HttpContent inner, GrpcWebMode mode)
3535
_mode = mode;
3636
foreach (var header in inner.Headers)
3737
{
38-
Headers.Add(header.Key, header.Value);
38+
Headers.TryAddWithoutValidation(header.Key, header.Value);
3939
}
4040

4141
Headers.ContentType = (mode == GrpcWebMode.GrpcWebText)
4242
? GrpcWebProtocolConstants.GrpcWebTextHeader
4343
: GrpcWebProtocolConstants.GrpcWebHeader;
4444
}
4545

46-
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
47-
{
48-
Base64RequestStream? base64RequestStream = null;
46+
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) =>
47+
_mode == GrpcWebMode.GrpcWebText
48+
? SerializeTextToStreamAsync(stream)
49+
: _inner.CopyToAsync(stream);
4950

50-
try
51-
{
52-
if (_mode == GrpcWebMode.GrpcWebText)
53-
{
54-
base64RequestStream = new Base64RequestStream(stream);
55-
stream = base64RequestStream;
56-
}
51+
private async Task SerializeTextToStreamAsync(Stream stream)
52+
{
53+
using var base64RequestStream = new Base64RequestStream(stream);
54+
await _inner.CopyToAsync(base64RequestStream).ConfigureAwait(false);
5755

58-
await _inner.CopyToAsync(stream).ConfigureAwait(false);
59-
}
60-
finally
61-
{
62-
base64RequestStream?.Dispose();
63-
}
56+
// Any remaining content needs to be written when SerializeToStreamAsync finishes.
57+
// We want to avoid unnecessary flush calls so a custom method is used to write
58+
// ramining content rather than calling FlushAsync.
59+
await base64RequestStream.WriteRemainderAsync(CancellationToken.None).ConfigureAwait(false);
6460
}
6561

6662
protected override bool TryComputeLength(out long length)

src/Grpc.Net.Client/Internal/DefaultSerializationContext.cs

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -135,28 +135,5 @@ public override void Complete()
135135
break;
136136
}
137137
}
138-
139-
public Memory<byte> GetHeader(bool isCompressed, int length)
140-
{
141-
// TODO(JamesNK): We can optimize header allocation when IBufferWriter is being used.
142-
// IBufferWriter can be used to provide a buffer, either before or after message content.
143-
// https://github.com/grpc/grpc-dotnet/issues/784
144-
var buffer = new byte[GrpcProtocolConstants.HeaderSize];
145-
146-
// Compression flag
147-
buffer[0] = isCompressed ? (byte)1 : (byte)0;
148-
149-
// Message length
150-
EncodeMessageLength(length, buffer.AsSpan(1, 4));
151-
152-
return buffer;
153-
}
154-
155-
private static void EncodeMessageLength(int messageLength, Span<byte> destination)
156-
{
157-
Debug.Assert(destination.Length >= GrpcProtocolConstants.MessageDelimiterSize, "Buffer too small to encode message length.");
158-
159-
BinaryPrimitives.WriteUInt32BigEndian(destination, (uint)messageLength);
160-
}
161138
}
162139
}

src/Grpc.Net.Client/Internal/HttpContentClientStreamWriter.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,10 @@ private async Task WriteAsyncCore(TRequest message)
163163
callOptions = callOptions.WithWriteOptions(WriteOptions);
164164
}
165165

166-
await _call.WriteMessageAsync(writeStream, message, _grpcEncoding, callOptions);
166+
await _call.WriteMessageAsync(writeStream, message, _grpcEncoding, callOptions).ConfigureAwait(false);
167+
168+
// Flush stream to ensure messages are sent immediately
169+
await writeStream.FlushAsync(callOptions.CancellationToken).ConfigureAwait(false);
167170

168171
GrpcEventSource.Log.MessageSent();
169172
}

src/Grpc.Net.Client/Internal/StreamExtensions.cs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,23 @@ public static async ValueTask WriteMessageAsync<TMessage>(
294294
data);
295295
}
296296

297-
await stream.WriteAsync(serializationContext.GetHeader(isCompressed, data.Length), callOptions.CancellationToken).ConfigureAwait(false);
298-
await stream.WriteAsync(data, callOptions.CancellationToken).ConfigureAwait(false);
299-
await stream.FlushAsync(callOptions.CancellationToken).ConfigureAwait(false);
297+
var totalSize = data.Length + GrpcProtocolConstants.HeaderSize;
298+
var completeData = ArrayPool<byte>.Shared.Rent(totalSize);
299+
try
300+
{
301+
var buffer = completeData.AsMemory(0, totalSize);
302+
303+
WriteHeader(buffer.Span.Slice(0, GrpcProtocolConstants.HeaderSize), isCompressed, data.Length);
304+
data.CopyTo(buffer.Slice(GrpcProtocolConstants.HeaderSize));
305+
306+
// Sending the header+content in a single WriteAsync call has significant performance benefits
307+
// https://github.com/dotnet/runtime/issues/35184#issuecomment-626304981
308+
await stream.WriteAsync(buffer, callOptions.CancellationToken).ConfigureAwait(false);
309+
}
310+
finally
311+
{
312+
ArrayPool<byte>.Shared.Return(completeData);
313+
}
300314

301315
GrpcCallLog.MessageSent(logger);
302316
}
@@ -328,5 +342,21 @@ private static ReadOnlyMemory<byte> CompressMessage(ILogger logger, string compr
328342
// Should never reach here
329343
throw new InvalidOperationException($"Could not find compression provider for '{compressionEncoding}'.");
330344
}
345+
346+
private static void WriteHeader(Span<byte> headerData, bool isCompressed, int length)
347+
{
348+
// Compression flag
349+
headerData[0] = isCompressed ? (byte)1 : (byte)0;
350+
351+
// Message length
352+
EncodeMessageLength(length, headerData.Slice(1, 4));
353+
}
354+
355+
private static void EncodeMessageLength(int messageLength, Span<byte> destination)
356+
{
357+
Debug.Assert(destination.Length >= GrpcProtocolConstants.MessageDelimiterSize, "Buffer too small to encode message length.");
358+
359+
BinaryPrimitives.WriteUInt32BigEndian(destination, (uint)messageLength);
360+
}
331361
}
332362
}

0 commit comments

Comments
 (0)