Skip to content

Commit 90609ad

Browse files
authored
Add GrpcChannelOptions.HttpHandler and channel default to invoker (#896)
1 parent 1b3588c commit 90609ad

File tree

13 files changed

+316
-119
lines changed

13 files changed

+316
-119
lines changed

perf/Grpc.AspNetCore.Microbenchmarks/Client/UnaryClientBenchmarkBase.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,9 @@ public void GlobalSetup()
6464
return ResponseUtils.CreateResponse(HttpStatusCode.OK, content, grpcEncoding: ResponseCompressionAlgorithm);
6565
});
6666

67-
var httpClient = new HttpClient(handler);
68-
6967
var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions
7068
{
71-
HttpClient = httpClient,
69+
HttpHandler = handler,
7270
CompressionProviders = CompressionProviders
7371
});
7472

src/Grpc.Net.Client/GrpcChannel.cs

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using System;
2020
using System.Collections.Concurrent;
2121
using System.Collections.Generic;
22+
using System.Linq;
2223
using System.Net.Http;
2324
using System.Threading;
2425
using Grpc.Core;
@@ -40,9 +41,11 @@ public sealed class GrpcChannel : ChannelBase, IDisposable
4041

4142
private readonly ConcurrentDictionary<IMethod, GrpcMethodInfo> _methodInfoCache;
4243
private readonly Func<IMethod, GrpcMethodInfo> _createMethodInfoFunc;
44+
// Internal for testing
45+
internal readonly HashSet<IDisposable> ActiveCalls;
4346

4447
internal Uri Address { get; }
45-
internal HttpClient HttpClient { get; }
48+
internal HttpMessageInvoker HttpInvoker { get; }
4649
internal int? SendMaxMessageSize { get; }
4750
internal int? ReceiveMaxMessageSize { get; }
4851
internal ILoggerFactory LoggerFactory { get; }
@@ -63,20 +66,22 @@ internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(addr
6366
{
6467
_methodInfoCache = new ConcurrentDictionary<IMethod, GrpcMethodInfo>();
6568

66-
// Dispose the HttpClient if...
67-
// 1. No client was specified and so the channel created the HttpClient itself
68-
// 2. User has specified a client and set DisposeHttpClient to true
69-
_shouldDisposeHttpClient = channelOptions.HttpClient == null || channelOptions.DisposeHttpClient;
69+
// Dispose the HTTP client/handler if...
70+
// 1. No client/handler was specified and so the channel created the client itself
71+
// 2. User has specified a client/handler and set DisposeHttpClient to true
72+
_shouldDisposeHttpClient = (channelOptions.HttpClient == null && channelOptions.HttpHandler == null)
73+
|| channelOptions.DisposeHttpClient;
7074

7175
Address = address;
72-
HttpClient = channelOptions.HttpClient ?? CreateInternalHttpClient();
76+
HttpInvoker = channelOptions.HttpClient ?? CreateInternalHttpInvoker(channelOptions.HttpHandler);
7377
SendMaxMessageSize = channelOptions.MaxSendMessageSize;
7478
ReceiveMaxMessageSize = channelOptions.MaxReceiveMessageSize;
7579
CompressionProviders = ResolveCompressionProviders(channelOptions.CompressionProviders);
7680
MessageAcceptEncoding = GrpcProtocolHelpers.GetMessageAcceptEncoding(CompressionProviders);
7781
LoggerFactory = channelOptions.LoggerFactory ?? NullLoggerFactory.Instance;
7882
ThrowOperationCanceledOnCancellation = channelOptions.ThrowOperationCanceledOnCancellation;
7983
_createMethodInfoFunc = CreateMethodInfo;
84+
ActiveCalls = new HashSet<IDisposable>();
8085

8186
if (channelOptions.Credentials != null)
8287
{
@@ -90,21 +95,29 @@ internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(addr
9095
}
9196
}
9297

93-
private static HttpClient CreateInternalHttpClient()
98+
private static HttpMessageInvoker CreateInternalHttpInvoker(HttpMessageHandler? handler)
9499
{
95-
var httpClient = new HttpClient();
96-
97-
// Long running server and duplex streaming gRPC requests may not
98-
// return any messages for over 100 seconds, triggering a cancellation
99-
// of HttpClient.SendAsync. Disable timeout in internally created
100-
// HttpClient for channel.
101-
//
102-
// gRPC deadline should be the recommended way to timeout gRPC calls.
103-
//
104-
// https://github.com/dotnet/corefx/issues/41650
105-
httpClient.Timeout = Timeout.InfiniteTimeSpan;
106-
107-
return httpClient;
100+
// HttpMessageInvoker should always dispose handler if Disposed is called on it.
101+
// Decision to dispose invoker is controlled by _shouldDisposeHttpClient.
102+
var httpInvoker = new HttpMessageInvoker(handler ?? new HttpClientHandler(), disposeHandler: true);
103+
104+
return httpInvoker;
105+
}
106+
107+
internal void RegisterActiveCall(IDisposable grpcCall)
108+
{
109+
lock (ActiveCalls)
110+
{
111+
ActiveCalls.Add(grpcCall);
112+
}
113+
}
114+
115+
internal void FinishActiveCall(IDisposable grpcCall)
116+
{
117+
lock (ActiveCalls)
118+
{
119+
ActiveCalls.Remove(grpcCall);
120+
}
108121
}
109122

110123
internal GrpcMethodInfo GetCachedGrpcMethodInfo(IMethod method)
@@ -261,6 +274,12 @@ public static GrpcChannel ForAddress(Uri address, GrpcChannelOptions channelOpti
261274
throw new ArgumentNullException(nameof(channelOptions));
262275
}
263276

277+
if (channelOptions.HttpClient != null && channelOptions.HttpHandler != null)
278+
{
279+
throw new ArgumentException($"{nameof(GrpcChannelOptions.HttpClient)} and {nameof(GrpcChannelOptions.HttpHandler)} have been configured. " +
280+
$"Only one HTTP caller can be specified.");
281+
}
282+
264283
return new GrpcChannel(address, channelOptions);
265284
}
266285

@@ -275,9 +294,24 @@ public void Dispose()
275294
return;
276295
}
277296

297+
lock (ActiveCalls)
298+
{
299+
if (ActiveCalls.Count > 0)
300+
{
301+
// Disposing a call will remove it from ActiveCalls. Need to take a copy
302+
// to avoid enumeration from being modified
303+
var activeCallsCopy = ActiveCalls.ToArray();
304+
305+
foreach (var activeCall in activeCallsCopy)
306+
{
307+
activeCall.Dispose();
308+
}
309+
}
310+
}
311+
278312
if (_shouldDisposeHttpClient)
279313
{
280-
HttpClient.Dispose();
314+
HttpInvoker.Dispose();
281315
}
282316
Disposed = true;
283317
}

src/Grpc.Net.Client/GrpcChannelOptions.cs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,23 +68,46 @@ public sealed class GrpcChannelOptions
6868
public ILoggerFactory? LoggerFactory { get; set; }
6969

7070
/// <summary>
71-
/// Gets or sets the <see cref="HttpClient"/> used by the channel.
71+
/// Gets or sets the <see cref="System.Net.Http.HttpClient"/> used by the channel to make HTTP calls.
7272
/// </summary>
7373
/// <remarks>
74+
/// <para>
7475
/// By default a <see cref="System.Net.Http.HttpClient"/> specified here will not be disposed with the channel.
7576
/// To dispose the <see cref="System.Net.Http.HttpClient"/> with the channel you must set <see cref="DisposeHttpClient"/>
7677
/// to <c>true</c>.
78+
/// </para>
79+
/// <para>
80+
/// Only one HTTP caller can be specified for a channel. An error will be thrown if this is configured
81+
/// together with <see cref="HttpHandler"/>.
82+
/// </para>
7783
/// </remarks>
7884
public HttpClient? HttpClient { get; set; }
7985

8086
/// <summary>
81-
/// Gets or sets a value indicating whether the underlying <see cref="System.Net.Http.HttpClient"/> should be disposed
82-
/// when the <see cref="GrpcChannel"/> instance is disposed. The default value is <c>false</c>.
87+
/// Gets or sets the <see cref="HttpMessageHandler"/> used by the channel to make HTTP calls.
88+
/// </summary>
89+
/// <remarks>
90+
/// <para>
91+
/// By default a <see cref="HttpMessageHandler"/> specified here will not be disposed with the channel.
92+
/// To dispose the <see cref="HttpMessageHandler"/> with the channel you must set <see cref="DisposeHttpClient"/>
93+
/// to <c>true</c>.
94+
/// </para>
95+
/// <para>
96+
/// Only one HTTP caller can be specified for a channel. An error will be thrown if this is configured
97+
/// together with <see cref="HttpClient"/>.
98+
/// </para>
99+
/// </remarks>
100+
public HttpMessageHandler? HttpHandler { get; set; }
101+
102+
/// <summary>
103+
/// Gets or sets a value indicating whether the underlying <see cref="System.Net.Http.HttpClient"/> or
104+
/// <see cref="HttpMessageHandler"/> should be disposed when the <see cref="GrpcChannel"/> instance is disposed.
105+
/// The default value is <c>false</c>.
83106
/// </summary>
84107
/// <remarks>
85-
/// This setting is used when a <see cref="HttpClient"/> value is specified. If no <see cref="HttpClient"/> value is provided
86-
/// then the channel will create an <see cref="System.Net.Http.HttpClient"/> instance that is always disposed when
87-
/// the channel is disposed.
108+
/// This setting is used when a <see cref="HttpClient"/> or <see cref="HttpHandler"/> value is specified.
109+
/// If they are not specified then the channel will create an internal HTTP caller that is always disposed
110+
/// when the channel is disposed.
88111
/// </remarks>
89112
public bool DisposeHttpClient { get; set; }
90113

src/Grpc.Net.Client/Internal/GrpcCall.cs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public GrpcCall(Method<TRequest, TResponse> method, GrpcMethodInfo grpcMethodInf
7575
Channel = channel;
7676
Logger = channel.LoggerFactory.CreateLogger(LoggerName);
7777
_deadline = options.Deadline ?? DateTime.MaxValue;
78+
79+
Channel.RegisterActiveCall(this);
7880
}
7981

8082
private void ValidateDeadline(DateTime? deadline)
@@ -168,6 +170,8 @@ private void Cleanup(Status status)
168170
ClientStreamReader?.HttpResponseTcs.TrySetCanceled();
169171
}
170172

173+
Channel.FinishActiveCall(this);
174+
171175
_ctsRegistration?.Dispose();
172176
_deadlineTimer?.Dispose();
173177
HttpResponse?.Dispose();
@@ -459,7 +463,12 @@ private async ValueTask RunCall(HttpRequestMessage request, TimeSpan? timeout)
459463

460464
try
461465
{
462-
_httpResponseTask = Channel.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, _callCts.Token);
466+
// If a HttpClient has been specified then we need to call it with ResponseHeadersRead
467+
// so that the response message is available for streaming
468+
_httpResponseTask = (Channel.HttpInvoker is HttpClient httpClient)
469+
? httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, _callCts.Token)
470+
: Channel.HttpInvoker.SendAsync(request, _callCts.Token);
471+
463472
HttpResponse = await _httpResponseTask.ConfigureAwait(false);
464473
}
465474
catch (Exception ex)
@@ -768,11 +777,11 @@ private HttpRequestMessage CreateHttpRequestMessage(TimeSpan? timeout)
768777
var headers = message.Headers;
769778

770779
// User agent is optional but recommended.
771-
headers.Add(GrpcProtocolConstants.UserAgentHeader, GrpcProtocolConstants.UserAgentHeaderValue);
780+
headers.TryAddWithoutValidation(GrpcProtocolConstants.UserAgentHeader, GrpcProtocolConstants.UserAgentHeaderValue);
772781
// TE is required by some servers, e.g. C Core.
773782
// A missing TE header results in servers aborting the gRPC call.
774-
headers.Add(GrpcProtocolConstants.TEHeader, GrpcProtocolConstants.TEHeaderValue);
775-
headers.Add(GrpcProtocolConstants.MessageAcceptEncodingHeader, Channel.MessageAcceptEncoding);
783+
headers.TryAddWithoutValidation(GrpcProtocolConstants.TEHeader, GrpcProtocolConstants.TEHeaderValue);
784+
headers.TryAddWithoutValidation(GrpcProtocolConstants.MessageAcceptEncodingHeader, Channel.MessageAcceptEncoding);
776785

777786
if (Options.Headers != null && Options.Headers.Count > 0)
778787
{
@@ -788,7 +797,7 @@ private HttpRequestMessage CreateHttpRequestMessage(TimeSpan? timeout)
788797
// grpc-internal-encoding-request is used in the client to set message compression.
789798
// 'grpc-encoding' is sent even if WriteOptions.Flags = NoCompress. In that situation
790799
// individual messages will not be written with compression.
791-
headers.Add(GrpcProtocolConstants.MessageEncodingHeader, entry.Value);
800+
headers.TryAddWithoutValidation(GrpcProtocolConstants.MessageEncodingHeader, entry.Value);
792801
}
793802
else
794803
{
@@ -799,7 +808,7 @@ private HttpRequestMessage CreateHttpRequestMessage(TimeSpan? timeout)
799808

800809
if (timeout != null)
801810
{
802-
headers.Add(GrpcProtocolConstants.TimeoutHeader, GrpcProtocolHelpers.EncodeTimeout(timeout.Value.Ticks / TimeSpan.TicksPerMillisecond));
811+
headers.TryAddWithoutValidation(GrpcProtocolConstants.TimeoutHeader, GrpcProtocolHelpers.EncodeTimeout(timeout.Value.Ticks / TimeSpan.TicksPerMillisecond));
803812
}
804813

805814
return message;

src/Grpc.Net.Client/Internal/GrpcProtocolHelpers.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ internal async static Task ReadCredentialMetadata(
269269
public static void AddHeader(HttpRequestHeaders headers, Metadata.Entry entry)
270270
{
271271
var value = entry.IsBinary ? Convert.ToBase64String(entry.ValueBytes) : entry.Value;
272-
headers.Add(entry.Key, value);
272+
headers.TryAddWithoutValidation(entry.Key, value);
273273
}
274274

275275
public static string? GetHeaderValue(HttpHeaders? headers, string name)

src/Grpc.Net.Client/Internal/StreamExtensions.cs

Lines changed: 37 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -46,44 +46,6 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
4646
return new Status(StatusCode.Unimplemented, $"Unsupported grpc-encoding value '{unsupportedEncoding}'. Supported encodings: {string.Join(", ", supportedEncodings)}");
4747
}
4848

49-
private static async Task<(int length, bool compressed)?> ReadHeaderAsync(Stream responseStream, Memory<byte> header, CancellationToken cancellationToken)
50-
{
51-
int read;
52-
var received = 0;
53-
while ((read = await responseStream.ReadAsync(header.Slice(received, GrpcProtocolConstants.HeaderSize - received), cancellationToken).ConfigureAwait(false)) > 0)
54-
{
55-
received += read;
56-
57-
if (received == GrpcProtocolConstants.HeaderSize)
58-
{
59-
break;
60-
}
61-
}
62-
63-
if (received < GrpcProtocolConstants.HeaderSize)
64-
{
65-
if (received == 0)
66-
{
67-
return null;
68-
}
69-
70-
throw new InvalidDataException("Unexpected end of content while reading the message header.");
71-
}
72-
73-
// Read the header first
74-
// - 1 byte flag for compression
75-
// - 4 bytes for the content length
76-
var compressed = ReadCompressedFlag(header.Span[0]);
77-
var length = BinaryPrimitives.ReadUInt32BigEndian(header.Span.Slice(1, 4));
78-
79-
if (length > int.MaxValue)
80-
{
81-
throw new InvalidDataException("Message too large.");
82-
}
83-
84-
return ((int)length, compressed);
85-
}
86-
8749
public static async ValueTask<TResponse?> ReadMessageAsync<TResponse>(
8850
this Stream responseStream,
8951
ILogger logger,
@@ -107,16 +69,34 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
10769
// If the message is larger then the array will be replaced when the message size is known.
10870
buffer = ArrayPool<byte>.Shared.Rent(minimumLength: 4096);
10971

110-
var headerDetails = await ReadHeaderAsync(responseStream, buffer, cancellationToken).ConfigureAwait(false);
72+
int read;
73+
var received = 0;
74+
while ((read = await responseStream.ReadAsync(buffer.AsMemory(received, GrpcProtocolConstants.HeaderSize - received), cancellationToken).ConfigureAwait(false)) > 0)
75+
{
76+
received += read;
11177

112-
if (headerDetails == null)
78+
if (received == GrpcProtocolConstants.HeaderSize)
79+
{
80+
break;
81+
}
82+
}
83+
84+
if (received < GrpcProtocolConstants.HeaderSize)
11385
{
114-
GrpcCallLog.NoMessageReturned(logger);
115-
return default;
86+
if (received == 0)
87+
{
88+
GrpcCallLog.NoMessageReturned(logger);
89+
return default;
90+
}
91+
92+
throw new InvalidDataException("Unexpected end of content while reading the message header.");
11693
}
11794

118-
var length = headerDetails.Value.length;
119-
var compressed = headerDetails.Value.compressed;
95+
// Read the header first
96+
// - 1 byte flag for compression
97+
// - 4 bytes for the content length
98+
var compressed = ReadCompressedFlag(buffer[0]);
99+
var length = ReadMessageLength(buffer.AsSpan(1, 4));
120100

121101
if (length > 0)
122102
{
@@ -200,6 +180,18 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
200180
}
201181
}
202182

183+
private static int ReadMessageLength(Span<byte> header)
184+
{
185+
var length = BinaryPrimitives.ReadUInt32BigEndian(header);
186+
187+
if (length > int.MaxValue)
188+
{
189+
throw new InvalidDataException("Message too large.");
190+
}
191+
192+
return (int)length;
193+
}
194+
203195
private static async Task ReadMessageContent(Stream responseStream, Memory<byte> messageData, int length, CancellationToken cancellationToken)
204196
{
205197
// Read message content until content length is reached
@@ -257,7 +249,7 @@ private static bool ReadCompressedFlag(byte flag)
257249
}
258250
}
259251

260-
// TODO(JamesNK): Reuse serialization content between message writes. Improve client/duplex streaming allocations.
252+
// TODO(JamesNK): Reuse serialization context between message writes. Improve client/duplex streaming allocations.
261253
public static async ValueTask WriteMessageAsync<TMessage>(
262254
this Stream stream,
263255
ILogger logger,

0 commit comments

Comments
 (0)