Skip to content

Commit 542066a

Browse files
authored
Implement SerializationContext buffer writer (#629)
Implement SerializationContext buffer writer
2 parents 65a1e16 + ecc8f79 commit 542066a

29 files changed

+908
-290
lines changed

build/dependencies.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<CommandLineParserPackageVersion>2.3.0</CommandLineParserPackageVersion>
55
<GoogleProtobufPackageVersion>3.10.0</GoogleProtobufPackageVersion>
66
<GrpcDotNetPackageVersion>2.24.0</GrpcDotNetPackageVersion>
7-
<GrpcPackageVersion>2.24.0</GrpcPackageVersion>
7+
<GrpcPackageVersion>2.25.0-pre1</GrpcPackageVersion>
88
<MicrosoftAspNetCorePackageVersion>3.0.0</MicrosoftAspNetCorePackageVersion>
99
<MicrosoftBuildLocatorPackageVersion>1.2.2</MicrosoftBuildLocatorPackageVersion>
1010
<MicrosoftBuildPackageVersion>16.0.461</MicrosoftBuildPackageVersion>

perf/Grpc.AspNetCore.Microbenchmarks/Internal/MessageHelpers.cs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,19 @@
1919
using System.IO;
2020
using System.IO.Pipelines;
2121
using Google.Protobuf;
22-
using Grpc.AspNetCore.Server;
2322
using Grpc.AspNetCore.Server.Internal;
24-
using Microsoft.AspNetCore.Http;
25-
using Microsoft.Extensions.Logging.Abstractions;
23+
using Grpc.Tests.Shared;
2624

2725
namespace Grpc.AspNetCore.Microbenchmarks.Internal
2826
{
2927
internal static class MessageHelpers
3028
{
31-
private static readonly HttpContextServerCallContext TestServerCallContext = new HttpContextServerCallContext(
32-
new DefaultHttpContext(),
33-
new GrpcServiceOptions(),
34-
NullLogger.Instance);
35-
36-
static MessageHelpers()
37-
{
38-
TestServerCallContext.Initialize();
39-
}
40-
4129
public static void WriteMessage<T>(Stream stream, T message, HttpContextServerCallContext? callContext = null)
4230
where T : class, IMessage
4331
{
4432
var pipeWriter = PipeWriter.Create(stream);
4533

46-
PipeExtensions.WriteMessageAsync(pipeWriter, message, callContext ?? TestServerCallContext, (r, c) => c.Complete(r.ToByteArray()), canFlush: true).GetAwaiter().GetResult();
34+
PipeExtensions.WriteMessageAsync(pipeWriter, message, callContext ?? HttpContextServerCallContextHelper.CreateServerCallContext(), (r, c) => c.Complete(r.ToByteArray()), canFlush: true).GetAwaiter().GetResult();
4735
}
4836
}
4937
}

perf/Grpc.AspNetCore.Microbenchmarks/Server/CompressedUnaryServerCallHandlerBenchmark.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public class CompressedUnaryServerCallHandlerBenchmark : UnaryServerCallHandlerB
3434
{
3535
public CompressedUnaryServerCallHandlerBenchmark()
3636
{
37-
ServiceOptions.ResponseCompressionAlgorithm = TestCompressionProvider.Name;
38-
ServiceOptions.ResolvedCompressionProviders = new Dictionary<string, ICompressionProvider>
37+
ResponseCompressionAlgorithm = TestCompressionProvider.Name;
38+
CompressionProviders = new Dictionary<string, ICompressionProvider>
3939
{
4040
[TestCompressionProvider.Name] = new TestCompressionProvider()
4141
};
@@ -52,7 +52,10 @@ protected override byte[] GetMessageData(ChatMessage message)
5252
var httpContext = new DefaultHttpContext();
5353
httpContext.Request.Headers.Add(GrpcProtocolConstants.MessageAcceptEncodingHeader, TestCompressionProvider.Name);
5454

55-
var callContext = HttpContextServerCallContextHelper.CreateServerCallContext(httpContext, serviceOptions: ServiceOptions);
55+
var callContext = HttpContextServerCallContextHelper.CreateServerCallContext(
56+
httpContext,
57+
responseCompressionAlgorithm: ResponseCompressionAlgorithm,
58+
compressionProviders: CompressionProviders);
5659

5760
var ms = new MemoryStream();
5861
MessageHelpers.WriteMessage(ms, message, callContext);

perf/Grpc.AspNetCore.Microbenchmarks/Server/InterceptedUnaryServerCallHandlerBenchmark.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919
using System.Threading.Tasks;
2020
using BenchmarkDotNet.Attributes;
2121
using Grpc.AspNetCore.Microbenchmarks.Internal;
22+
using Grpc.AspNetCore.Server;
2223

2324
namespace Grpc.AspNetCore.Microbenchmarks.Server
2425
{
2526
public class InterceptedUnaryServerCallHandlerBenchmark : UnaryServerCallHandlerBenchmarkBase
2627
{
2728
public InterceptedUnaryServerCallHandlerBenchmark()
2829
{
29-
ServiceOptions.Interceptors.Add<UnaryAwaitInterceptor>();
30+
Interceptors = new InterceptorCollection();
31+
Interceptors.Add<UnaryAwaitInterceptor>();
3032
}
3133

3234
[Benchmark]
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#region Copyright notice and license
2+
3+
// Copyright 2019 The gRPC Authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#endregion
18+
19+
using System.Threading.Tasks;
20+
using BenchmarkDotNet.Attributes;
21+
using Chat;
22+
using Grpc.Core;
23+
24+
namespace Grpc.AspNetCore.Microbenchmarks.Server
25+
{
26+
public class PipelinesUnaryServerCallHandlerBenchmark : UnaryServerCallHandlerBenchmarkBase
27+
{
28+
protected override Marshaller<ChatMessage> CreateMarshaller()
29+
{
30+
var marshaller = new Marshaller<ChatMessage>(
31+
(ChatMessage data, SerializationContext c) =>
32+
{
33+
var size = data.CalculateSize();
34+
c.SetPayloadLength(size);
35+
var writer = c.GetBufferWriter();
36+
writer.GetSpan(size);
37+
writer.Advance(size);
38+
c.Complete();
39+
},
40+
(DeserializationContext c) =>
41+
{
42+
c.PayloadAsReadOnlySequence();
43+
return new ChatMessage();
44+
});
45+
46+
return marshaller;
47+
}
48+
49+
[Benchmark]
50+
public Task PipelinesHandleCallAsync()
51+
{
52+
return InvokeUnaryRequestAsync();
53+
}
54+
}
55+
}

perf/Grpc.AspNetCore.Microbenchmarks/Server/UnaryServerCallHandlerBenchmarkBase.cs

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
#endregion
1818

19+
using System;
1920
using System.Buffers;
21+
using System.Collections.Generic;
2022
using System.IO;
2123
using System.IO.Pipelines;
2224
using System.Threading.Tasks;
@@ -28,12 +30,14 @@
2830
using Grpc.AspNetCore.Server.Internal;
2931
using Grpc.AspNetCore.Server.Internal.CallHandlers;
3032
using Grpc.Core;
33+
using Grpc.Net.Compression;
3134
using Grpc.Tests.Shared;
3235
using Microsoft.AspNetCore.Http;
3336
using Microsoft.AspNetCore.Http.Features;
3437
using Microsoft.Extensions.DependencyInjection;
3538
using Microsoft.Extensions.DependencyInjection.Extensions;
3639
using Microsoft.Extensions.Logging.Abstractions;
40+
using Microsoft.Extensions.Primitives;
3741

3842
namespace Grpc.AspNetCore.Microbenchmarks.Server
3943
{
@@ -43,10 +47,13 @@ public class UnaryServerCallHandlerBenchmarkBase
4347
private ServiceProvider? _requestServices;
4448
private DefaultHttpContext? _httpContext;
4549
private HeaderDictionary? _trailers;
50+
private IHeaderDictionary? _headers;
4651
private byte[]? _requestMessage;
4752
private TestPipeReader? _requestPipe;
4853

49-
internal GrpcServiceOptions ServiceOptions { get; } = new GrpcServiceOptions();
54+
protected InterceptorCollection? Interceptors { get; set; }
55+
protected Dictionary<string, ICompressionProvider>? CompressionProviders { get; set; }
56+
protected string? ResponseCompressionAlgorithm { get; set; }
5057

5158
[GlobalSetup]
5259
public void GlobalSetup()
@@ -65,13 +72,17 @@ public void GlobalSetup()
6572
services.TryAddSingleton<IGrpcInterceptorActivator<UnaryAwaitInterceptor>>(new TestGrpcInterceptorActivator<UnaryAwaitInterceptor>(new UnaryAwaitInterceptor()));
6673
var serviceProvider = services.BuildServiceProvider();
6774

68-
var marshaller = Marshallers.Create((arg) => MessageExtensions.ToByteArray(arg), bytes => new ChatMessage());
75+
var marshaller = CreateMarshaller();
76+
6977
var method = new Method<ChatMessage, ChatMessage>(MethodType.Unary, typeof(TestService).FullName, nameof(TestService.SayHello), marshaller, marshaller);
70-
var result = Task.FromResult(new ChatMessage());
78+
var result = Task.FromResult(message);
7179
_callHandler = new UnaryServerCallHandler<TestService, ChatMessage, ChatMessage>(
7280
method,
7381
(service, request, context) => result,
74-
ServiceOptions,
82+
HttpContextServerCallContextHelper.CreateMethodContext(
83+
compressionProviders: CompressionProviders,
84+
responseCompressionAlgorithm: ResponseCompressionAlgorithm,
85+
interceptors: Interceptors),
7586
NullLoggerFactory.Instance,
7687
new TestGrpcServiceActivator<TestService>(new TestService()),
7788
serviceProvider);
@@ -95,27 +106,47 @@ public void GlobalSetup()
95106
{
96107
Trailers = _trailers
97108
});
109+
_headers = _httpContext.Response.Headers;
98110
SetupHttpContext(_httpContext);
99111
}
100112

113+
protected virtual Marshaller<ChatMessage> CreateMarshaller()
114+
{
115+
var marshaller = Marshallers.Create((arg) => MessageExtensions.ToByteArray(arg), bytes => new ChatMessage());
116+
return marshaller;
117+
}
118+
101119
protected virtual void SetupHttpContext(HttpContext httpContext)
102120
{
103121
}
104122

105123
protected virtual byte[] GetMessageData(ChatMessage message)
106124
{
107-
var ms = new MemoryStream();
125+
var ms = new MemoryStream();
108126
MessageHelpers.WriteMessage(ms, message);
109127
return ms.ToArray();
110128
}
111129

112-
protected Task InvokeUnaryRequestAsync()
130+
protected async Task InvokeUnaryRequestAsync()
113131
{
114-
_httpContext!.Response.Headers.Clear();
132+
_headers!.Clear();
115133
_trailers!.Clear();
116134
_requestPipe!.ReadResults.Add(new ValueTask<ReadResult>(new ReadResult(new ReadOnlySequence<byte>(_requestMessage!), false, true)));
117135

118-
return _callHandler!.HandleCallAsync(_httpContext);
136+
await _callHandler!.HandleCallAsync(_httpContext!);
137+
138+
StringValues value;
139+
if (_trailers.TryGetValue("grpc-status", out value) || _headers.TryGetValue("grpc-status", out value))
140+
{
141+
if (!value.Equals("0"))
142+
{
143+
throw new InvalidOperationException("Unexpected grpc-status: " + Enum.Parse<StatusCode>(value));
144+
}
145+
}
146+
else
147+
{
148+
throw new InvalidOperationException("No grpc-status returned.");
149+
}
119150
}
120151
}
121152
}

src/Grpc.AspNetCore.Server/Grpc.AspNetCore.Server.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
<ItemGroup>
1717
<Compile Include="..\Shared\DefaultDeserializationContext.cs" Link="Internal\DefaultDeserializationContext.cs" />
18-
<Compile Include="..\Shared\DefaultSerializationContext.cs" Link="Internal\DefaultSerializationContext.cs" />
1918
</ItemGroup>
2019

2120
<ItemGroup>

src/Grpc.AspNetCore.Server/GrpcServiceOptions.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,6 @@ public class GrpcServiceOptions
2929
{
3030
internal IList<ICompressionProvider>? _compressionProviders;
3131

32-
// Fast check for interceptors is used per-request
33-
internal bool HasInterceptors { get; set; }
34-
// Compression providers by encoding name
35-
internal Dictionary<string, ICompressionProvider>? ResolvedCompressionProviders;
36-
3732
/// <summary>
3833
/// Gets or sets the maximum message size in bytes that can be sent from the server.
3934
/// </summary>

src/Grpc.AspNetCore.Server/Internal/CallHandlers/ClientStreamingServerCallHandler.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,17 @@ internal class ClientStreamingServerCallHandler<TService, TRequest, TResponse> :
3737
public ClientStreamingServerCallHandler(
3838
Method<TRequest, TResponse> method,
3939
ClientStreamingServerMethod<TService, TRequest, TResponse> invoker,
40-
GrpcServiceOptions serviceOptions,
40+
MethodContext methodContext,
4141
ILoggerFactory loggerFactory,
4242
IGrpcServiceActivator<TService> serviceActivator,
4343
IServiceProvider serviceProvider)
44-
: base(method, serviceOptions, loggerFactory, serviceActivator, serviceProvider)
44+
: base(method, methodContext, loggerFactory, serviceActivator, serviceProvider)
4545
{
4646
_invoker = invoker;
4747

48-
if (ServiceOptions.HasInterceptors)
48+
if (MethodContext.HasInterceptors)
4949
{
50-
var interceptorPipeline = new InterceptorPipelineBuilder<TRequest, TResponse>(ServiceOptions.Interceptors, ServiceProvider);
50+
var interceptorPipeline = new InterceptorPipelineBuilder<TRequest, TResponse>(MethodContext.Interceptors, ServiceProvider);
5151
_pipelineInvoker = interceptorPipeline.ClientStreamingPipeline(ResolvedInterceptorInvoker);
5252
}
5353
}

src/Grpc.AspNetCore.Server/Internal/CallHandlers/DuplexStreamingServerCallHandler.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,17 @@ internal class DuplexStreamingServerCallHandler<TService, TRequest, TResponse> :
3737
public DuplexStreamingServerCallHandler(
3838
Method<TRequest, TResponse> method,
3939
DuplexStreamingServerMethod<TService, TRequest, TResponse> invoker,
40-
GrpcServiceOptions serviceOptions,
40+
MethodContext methodContext,
4141
ILoggerFactory loggerFactory,
4242
IGrpcServiceActivator<TService> serviceActivator,
4343
IServiceProvider serviceProvider)
44-
: base(method, serviceOptions, loggerFactory, serviceActivator, serviceProvider)
44+
: base(method, methodContext, loggerFactory, serviceActivator, serviceProvider)
4545
{
4646
_invoker = invoker;
4747

48-
if (ServiceOptions.HasInterceptors)
48+
if (MethodContext.HasInterceptors)
4949
{
50-
var interceptorPipeline = new InterceptorPipelineBuilder<TRequest, TResponse>(ServiceOptions.Interceptors, ServiceProvider);
50+
var interceptorPipeline = new InterceptorPipelineBuilder<TRequest, TResponse>(MethodContext.Interceptors, ServiceProvider);
5151
_pipelineInvoker = interceptorPipeline.DuplexStreamingPipeline(ResolvedInterceptorInvoker);
5252
}
5353
}

0 commit comments

Comments
 (0)