Skip to content

Commit 7a93b85

Browse files
committed
GrpcServiceBridge should only use client accepted encodings.
Motivation: The GrpcServiceBridge implementation does not check the encodings accepted by the client and can use an encoding that the client would not support. Changes: - Add API to properly check client accepted encodings. - Modify GrpcServiceBridge implementation to check the client accepted encodings.
1 parent 46344e0 commit 7a93b85

File tree

9 files changed

+92
-31
lines changed

9 files changed

+92
-31
lines changed

vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcWriteStream.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ public interface GrpcWriteStream<T> extends WriteStream<T> {
1717
MultiMap headers();
1818

1919
/**
20-
* Set the stream encoding, e.g. {@code identity} or {@code gzip}.
20+
* <p>Set the stream encoding, e.g. {@code identity} or {@code gzip},</p>
2121
*
22-
* It must be called before sending any message, otherwise {@code identity} will be used.
22+
* <ul>
23+
* <li>The encoding must be set before sending any message, otherwise {@code identity} will be used.</li>
24+
* <li>The encoding should also match the opposite endpoint expectations.</li>
25+
* </ul>
2326
*
2427
* @param encoding the target message encoding
2528
* @return a reference to this, so the API can be used fluently

vertx-grpc-docs/src/main/asciidoc/server.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,10 @@ Anemic JSON is also supported with Vert.x `JsonObject`
316316

317317
=== Compression
318318

319-
You can compress response messages by setting the response encoding *prior* before sending any message
319+
You can compress response messages by setting the response encoding *prior* before sending any message.
320+
321+
Before setting the encoding, you should use {@link io.vertx.grpc.server.GrpcServerResponse#acceptedEncodings()} to ensure
322+
the client RPC supports the encoding algorithm.
320323

321324
[source,java]
322325
----

vertx-grpc-docs/src/main/java/examples/GrpcServerExamples.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ public void anemicJson(GrpcServer server) {
170170
}
171171

172172
public void responseCompression(GrpcServerResponse<Empty, Item> response) {
173-
response.encoding("gzip");
173+
if (response.acceptedEncodings().contains("gzip")) {
174+
response.encoding("gzip");
175+
}
174176

175177
// Write items after encoding has been defined
176178
response.write(Item.newBuilder().setValue("item-1").build());

vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerResponse.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.vertx.grpc.common.GrpcWriteStream;
2222
import io.vertx.grpc.common.WireFormat;
2323

24+
import java.util.Set;
25+
2426
@VertxGen
2527
public interface GrpcServerResponse<Req, Resp> extends GrpcWriteStream<Resp> {
2628

@@ -48,6 +50,13 @@ public interface GrpcServerResponse<Req, Resp> extends GrpcWriteStream<Resp> {
4850
@Fluent
4951
GrpcServerResponse<Req, Resp> format(WireFormat format);
5052

53+
/**
54+
* @return the set of accepted encodings sent by the client, note that {@code identity} should not be part of this set.
55+
* This can be used to set the response {@link #encoding(String) encoding} to ensure the client will accept
56+
* the encoding. This is a glorified wrapper for the {@code grpc-accept-encoding} header.
57+
*/
58+
Set<String> acceptedEncodings();
59+
5160
/**
5261
* @return the {@link MultiMap} to write metadata trailers
5362
*/

vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,23 @@
2626
import io.vertx.grpc.server.GrpcServerResponse;
2727
import io.vertx.grpc.server.StatusException;
2828

29-
import java.util.Map;
30-
import java.util.Objects;
29+
import java.util.*;
30+
import java.util.regex.Pattern;
3131

3232
/**
3333
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
3434
*/
3535
public abstract class GrpcServerResponseImpl<Req, Resp> extends GrpcWriteStreamBase<GrpcServerResponseImpl<Req, Resp>, Resp> implements GrpcServerResponse<Req, Resp> {
3636

37+
private static final Pattern COMMA_SEPARATOR = Pattern.compile(" *, *");
38+
private static final Set<String> GZIP_ACCEPT_ENCODING = Collections.singleton("gzip");
39+
3740
private final GrpcServerRequestImpl<Req, Resp> request;
3841
private final HttpServerResponse httpResponse;
3942
private GrpcStatus status = GrpcStatus.OK;
4043
private String statusMessage;
4144
private boolean trailersOnly;
45+
private Set<String> acceptedEncodings;
4246

4347
public GrpcServerResponseImpl(ContextInternal context,
4448
GrpcServerRequestImpl<Req, Resp> request,
@@ -98,6 +102,27 @@ public GrpcStatus status() {
98102
return status;
99103
}
100104

105+
@Override
106+
public Set<String> acceptedEncodings() {
107+
if (acceptedEncodings == null) {
108+
String acceptEncodingHeader = request.headers().get("grpc-accept-encoding");
109+
if (acceptEncodingHeader != null) {
110+
if (acceptEncodingHeader.equals("gzip")) {
111+
acceptedEncodings = GZIP_ACCEPT_ENCODING;
112+
} else {
113+
acceptedEncodings = new HashSet<>(2);
114+
String[] encodings = COMMA_SEPARATOR.split(acceptEncodingHeader);
115+
for (String encoding : encodings) {
116+
acceptedEncodings.add(encoding.trim());
117+
}
118+
}
119+
} else {
120+
acceptedEncodings = Collections.emptySet();
121+
}
122+
}
123+
return acceptedEncodings;
124+
}
125+
101126
protected boolean sendCancel() {
102127
if (!isTrailersSent()) {
103128
status(GrpcStatus.CANCELLED);

vertx-grpc-server/src/test/java/io/vertx/tests/server/ServerRequestTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,21 +50,23 @@
5050
public class ServerRequestTest extends ServerTest {
5151

5252
@Override
53-
protected void testUnary(TestContext should, String requestEncoding, String responseEncoding) {
53+
protected void testUnary(TestContext should, String requestEncoding, String responseEncoding, DecompressorRegistry decompressors) {
5454
startServer(GrpcServer.server(vertx).callHandler(UNARY, call -> {
5555
call.handler(helloRequest -> {
5656
Reply helloReply = Reply.newBuilder().setMessage("Hello " + helloRequest.getName()).build();
5757
if (!requestEncoding.equals("identity")) {
5858
should.assertEquals(requestEncoding, call.encoding());
5959
}
6060
GrpcServerResponse<Request, Reply> response = call.response();
61+
if (response.acceptedEncodings().contains(responseEncoding)) {
62+
response.encoding(responseEncoding);
63+
}
6164
response
62-
.encoding(responseEncoding)
6365
.end(helloReply);
6466
});
6567
}));
6668

67-
super.testUnary(should, requestEncoding, responseEncoding);
69+
super.testUnary(should, requestEncoding, responseEncoding, decompressors);
6870
}
6971

7072
@Test

vertx-grpc-server/src/test/java/io/vertx/tests/server/ServerTest.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,7 @@
1010
*/
1111
package io.vertx.tests.server;
1212

13-
import io.grpc.CallOptions;
14-
import io.grpc.Channel;
15-
import io.grpc.ClientCall;
16-
import io.grpc.ClientInterceptor;
17-
import io.grpc.ClientInterceptors;
18-
import io.grpc.ForwardingClientCall;
19-
import io.grpc.ForwardingClientCallListener;
20-
import io.grpc.ManagedChannelBuilder;
21-
import io.grpc.Metadata;
22-
import io.grpc.MethodDescriptor;
23-
import io.grpc.Status;
24-
import io.grpc.StatusRuntimeException;
13+
import io.grpc.*;
2514
import io.grpc.stub.ClientCallStreamObserver;
2615
import io.grpc.stub.StreamObserver;
2716
import io.vertx.core.MultiMap;
@@ -38,6 +27,8 @@
3827
import io.vertx.tests.common.grpc.*;
3928
import org.junit.Test;
4029

30+
import java.io.IOException;
31+
import java.io.InputStream;
4132
import java.util.*;
4233
import java.util.concurrent.TimeUnit;
4334
import java.util.concurrent.atomic.AtomicInteger;
@@ -54,21 +45,44 @@ public abstract class ServerTest extends ServerTestBase {
5445

5546
@Test
5647
public void testUnary(TestContext should) {
57-
testUnary(should, "identity", "identity");
48+
testUnary(should, "identity", "identity", DecompressorRegistry.getDefaultInstance());
5849
}
5950

6051
@Test
6152
public void testUnaryDecompression(TestContext should) {
62-
testUnary(should, "gzip", "identity");
53+
testUnary(should, "gzip", "identity", DecompressorRegistry.getDefaultInstance());
6354
}
6455

6556
@Test
6657
public void testUnaryCompression(TestContext should) {
67-
testUnary(should, "identity", "gzip");
58+
testUnary(should, "identity", "gzip", DecompressorRegistry.getDefaultInstance());
6859
}
6960

70-
protected void testUnary(TestContext should, String requestEncoding, String responseEncoding) {
61+
@Test
62+
public void testUnaryCompressionWithUnsupportedEncoding(TestContext should) {
63+
testUnary(should, "identity", "gzip", DecompressorRegistry.emptyInstance().with(Codec.Identity.NONE, false));
64+
}
65+
66+
@Test
67+
public void testUnaryCompressionWithMultipleValues(TestContext should) {
68+
DecompressorRegistry registry = DecompressorRegistry.getDefaultInstance().with(new Decompressor() {
69+
@Override
70+
public String getMessageEncoding() {
71+
return "custom";
72+
}
73+
@Override
74+
public InputStream decompress(InputStream is) throws IOException {
75+
return is;
76+
}
77+
}, true);
78+
testUnary(should, "identity", "gzip", registry);
79+
}
80+
81+
protected void testUnary(TestContext should, String requestEncoding, String responseEncoding, DecompressorRegistry decompressors) {
82+
83+
7184
channel = ManagedChannelBuilder.forAddress("localhost", port)
85+
.decompressorRegistry(decompressors)
7286
.usePlaintext()
7387
.build();
7488

@@ -94,7 +108,7 @@ public void onHeaders(Metadata headers) {
94108
Request request = Request.newBuilder().setName("Julien").build();
95109
Reply res = stub.unary(request);
96110
should.assertEquals("Hello Julien", res.getMessage());
97-
if (!responseEncoding.equals("identity")) {
111+
if (!responseEncoding.equals("identity") && decompressors.lookupDecompressor(responseEncoding) != null) {
98112
should.assertEquals(responseEncoding, responseGrpcEncoding.get());
99113
}
100114
}

vertx-grpcio-server/src/main/java/io/vertx/grpcio/server/impl/GrpcIoServiceBridgeImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,10 @@ public MethodDescriptor<Req, Resp> getMethodDescriptor() {
268268
@Override
269269
public void setCompression(String encoding) {
270270
compressor = CompressorRegistry.getDefaultInstance().lookupCompressor(encoding);
271-
req.response().encoding(encoding);
271+
GrpcServerResponse<Req, Resp> response = req.response();
272+
if (response.acceptedEncodings().contains(encoding)) {
273+
response.encoding(encoding);
274+
}
272275
}
273276

274277
@Override

vertx-grpcio-server/src/test/java/io/vertx/tests/server/ServerBridgeTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@
3535
public class ServerBridgeTest extends ServerTest {
3636

3737
@Override
38-
protected void testUnary(TestContext should, String requestEncoding, String responseEncoding) {
38+
protected void testUnary(TestContext should, String requestEncoding, String responseEncoding, DecompressorRegistry decompressors) {
3939
TestServiceGrpc.TestServiceImplBase impl = new TestServiceGrpc.TestServiceImplBase() {
4040
@Override
4141
public void unary(Request request, StreamObserver<Reply> responseObserver) {
4242
if (!responseEncoding.equals("identity")) {
43-
((ServerCallStreamObserver<?>)responseObserver).setCompression("gzip");
43+
((ServerCallStreamObserver<?>)responseObserver).setCompression(responseEncoding);
4444
}
4545
if (!requestEncoding.equals("identity")) {
4646
// No way to check the request encoding with the API
@@ -55,7 +55,7 @@ public void unary(Request request, StreamObserver<Reply> responseObserver) {
5555
serverStub.bind(server);
5656
startServer(server);
5757

58-
super.testUnary(should, requestEncoding, responseEncoding);
58+
super.testUnary(should, requestEncoding, responseEncoding, decompressors);
5959
}
6060

6161
@Test
@@ -123,7 +123,7 @@ public void onComplete() {
123123
serverStub.bind(server);
124124
startServer(server);
125125

126-
super.testUnary(should, "identity", "identity");
126+
super.testUnary(should, "identity", "identity", DecompressorRegistry.getDefaultInstance());
127127
}
128128

129129
@Test

0 commit comments

Comments
 (0)