Skip to content

Commit 31fdb6c

Browse files
aymanm-googleejona86
authored andcommitted
Add CRONET_READ_BUFFER_SIZE_KEY API to CronetClientStream
By default, CronetClientStreams would use a 4KB buffer to read data from Cronet. This can be inefficient especially if the amount of data being read is huge (~MBs) as each read callback operation incur overhead from Cronet itself (e.g. Context switch, JNI calls). The alternative would be to immediately bump the default to a bigger number but that would incur an increase in memory usage. So in order to safely experiment on this, An OptionKey is introduced which allows setting a per-stream value which will be controlled in a controlled environment to ensure we find the best new default.
1 parent 470219f commit 31fdb6c

File tree

3 files changed

+60
-3
lines changed

3 files changed

+60
-3
lines changed

cronet/src/main/java/io/grpc/cronet/CronetClientStream.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
* Client stream for the cronet transport.
6060
*/
6161
class CronetClientStream extends AbstractClientStream {
62-
private static final int READ_BUFFER_CAPACITY = 4 * 1024;
6362
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
6463
private static final String LOG_TAG = "grpc-java-cronet";
6564

@@ -69,6 +68,12 @@ class CronetClientStream extends AbstractClientStream {
6968

7069
static final CallOptions.Key<Collection<Object>> CRONET_ANNOTATIONS_KEY =
7170
CallOptions.Key.create("cronet-annotations");
71+
/**
72+
* Sets the read buffer size which the GRPC layer will use to read data from Cronet. Higher buffer
73+
* size leads to less overhead but more memory consumption. The current default value is 4KB.
74+
*/
75+
static final CallOptions.Key<Integer> CRONET_READ_BUFFER_SIZE_KEY =
76+
CallOptions.Key.createWithDefault("cronet-read-buffer-size", 4 * 1024);
7277

7378
private final String url;
7479
private final String userAgent;
@@ -85,6 +90,8 @@ class CronetClientStream extends AbstractClientStream {
8590
private final Collection<Object> annotations;
8691
private final TransportState state;
8792
private final Sink sink = new Sink();
93+
@VisibleForTesting
94+
final int readBufferSize;
8895
private StreamBuilderFactory streamFactory;
8996

9097
CronetClientStream(
@@ -120,6 +127,7 @@ class CronetClientStream extends AbstractClientStream {
120127
this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
121128
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer,
122129
callOptions);
130+
this.readBufferSize = callOptions.getOption(CRONET_READ_BUFFER_SIZE_KEY);
123131

124132
// Tests expect the "plain" deframer behavior, not MigratingDeframer
125133
// https://github.com/grpc/grpc-java/issues/7140
@@ -309,7 +317,7 @@ public void bytesRead(int processedBytes) {
309317
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
310318
Log.v(LOG_TAG, "BidirectionalStream.read");
311319
}
312-
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
320+
stream.read(ByteBuffer.allocateDirect(readBufferSize));
313321
}
314322
}
315323

@@ -429,7 +437,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf
429437
Log.v(LOG_TAG, "BidirectionalStream.read");
430438
}
431439
reportHeaders(info.getAllHeadersAsList(), false);
432-
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
440+
stream.read(ByteBuffer.allocateDirect(readBufferSize));
433441
}
434442

435443
@Override

cronet/src/main/java/io/grpc/cronet/InternalCronetCallOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ public static CallOptions withAnnotation(CallOptions callOptions, Object annotat
3636
return CronetClientStream.withAnnotation(callOptions, annotation);
3737
}
3838

39+
public static CallOptions withReadBufferSize(CallOptions callOptions, int size) {
40+
return callOptions.withOption(CronetClientStream.CRONET_READ_BUFFER_SIZE_KEY, size);
41+
}
42+
43+
/**
44+
* Returns Cronet read buffer size for gRPC included in the given {@code callOptions}. Read
45+
* buffer can be customized via {@link #withReadBufferSize(CallOptions, int)}.
46+
*/
47+
public static int getReadBufferSize(CallOptions callOptions) {
48+
return callOptions.getOption(CronetClientStream.CRONET_READ_BUFFER_SIZE_KEY);
49+
}
50+
3951
/**
4052
* Returns Cronet annotations for gRPC included in the given {@code callOptions}. Annotations
4153
* are attached via {@link #withAnnotation(CallOptions, Object)}.

cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package io.grpc.cronet;
1818

19+
import static io.grpc.cronet.CronetClientStream.CRONET_READ_BUFFER_SIZE_KEY;
1920
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
21+
import static org.junit.Assert.assertEquals;
2022
import static org.junit.Assert.assertFalse;
2123
import static org.junit.Assert.assertSame;
2224
import static org.junit.Assert.assertTrue;
@@ -92,6 +94,41 @@ public void alwaysUsePut_defaultsToFalse() throws Exception {
9294
assertFalse(stream.idempotent);
9395
}
9496

97+
@Test
98+
public void channelBuilderReadBufferSize_defaultsTo4Kb() throws Exception {
99+
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
100+
CronetTransportFactory transportFactory =
101+
(CronetTransportFactory) builder.buildTransportFactory();
102+
CronetClientTransport transport =
103+
(CronetClientTransport)
104+
transportFactory.newClientTransport(
105+
new InetSocketAddress("localhost", 443),
106+
new ClientTransportOptions(),
107+
channelLogger);
108+
CronetClientStream stream = transport.newStream(
109+
method, new Metadata(), CallOptions.DEFAULT, tracers);
110+
111+
assertEquals(4 * 1024, stream.readBufferSize);
112+
}
113+
114+
@Test
115+
public void channelBuilderReadBufferSize_changeReflected() throws Exception {
116+
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
117+
CronetTransportFactory transportFactory =
118+
(CronetTransportFactory) builder.buildTransportFactory();
119+
CronetClientTransport transport =
120+
(CronetClientTransport)
121+
transportFactory.newClientTransport(
122+
new InetSocketAddress("localhost", 443),
123+
new ClientTransportOptions(),
124+
channelLogger);
125+
CronetClientStream stream = transport.newStream(
126+
method, new Metadata(),
127+
CallOptions.DEFAULT.withOption(CRONET_READ_BUFFER_SIZE_KEY, 32 * 1024), tracers);
128+
129+
assertEquals(32 * 1024, stream.readBufferSize);
130+
}
131+
95132
@Test
96133
public void scheduledExecutorService_default() {
97134
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);

0 commit comments

Comments
 (0)