Skip to content

Commit 2e8437a

Browse files
fix: propagate maxSize to PbjGrpcDatagramReader (#757)
Signed-off-by: Anthony Petrov <anthony@swirldslabs.com>
1 parent 9cc32d7 commit 2e8437a

File tree

8 files changed

+72
-22
lines changed

8 files changed

+72
-22
lines changed

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ private void receiveRepliesLoop() {
174174
: null);
175175

176176
// read data from stream
177-
final PbjGrpcDatagramReader datagramReader = new PbjGrpcDatagramReader();
177+
final PbjGrpcDatagramReader datagramReader =
178+
new PbjGrpcDatagramReader(grpcClient.getConfig().maxIncomingBufferSize());
178179
while (isStreamOpen() && !clientStream.trailers().isDone() && clientStream.hasEntity()) {
179180
final Http2FrameData frameData;
180181
try {

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcClientConfig.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
/**
1212
* Configuration for PBJ GRPC client.
1313
* @param maxSize the maximum size of messages that the client is able to receive, defaults to Codec.DEFAULT_MAX_SIZE.
14+
* @param maxIncomingBufferSize the max size of an incoming buffer for receiving messages. Must be larger than
15+
* the `maxSize` to account for protobuf metadata as well as support high rate of ingress
16+
* of multiple messages, especially in case of server or bidi streaming.
17+
* Defaults to Codec.DEFAULT_MAX_SIZE * 5.
1418
*/
1519
public record PbjGrpcClientConfig(
1620
/** A read timeout. Duration.ofSeconds(10) is a good default. */
@@ -37,7 +41,8 @@ public record PbjGrpcClientConfig(
3741
* Note that the encoding must be registered as a `Decompressor` with `GrpcCompression` to actually be supported.
3842
*/
3943
Set<String> acceptEncodings,
40-
int maxSize) {
44+
int maxSize,
45+
int maxIncomingBufferSize) {
4146

4247
/** For backward compatibility before encodings were introduced. */
4348
public PbjGrpcClientConfig(Duration readTimeout, Tls tls, Optional<String> authority, String contentType) {
@@ -48,7 +53,8 @@ public PbjGrpcClientConfig(Duration readTimeout, Tls tls, Optional<String> autho
4853
contentType,
4954
GrpcCompression.IDENTITY,
5055
GrpcCompression.getDecompressorNames(),
51-
Codec.DEFAULT_MAX_SIZE);
56+
Codec.DEFAULT_MAX_SIZE,
57+
Codec.DEFAULT_MAX_SIZE * 5);
5258
}
5359

5460
/** For backward compatibility before maxSize was introduced. */
@@ -59,6 +65,14 @@ public PbjGrpcClientConfig(
5965
String contentType,
6066
String encoding,
6167
Set<String> acceptEncodings) {
62-
this(readTimeout, tls, authority, contentType, encoding, acceptEncodings, Codec.DEFAULT_MAX_SIZE);
68+
this(
69+
readTimeout,
70+
tls,
71+
authority,
72+
contentType,
73+
encoding,
74+
acceptEncodings,
75+
Codec.DEFAULT_MAX_SIZE,
76+
Codec.DEFAULT_MAX_SIZE * 5);
6377
}
6478
}

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReader.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,8 @@ record Datagram(int compressedFlag, BufferData data) {}
3434
*/
3535
static final int PREFIX_LENGTH = 5;
3636

37-
// These are arbitrary limits, but they seem sane and support the immediate use-case.
38-
// It would be nice to make them configurable in the future.
37+
// This is an arbitrary initial size, but it seems reasonable.
3938
private static final int INITIAL_BUFFER_SIZE = 1024;
40-
private static final int MAX_BUFFER_SIZE = 10 * 1024 * 1024;
4139

4240
/**
4341
* A buffer for incoming data. We copy the incoming BufferData objects into this buffer
@@ -49,7 +47,12 @@ record Datagram(int compressedFlag, BufferData data) {}
4947
* byte arrays when adding data to the buffer. Also, it doesn't support a capacity limit which
5048
* may be important to prevent OOM. For this reason, we use a low-level circular byte array here.
5149
*/
52-
private byte[] buffer = new byte[INITIAL_BUFFER_SIZE];
50+
private byte[] buffer;
51+
52+
/**
53+
* The maximum size of the buffer.
54+
*/
55+
private final int maxBufferSize;
5356

5457
/** Where we read data from. */
5558
private int readPosition = 0;
@@ -60,6 +63,11 @@ record Datagram(int compressedFlag, BufferData data) {}
6063
/** The length of the actual data added to the reader. Note that the buffer is circular. */
6164
private int length = 0;
6265

66+
PbjGrpcDatagramReader(final int maxBufferSize) {
67+
this.maxBufferSize = maxBufferSize;
68+
this.buffer = new byte[Math.min(INITIAL_BUFFER_SIZE, maxBufferSize)];
69+
}
70+
6371
/**
6472
* Add a new piece of data to this reader. It may be a complete GRPC datagram, or a piece of it,
6573
* maybe even a piece containing a tail of one datagram and a head of another.
@@ -161,12 +169,12 @@ private void ensureCapacity(final int minCapacity) {
161169
}
162170

163171
final int newLength = buffer.length + (minCapacity - currentCapacity);
164-
if (newLength > MAX_BUFFER_SIZE) {
172+
if (newLength > maxBufferSize) {
165173
throw new BufferOverflowException();
166174
}
167175

168176
// Prefer to double the size each time. But resort to the newLength if it's greater, and respect the max limit.
169-
final int actualNewLength = Math.min(Math.max(buffer.length * 2, newLength), MAX_BUFFER_SIZE);
177+
final int actualNewLength = Math.min(Math.max(buffer.length * 2, newLength), maxBufferSize);
170178
final byte[] newBuffer = new byte[actualNewLength];
171179

172180
if (length > 0) {

pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReaderTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import org.junit.jupiter.api.Test;
1414

1515
public class PbjGrpcDatagramReaderTest {
16+
private static final int MAX_BUFFER_SIZE = 10 * 1024 * 1024;
17+
1618
@Test
1719
void checkBufferOverflow() {
18-
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
20+
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);
1921

2022
// First, test the happy case, fill up the buffer to the current max limit (note that it's hard-coded here):
2123
BufferData goodData = BufferData.create("a".repeat(10 * 1024 * 1024));
@@ -28,7 +30,7 @@ void checkBufferOverflow() {
2830

2931
@Test
3032
void testSingleCompleteDatagram() {
31-
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
33+
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);
3234

3335
// Trivial case of a zero-size datagram
3436
BufferData zeroData = BufferData.create(new byte[] {0, 0, 0, 0, 0});
@@ -66,7 +68,7 @@ void testSingleCompleteDatagram() {
6668

6769
@Test
6870
void testSplitDatagram() {
69-
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
71+
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);
7072

7173
// This is very similar to the many bytes long datagram test above, but we feed the reader
7274
// little by little instead of adding the entire datagram at once:
@@ -97,7 +99,7 @@ void testSplitDatagram() {
9799

98100
@Test
99101
void testFlipCircularBuffer() {
100-
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
102+
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);
101103

102104
// The initial size is currently 1024, so fill it up almost completely:
103105
String dataString = "a".repeat(1000);
@@ -177,7 +179,7 @@ private void testDatagrams(final PbjGrpcDatagramReader reader, final List<String
177179

178180
@Test
179181
void testEnlargePartiallyFilledBuffer() {
180-
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
182+
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);
181183

182184
// Add two datagrams of 1000 bytes, which will enlarge the initial 1024 bytes buffer
183185
testDatagrams(reader, List.of("a".repeat(1000), "b".repeat(1000)));

pbj-integration-tests/src/jmh/java/com/hedera/pbj/integration/jmh/grpc/PbjGrpcBench.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,12 @@ static GreeterInterface.GreeterClient createClient(final int port, final String[
8080
grpcClient = GrpcTestUtils.createGrpcClient(port, GrpcTestUtils.PROTO_OPTIONS);
8181
} else {
8282
grpcClient = GrpcTestUtils.createGrpcClient(
83-
port, GrpcTestUtils.PROTO_OPTIONS, encodings[0], Set.of(encodings), Codec.DEFAULT_MAX_SIZE);
83+
port,
84+
GrpcTestUtils.PROTO_OPTIONS,
85+
encodings[0],
86+
Set.of(encodings),
87+
Codec.DEFAULT_MAX_SIZE,
88+
Codec.DEFAULT_MAX_SIZE * 5);
8489
}
8590

8691
return new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);

pbj-integration-tests/src/main/java/com/hedera/pbj/integration/grpc/GrpcTestUtils.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,17 @@ public static GrpcClient createGrpcClient(final int port, final ServiceInterface
4141
requestOptions,
4242
GrpcCompression.IDENTITY,
4343
GrpcCompression.getDecompressorNames(),
44-
Codec.DEFAULT_MAX_SIZE);
44+
Codec.DEFAULT_MAX_SIZE,
45+
Codec.DEFAULT_MAX_SIZE * 5);
4546
}
4647

4748
public static GrpcClient createGrpcClient(
4849
final int port,
4950
final ServiceInterface.RequestOptions requestOptions,
5051
String encoding,
5152
Set<String> acceptEncodings,
52-
int maxSize) {
53+
int maxSize,
54+
int maxIncomingBufferSize) {
5355
final Tls tls = Tls.builder().enabled(false).build();
5456
final WebClient webClient =
5557
WebClient.builder().baseUri("http://localhost:" + port).tls(tls).build();
@@ -59,7 +61,14 @@ public static GrpcClient createGrpcClient(
5961
requestOptions.authority().isPresent() ? requestOptions.authority() : Optional.of("localhost:" + port);
6062

6163
final PbjGrpcClientConfig config = new PbjGrpcClientConfig(
62-
READ_TIMEOUT, tls, authority, requestOptions.contentType(), encoding, acceptEncodings, maxSize);
64+
READ_TIMEOUT,
65+
tls,
66+
authority,
67+
requestOptions.contentType(),
68+
encoding,
69+
acceptEncodings,
70+
maxSize,
71+
maxIncomingBufferSize);
6372

6473
return new PbjGrpcClient(webClient, config);
6574
}

pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcClientComprehensiveTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ void testUnaryMethodReceivingExtraLargePayload() {
124124
GrpcTestUtils.PROTO_OPTIONS,
125125
GrpcCompression.IDENTITY,
126126
GrpcCompression.getDecompressorNames(),
127-
Codec.DEFAULT_MAX_SIZE * 2);
127+
Codec.DEFAULT_MAX_SIZE * 2,
128+
Codec.DEFAULT_MAX_SIZE * 5);
128129
final GreeterInterface.GreeterClient client =
129130
new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);
130131

pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcCompressionTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ void testPbjGrpcClientCompressionWithGoogleGrpcServer() {
3030
HelloReply.newBuilder().message("Hello " + request.name()).build());
3131

3232
try (final GrpcClient grpcClient = GrpcTestUtils.createGrpcClient(
33-
port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"), Codec.DEFAULT_MAX_SIZE)) {
33+
port.port(),
34+
GrpcTestUtils.PROTO_OPTIONS,
35+
"gzip",
36+
Set.of("gzip"),
37+
Codec.DEFAULT_MAX_SIZE,
38+
Codec.DEFAULT_MAX_SIZE * 5)) {
3439
final GreeterInterface.GreeterClient client =
3540
new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);
3641

@@ -109,7 +114,12 @@ void testPbjGrpcServerCompressionWithPbjClient() {
109114
HelloReply.newBuilder().message("Hello " + request.name()).build());
110115

111116
try (final GrpcClient grpcClient = GrpcTestUtils.createGrpcClient(
112-
port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"), Codec.DEFAULT_MAX_SIZE)) {
117+
port.port(),
118+
GrpcTestUtils.PROTO_OPTIONS,
119+
"gzip",
120+
Set.of("gzip"),
121+
Codec.DEFAULT_MAX_SIZE,
122+
Codec.DEFAULT_MAX_SIZE * 5)) {
113123
final GreeterInterface.GreeterClient client =
114124
new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);
115125

0 commit comments

Comments
 (0)