Skip to content

Commit 355ac24

Browse files
authored
Add support for per-call server compression (#481)
1 parent b7b1a18 commit 355ac24

File tree

7 files changed

+227
-26
lines changed

7 files changed

+227
-26
lines changed

src/Grpc.AspNetCore.Server/Internal/GrpcProtocolConstants.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ internal static class GrpcProtocolConstants
3131
internal const string MessageEncodingHeader = "grpc-encoding";
3232
internal const string MessageAcceptEncodingHeader = "grpc-accept-encoding";
3333

34+
internal const string CompressionRequestAlgorithmHeader = "grpc-internal-encoding-request";
35+
3436
internal const string StatusTrailer = "grpc-status";
3537
internal const string MessageTrailer = "grpc-message";
3638

src/Grpc.AspNetCore.Server/Internal/HttpContextServerCallContext.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,18 +333,30 @@ protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
333333
{
334334
foreach (var entry in responseHeaders)
335335
{
336-
if (entry.IsBinary)
336+
if (entry.Key == GrpcProtocolConstants.CompressionRequestAlgorithmHeader)
337337
{
338-
HttpContext.Response.Headers[entry.Key] = Convert.ToBase64String(entry.ValueBytes);
338+
// grpc-internal-encoding-request is used in the server to set message compression
339+
// on a per-call bassis.
340+
// 'grpc-encoding' is sent even if WriteOptions.Flags = NoCompress. In that situation
341+
// individual messages will not be written with compression.
342+
ResponseGrpcEncoding = entry.Value;
343+
HttpContext.Response.Headers[GrpcProtocolConstants.MessageEncodingHeader] = ResponseGrpcEncoding;
339344
}
340345
else
341346
{
342-
HttpContext.Response.Headers[entry.Key] = entry.Value;
347+
if (entry.IsBinary)
348+
{
349+
HttpContext.Response.Headers[entry.Key] = Convert.ToBase64String(entry.ValueBytes);
350+
}
351+
else
352+
{
353+
HttpContext.Response.Headers[entry.Key] = entry.Value;
354+
}
343355
}
344356
}
345357
}
346358

347-
return HttpContext.Response.Body.FlushAsync();
359+
return HttpContext.Response.BodyWriter.FlushAsync().GetAsTask();
348360
}
349361

350362
// Clock is for testing

src/Grpc.AspNetCore.Server/Internal/PipeExtensions.cs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,22 +77,28 @@ public static async Task WriteMessageAsync<TResponse>(this PipeWriter pipeWriter
7777
await httpResponse.StartAsync();
7878
}
7979

80-
var isCompressed =
80+
var canCompress =
8181
GrpcProtocolHelpers.CanWriteCompressed(serverCallContext.WriteOptions) &&
8282
!string.Equals(serverCallContext.ResponseGrpcEncoding, GrpcProtocolConstants.IdentityGrpcEncoding, StringComparison.Ordinal);
83-
84-
if (isCompressed)
83+
84+
var isCompressed = false;
85+
if (canCompress)
8586
{
8687
Debug.Assert(
8788
serverCallContext.ServiceOptions.ResolvedCompressionProviders != null,
8889
"Compression providers should have been resolved for service.");
8990

90-
responsePayload = CompressMessage(
91+
if (TryCompressMessage(
9192
serverCallContext.Logger,
9293
serverCallContext.ResponseGrpcEncoding!,
9394
serverCallContext.ServiceOptions.ResponseCompressionLevel,
9495
serverCallContext.ServiceOptions.ResolvedCompressionProviders,
95-
responsePayload);
96+
responsePayload,
97+
out var result))
98+
{
99+
responsePayload = result;
100+
isCompressed = true;
101+
}
96102
}
97103

98104
if (responsePayload.Length > serverCallContext.ServiceOptions.MaxSendMessageSize)
@@ -474,7 +480,7 @@ private static bool TryDecompressMessage(ILogger logger, string compressionEncod
474480
return false;
475481
}
476482

477-
private static byte[] CompressMessage(ILogger logger, string compressionEncoding, CompressionLevel? compressionLevel, Dictionary<string, ICompressionProvider> compressionProviders, byte[] messageData)
483+
private static bool TryCompressMessage(ILogger logger, string compressionEncoding, CompressionLevel? compressionLevel, Dictionary<string, ICompressionProvider> compressionProviders, byte[] messageData, [NotNullWhen(true)]out byte[]? result)
478484
{
479485
if (compressionProviders.TryGetValue(compressionEncoding, out var compressionProvider))
480486
{
@@ -486,11 +492,12 @@ private static byte[] CompressMessage(ILogger logger, string compressionEncoding
486492
compressionStream.Write(messageData, 0, messageData.Length);
487493
}
488494

489-
return output.ToArray();
495+
result = output.ToArray();
496+
return true;
490497
}
491498

492-
// Should never reach here
493-
throw new InvalidOperationException($"Could not find compression provider for '{compressionEncoding}'.");
499+
result = null;
500+
return false;
494501
}
495502
}
496503
}

test/FunctionalTests/Server/CompressionTests.cs

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
using System.Collections.Generic;
2020
using System.IO;
21+
using System.IO.Pipelines;
2122
using System.Linq;
2223
using System.Net;
2324
using System.Threading.Tasks;
@@ -35,6 +36,159 @@ namespace Grpc.AspNetCore.FunctionalTests.Server
3536
[TestFixture]
3637
public class CompressionTests : FunctionalTestBase
3738
{
39+
[Test]
40+
public async Task SendCompressedMessage_UnaryEnabledInCallWithInvalidSetting_UncompressedMessageReturned()
41+
{
42+
async Task<HelloReply> UnaryEnableCompression(HelloRequest request, ServerCallContext context)
43+
{
44+
var headers = new Metadata { new Metadata.Entry("grpc-internal-encoding-request", "PURPLE_MONKEY_DISHWASHER") };
45+
await context.WriteResponseHeadersAsync(headers);
46+
47+
return new HelloReply { Message = "Hello " + request.Name };
48+
}
49+
50+
// Arrange
51+
var method = Fixture.DynamicGrpc.AddUnaryMethod<HelloRequest, HelloReply>(UnaryEnableCompression);
52+
53+
var ms = new MemoryStream();
54+
MessageHelpers.WriteMessage(ms, new HelloRequest
55+
{
56+
Name = "World"
57+
});
58+
59+
var httpRequest = GrpcHttpHelper.Create(method.FullName);
60+
httpRequest.Content = new PushStreamContent(
61+
async s =>
62+
{
63+
await s.WriteAsync(ms.ToArray()).AsTask().DefaultTimeout();
64+
await s.FlushAsync().DefaultTimeout();
65+
});
66+
67+
// Act
68+
var responseTask = Fixture.Client.SendAsync(httpRequest);
69+
70+
// Assert
71+
var response = await responseTask.DefaultTimeout();
72+
73+
response.AssertIsSuccessfulGrpcRequest();
74+
75+
// Because the client didn't send this encoding in accept, the server has sent the message uncompressed.
76+
Assert.AreEqual("PURPLE_MONKEY_DISHWASHER", response.Headers.GetValues("grpc-encoding").Single());
77+
78+
var returnedMessageData = await response.Content.ReadAsByteArrayAsync().DefaultTimeout();
79+
Assert.AreEqual(0, returnedMessageData[0]);
80+
81+
var responseMessage = MessageHelpers.AssertReadMessage<HelloReply>(returnedMessageData);
82+
Assert.AreEqual("Hello World", responseMessage.Message);
83+
response.AssertTrailerStatus();
84+
}
85+
86+
[Test]
87+
public async Task SendCompressedMessage_UnaryEnabledInCall_CompressedMessageReturned()
88+
{
89+
async Task<HelloReply> UnaryEnableCompression(HelloRequest request, ServerCallContext context)
90+
{
91+
var headers = new Metadata { new Metadata.Entry("grpc-internal-encoding-request", "gzip") };
92+
await context.WriteResponseHeadersAsync(headers);
93+
94+
return new HelloReply { Message = "Hello " + request.Name };
95+
}
96+
97+
// Arrange
98+
var method = Fixture.DynamicGrpc.AddUnaryMethod<HelloRequest, HelloReply>(UnaryEnableCompression);
99+
100+
var ms = new MemoryStream();
101+
MessageHelpers.WriteMessage(ms, new HelloRequest
102+
{
103+
Name = "World"
104+
});
105+
106+
var httpRequest = GrpcHttpHelper.Create(method.FullName);
107+
httpRequest.Content = new PushStreamContent(
108+
async s =>
109+
{
110+
await s.WriteAsync(ms.ToArray()).AsTask().DefaultTimeout();
111+
await s.FlushAsync().DefaultTimeout();
112+
});
113+
114+
// Act
115+
var responseTask = Fixture.Client.SendAsync(httpRequest);
116+
117+
// Assert
118+
var response = await responseTask.DefaultTimeout();
119+
120+
response.AssertIsSuccessfulGrpcRequest();
121+
122+
Assert.AreEqual("gzip", response.Headers.GetValues("grpc-encoding").Single());
123+
124+
var returnedMessageData = await response.Content.ReadAsByteArrayAsync().DefaultTimeout();
125+
Assert.AreEqual(1, returnedMessageData[0]);
126+
127+
var responseMessage = MessageHelpers.AssertReadMessage<HelloReply>(returnedMessageData, "gzip");
128+
Assert.AreEqual("Hello World", responseMessage.Message);
129+
response.AssertTrailerStatus();
130+
}
131+
132+
[Test]
133+
public async Task SendCompressedMessage_ServerStreamingEnabledInCall_CompressedMessageReturned()
134+
{
135+
async Task ServerStreamingEnableCompression(HelloRequest request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
136+
{
137+
var headers = new Metadata { new Metadata.Entry("grpc-internal-encoding-request", "gzip") };
138+
await context.WriteResponseHeadersAsync(headers);
139+
140+
await responseStream.WriteAsync(new HelloReply { Message = "Hello 1" });
141+
142+
responseStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
143+
await responseStream.WriteAsync(new HelloReply { Message = "Hello 2" });
144+
}
145+
146+
// Arrange
147+
var method = Fixture.DynamicGrpc.AddServerStreamingMethod<HelloRequest, HelloReply>(ServerStreamingEnableCompression);
148+
149+
var ms = new MemoryStream();
150+
MessageHelpers.WriteMessage(ms, new HelloRequest
151+
{
152+
Name = "World"
153+
});
154+
155+
var httpRequest = GrpcHttpHelper.Create(method.FullName);
156+
httpRequest.Content = new PushStreamContent(
157+
async s =>
158+
{
159+
await s.WriteAsync(ms.ToArray()).AsTask().DefaultTimeout();
160+
await s.FlushAsync().DefaultTimeout();
161+
});
162+
163+
// Act
164+
var responseTask = Fixture.Client.SendAsync(httpRequest);
165+
166+
// Assert
167+
var response = await responseTask.DefaultTimeout();
168+
169+
response.AssertIsSuccessfulGrpcRequest();
170+
171+
Assert.AreEqual("gzip", response.Headers.GetValues("grpc-encoding").Single());
172+
173+
var responseStream = await response.Content.ReadAsStreamAsync().DefaultTimeout();
174+
var pipeReader = PipeReader.Create(responseStream);
175+
176+
ReadResult readResult;
177+
178+
readResult = await pipeReader.ReadAsync();
179+
Assert.AreEqual(1, readResult.Buffer.FirstSpan[0]); // Message is compressed
180+
var greeting1 = await MessageHelpers.AssertReadStreamMessageAsync<HelloReply>(pipeReader, "gzip").DefaultTimeout();
181+
Assert.AreEqual($"Hello 1", greeting1.Message);
182+
183+
readResult = await pipeReader.ReadAsync();
184+
Assert.AreEqual(0, readResult.Buffer.FirstSpan[0]); // Message is uncompressed
185+
var greeting2 = await MessageHelpers.AssertReadStreamMessageAsync<HelloReply>(pipeReader, "gzip").DefaultTimeout();
186+
Assert.AreEqual($"Hello 2", greeting2.Message);
187+
188+
var finishedTask = MessageHelpers.AssertReadStreamMessageAsync<HelloReply>(pipeReader);
189+
Assert.IsNull(await finishedTask.DefaultTimeout());
190+
}
191+
38192
[Test]
39193
public async Task SendCompressedMessage_ServiceHasNoCompressionConfigured_ResponseIdentityEncoding()
40194
{

test/Shared/MessageHelpers.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,18 @@ internal static class MessageHelpers
8989
new GzipCompressionProvider(CompressionLevel.Fastest)
9090
};
9191

92-
var serverCallContext = HttpContextServerCallContextHelper.CreateServerCallContext(serviceOptions: new GrpcServiceOptions
93-
{
94-
ResponseCompressionAlgorithm = compressionEncoding,
95-
CompressionProviders = compressionProviders
96-
});
92+
var resolvedProviders = ResolveProviders(compressionProviders);
93+
94+
var httpContext = new DefaultHttpContext();
95+
httpContext.Request.Headers[GrpcProtocolConstants.MessageEncodingHeader] = compressionEncoding;
96+
97+
var serverCallContext = HttpContextServerCallContextHelper.CreateServerCallContext(
98+
httpContext: httpContext,
99+
serviceOptions: new GrpcServiceOptions
100+
{
101+
ResponseCompressionAlgorithm = compressionEncoding,
102+
ResolvedCompressionProviders = resolvedProviders
103+
});
97104

98105
var message = await pipeReader.ReadStreamMessageAsync<T>(serverCallContext, Deserialize<T>).AsTask().DefaultTimeout();
99106

testassets/InteropTestsClient/InteropClient.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
namespace InteropTestsClient
4141
{
42-
public class InteropClient
42+
public class InteropClient : IDisposable
4343
{
4444
internal const string CompressionRequestAlgorithmMetadataKey = "grpc-internal-encoding-request";
4545

@@ -86,6 +86,7 @@ private class ClientOptions
8686
public string? ServiceAccountKeyFile { get; set; }
8787
}
8888

89+
private ServiceProvider serviceProvider;
8990
private ILoggerFactory loggerFactory;
9091
private ClientOptions options;
9192

@@ -100,11 +101,16 @@ private InteropClient(ClientOptions options)
100101
configure.AddConsole(loggerOptions => loggerOptions.IncludeScopes = true);
101102
});
102103

103-
var serviceProvider = services.BuildServiceProvider();
104+
serviceProvider = services.BuildServiceProvider();
104105

105106
loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
106107
}
107108

109+
public void Dispose()
110+
{
111+
serviceProvider.Dispose();
112+
}
113+
108114
public static void Run(string[] args)
109115
{
110116
GrpcEnvironment.SetLogger(new ConsoleLogger());
@@ -118,8 +124,10 @@ public static void Run(string[] args)
118124
Console.WriteLine("Server host: " + options.ServerHost);
119125
Console.WriteLine("Server port: " + options.ServerPort);
120126

121-
var interopClient = new InteropClient(options);
122-
interopClient.Run().Wait();
127+
using (var interopClient = new InteropClient(options))
128+
{
129+
interopClient.Run().Wait();
130+
}
123131
});
124132
}
125133

testassets/InteropTestsWebsite/TestServiceImpl.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public override Task<Empty> EmptyCall(Empty request, ServerCallContext context)
3535

3636
public override async Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
3737
{
38-
await EnsureEchoMetadataAsync(context);
38+
await EnsureEchoMetadataAsync(context, request.ResponseCompressed?.Value ?? false);
3939
EnsureEchoStatus(request.ResponseStatus, context);
4040
EnsureCompression(request.ExpectCompressed, context);
4141

@@ -45,11 +45,15 @@ public override async Task<SimpleResponse> UnaryCall(SimpleRequest request, Serv
4545

4646
public override async Task StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context)
4747
{
48-
await EnsureEchoMetadataAsync(context);
48+
await EnsureEchoMetadataAsync(context, request.ResponseParameters.Any(rp => rp.Compressed?.Value ?? false));
4949
EnsureEchoStatus(request.ResponseStatus, context);
5050

5151
foreach (var responseParam in request.ResponseParameters)
5252
{
53+
responseStream.WriteOptions = !(responseParam.Compressed?.Value ?? false)
54+
? new WriteOptions(WriteFlags.NoCompress)
55+
: null;
56+
5357
var response = new StreamingOutputCallResponse { Payload = CreateZerosPayload(responseParam.Size) };
5458
await responseStream.WriteAsync(response);
5559
}
@@ -95,9 +99,16 @@ private static Payload CreateZerosPayload(int size)
9599
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
96100
}
97101

98-
private static async Task EnsureEchoMetadataAsync(ServerCallContext context)
102+
private static async Task EnsureEchoMetadataAsync(ServerCallContext context, bool enableCompression = false)
99103
{
100104
var echoInitialList = context.RequestHeaders.Where((entry) => entry.Key == "x-grpc-test-echo-initial").ToList();
105+
106+
// Append grpc internal compression header if compression is requested by the client
107+
if (enableCompression)
108+
{
109+
echoInitialList.Add(new Metadata.Entry("grpc-internal-encoding-request", "gzip"));
110+
}
111+
101112
if (echoInitialList.Any()) {
102113
var entry = echoInitialList.Single();
103114
await context.WriteResponseHeadersAsync(new Metadata { entry });
@@ -118,7 +129,7 @@ private static void EnsureEchoStatus(EchoStatus responseStatus, ServerCallContex
118129
}
119130
}
120131

121-
private static void EnsureCompression(BoolValue expectCompressed, ServerCallContext context)
132+
private static void EnsureCompression(BoolValue? expectCompressed, ServerCallContext context)
122133
{
123134
if (expectCompressed != null)
124135
{

0 commit comments

Comments
 (0)