Skip to content

Commit 9cc32d7

Browse files
feat: support maxSize in PbjGrpcClient (#753)
Signed-off-by: Anthony Petrov <anthony@swirldslabs.com>
1 parent d395714 commit 9cc32d7

File tree

7 files changed

+87
-14
lines changed

7 files changed

+87
-14
lines changed

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,12 @@ private void receiveRepliesLoop() {
208208
datagram.compressedFlag() == 1 ? decompressor.decompress(bytes) : bytes;
209209

210210
try {
211-
final ReplyT reply = replyCodec.parse(replyBytes);
211+
final ReplyT reply = replyCodec.parse(
212+
replyBytes.toReadableSequentialData(),
213+
false,
214+
false,
215+
Codec.DEFAULT_MAX_DEPTH,
216+
grpcClient.getConfig().maxSize());
212217
pipeline.onNext(reply);
213218
} catch (ParseException e) {
214219
pipeline.onError(e);

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcClientConfig.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package com.hedera.pbj.grpc.client.helidon;
33

4+
import com.hedera.pbj.runtime.Codec;
45
import com.hedera.pbj.runtime.grpc.GrpcCompression;
56
import io.helidon.common.tls.Tls;
67
import java.time.Duration;
@@ -9,6 +10,7 @@
910

1011
/**
1112
* Configuration for PBJ GRPC client.
13+
* @param maxSize the maximum size of messages that the client is able to receive, defaults to Codec.DEFAULT_MAX_SIZE.
1214
*/
1315
public record PbjGrpcClientConfig(
1416
/** A read timeout. Duration.ofSeconds(10) is a good default. */
@@ -34,15 +36,29 @@ public record PbjGrpcClientConfig(
3436
* e.g. "identity", "gzip", etc.
3537
* Note that the encoding must be registered as a `Decompressor` with `GrpcCompression` to actually be supported.
3638
*/
37-
Set<String> acceptEncodings) {
39+
Set<String> acceptEncodings,
40+
int maxSize) {
3841

42+
/** For backward compatibility before encodings were introduced. */
3943
public PbjGrpcClientConfig(Duration readTimeout, Tls tls, Optional<String> authority, String contentType) {
4044
this(
4145
readTimeout,
4246
tls,
4347
authority,
4448
contentType,
4549
GrpcCompression.IDENTITY,
46-
GrpcCompression.getDecompressorNames());
50+
GrpcCompression.getDecompressorNames(),
51+
Codec.DEFAULT_MAX_SIZE);
52+
}
53+
54+
/** For backward compatibility before maxSize was introduced. */
55+
public PbjGrpcClientConfig(
56+
Duration readTimeout,
57+
Tls tls,
58+
Optional<String> authority,
59+
String contentType,
60+
String encoding,
61+
Set<String> acceptEncodings) {
62+
this(readTimeout, tls, authority, contentType, encoding, acceptEncodings, Codec.DEFAULT_MAX_SIZE);
4763
}
4864
}

pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCallTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.hedera.pbj.runtime.grpc.GrpcStatus;
2222
import com.hedera.pbj.runtime.grpc.Pipeline;
2323
import com.hedera.pbj.runtime.grpc.ServiceInterface;
24+
import com.hedera.pbj.runtime.io.ReadableSequentialData;
2425
import com.hedera.pbj.runtime.io.buffer.Bytes;
2526
import io.helidon.common.buffers.BufferData;
2627
import io.helidon.common.tls.Tls;
@@ -269,7 +270,14 @@ public void testReceiveRepliesLoopSingleReply(final boolean isTimeout) throws Ex
269270
doReturn(bufferData).when(data).data();
270271

271272
final Object reply = mock(Object.class);
272-
doReturn(reply).when(replyCodec).parse(eq(Bytes.wrap(new byte[] {6})));
273+
doReturn(reply)
274+
.when(replyCodec)
275+
.parse(
276+
any(ReadableSequentialData.class),
277+
eq(false),
278+
eq(false),
279+
eq(Codec.DEFAULT_MAX_DEPTH),
280+
eq(Codec.DEFAULT_MAX_SIZE));
273281

274282
runnable.run();
275283

@@ -324,7 +332,14 @@ public void testReceiveRepliesLoopParseException() throws Exception {
324332
doReturn(bufferData).when(data).data();
325333

326334
final ParseException exception = new ParseException("test");
327-
doThrow(exception).when(replyCodec).parse(eq(Bytes.wrap(new byte[] {6})));
335+
doThrow(exception)
336+
.when(replyCodec)
337+
.parse(
338+
any(ReadableSequentialData.class),
339+
eq(false),
340+
eq(false),
341+
eq(Codec.DEFAULT_MAX_DEPTH),
342+
eq(Codec.DEFAULT_MAX_SIZE));
328343

329344
runnable.run();
330345

pbj-integration-tests/src/jmh/java/com/hedera/pbj/integration/jmh/grpc/PbjGrpcBench.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.hedera.pbj.grpc.helidon.PbjRouting;
66
import com.hedera.pbj.integration.grpc.GrpcTestUtils;
77
import com.hedera.pbj.integration.grpc.PortsAllocator;
8+
import com.hedera.pbj.runtime.Codec;
89
import com.hedera.pbj.runtime.grpc.GrpcClient;
910
import com.hedera.pbj.runtime.grpc.Pipeline;
1011
import com.hedera.pbj.runtime.grpc.ServiceInterface;
@@ -78,8 +79,8 @@ static GreeterInterface.GreeterClient createClient(final int port, final String[
7879
if (encodings == null || encodings.length == 0) {
7980
grpcClient = GrpcTestUtils.createGrpcClient(port, GrpcTestUtils.PROTO_OPTIONS);
8081
} else {
81-
grpcClient =
82-
GrpcTestUtils.createGrpcClient(port, GrpcTestUtils.PROTO_OPTIONS, encodings[0], Set.of(encodings));
82+
grpcClient = GrpcTestUtils.createGrpcClient(
83+
port, GrpcTestUtils.PROTO_OPTIONS, encodings[0], Set.of(encodings), Codec.DEFAULT_MAX_SIZE);
8384
}
8485

8586
return new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);

pbj-integration-tests/src/main/java/com/hedera/pbj/integration/grpc/GrpcTestUtils.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import com.hedera.pbj.grpc.client.helidon.PbjGrpcClient;
55
import com.hedera.pbj.grpc.client.helidon.PbjGrpcClientConfig;
6+
import com.hedera.pbj.runtime.Codec;
67
import com.hedera.pbj.runtime.grpc.GrpcClient;
78
import com.hedera.pbj.runtime.grpc.GrpcCompression;
89
import com.hedera.pbj.runtime.grpc.ServiceInterface;
@@ -35,14 +36,20 @@ public static void sleep(final GrpcClient grpcClient) {
3536
}
3637

3738
public static GrpcClient createGrpcClient(final int port, final ServiceInterface.RequestOptions requestOptions) {
38-
return createGrpcClient(port, requestOptions, GrpcCompression.IDENTITY, GrpcCompression.getDecompressorNames());
39+
return createGrpcClient(
40+
port,
41+
requestOptions,
42+
GrpcCompression.IDENTITY,
43+
GrpcCompression.getDecompressorNames(),
44+
Codec.DEFAULT_MAX_SIZE);
3945
}
4046

4147
public static GrpcClient createGrpcClient(
4248
final int port,
4349
final ServiceInterface.RequestOptions requestOptions,
4450
String encoding,
45-
Set<String> acceptEncodings) {
51+
Set<String> acceptEncodings,
52+
int maxSize) {
4653
final Tls tls = Tls.builder().enabled(false).build();
4754
final WebClient webClient =
4855
WebClient.builder().baseUri("http://localhost:" + port).tls(tls).build();
@@ -52,7 +59,7 @@ public static GrpcClient createGrpcClient(
5259
requestOptions.authority().isPresent() ? requestOptions.authority() : Optional.of("localhost:" + port);
5360

5461
final PbjGrpcClientConfig config = new PbjGrpcClientConfig(
55-
READ_TIMEOUT, tls, authority, requestOptions.contentType(), encoding, acceptEncodings);
62+
READ_TIMEOUT, tls, authority, requestOptions.contentType(), encoding, acceptEncodings, maxSize);
5663

5764
return new PbjGrpcClient(webClient, config);
5865
}

pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcClientComprehensiveTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88
import com.hedera.pbj.integration.grpc.GrpcTestUtils;
99
import com.hedera.pbj.integration.grpc.PortsAllocator;
10+
import com.hedera.pbj.runtime.Codec;
1011
import com.hedera.pbj.runtime.grpc.GrpcClient;
12+
import com.hedera.pbj.runtime.grpc.GrpcCompression;
1113
import com.hedera.pbj.runtime.grpc.GrpcException;
1214
import com.hedera.pbj.runtime.grpc.Pipeline;
1315
import java.util.ArrayList;
@@ -108,6 +110,32 @@ void testUnaryMethodHappyCase() {
108110
}
109111
}
110112

113+
@Test
114+
void testUnaryMethodReceivingExtraLargePayload() {
115+
try (final PortsAllocator.Port port = GrpcTestUtils.PORTS.acquire();
116+
final GrpcServerGreeterHandle server = serverFactory.apply(port.port())) {
117+
server.start();
118+
final String extraLongString = "a".repeat(Codec.DEFAULT_MAX_SIZE * 2);
119+
server.setSayHello(
120+
request -> HelloReply.newBuilder().message(extraLongString).build());
121+
122+
final GrpcClient grpcClient = GrpcTestUtils.createGrpcClient(
123+
port.port(),
124+
GrpcTestUtils.PROTO_OPTIONS,
125+
GrpcCompression.IDENTITY,
126+
GrpcCompression.getDecompressorNames(),
127+
Codec.DEFAULT_MAX_SIZE * 2);
128+
final GreeterInterface.GreeterClient client =
129+
new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);
130+
131+
final HelloRequest request =
132+
HelloRequest.newBuilder().name("test name").build();
133+
final HelloReply reply = client.sayHello(request);
134+
135+
assertEquals(extraLongString, reply.message());
136+
}
137+
}
138+
111139
@Test
112140
void testServerStreamingMethodHappyCase() {
113141
try (final PortsAllocator.Port port = GrpcTestUtils.PORTS.acquire();

pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcCompressionTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.hedera.pbj.grpc.helidon.PbjRouting;
88
import com.hedera.pbj.integration.grpc.GrpcTestUtils;
99
import com.hedera.pbj.integration.grpc.PortsAllocator;
10+
import com.hedera.pbj.runtime.Codec;
1011
import com.hedera.pbj.runtime.grpc.GrpcClient;
1112
import io.grpc.ManagedChannel;
1213
import io.grpc.ManagedChannelBuilder;
@@ -28,8 +29,8 @@ void testPbjGrpcClientCompressionWithGoogleGrpcServer() {
2829
server.setSayHello(request ->
2930
HelloReply.newBuilder().message("Hello " + request.name()).build());
3031

31-
try (final GrpcClient grpcClient =
32-
GrpcTestUtils.createGrpcClient(port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"))) {
32+
try (final GrpcClient grpcClient = GrpcTestUtils.createGrpcClient(
33+
port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"), Codec.DEFAULT_MAX_SIZE)) {
3334
final GreeterInterface.GreeterClient client =
3435
new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);
3536

@@ -107,8 +108,8 @@ void testPbjGrpcServerCompressionWithPbjClient() {
107108
handle.setSayHello(request ->
108109
HelloReply.newBuilder().message("Hello " + request.name()).build());
109110

110-
try (final GrpcClient grpcClient =
111-
GrpcTestUtils.createGrpcClient(port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"))) {
111+
try (final GrpcClient grpcClient = GrpcTestUtils.createGrpcClient(
112+
port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"), Codec.DEFAULT_MAX_SIZE)) {
112113
final GreeterInterface.GreeterClient client =
113114
new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);
114115

0 commit comments

Comments
 (0)