Skip to content

Commit f3046ac

Browse files
committed
cronet: Add API in channel builder for specifying buffer size to use to read from the transport.
Fixes grpc#12198
1 parent e2ea747 commit f3046ac

File tree

3 files changed

+31
-9
lines changed

3 files changed

+31
-9
lines changed

cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
@ExperimentalApi("There is no plan to make this API stable, given transport API instability")
5555
public final class CronetChannelBuilder extends ForwardingChannelBuilder2<CronetChannelBuilder> {
5656

57+
private static final int DEFAULT_READ_BUFFER_SIZE = 4 * 1024;
58+
5759
/** BidirectionalStream.Builder factory used for getting the gRPC BidirectionalStream. */
5860
public static abstract class StreamBuilderFactory {
5961
public abstract BidirectionalStream.Builder newBidirectionalStreamBuilder(
@@ -109,6 +111,7 @@ public static CronetChannelBuilder forAddress(String name, int port) {
109111
private boolean trafficStatsUidSet;
110112
private int trafficStatsUid;
111113
private Network network;
114+
private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
112115

113116
private CronetChannelBuilder(String host, int port, CronetEngine cronetEngine) {
114117
final class CronetChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder {
@@ -142,6 +145,15 @@ public CronetChannelBuilder maxMessageSize(int maxMessageSize) {
142145
return this;
143146
}
144147

148+
/**
149+
* Sets the buffer size to read from the network. Default to {@link #DEFAULT_READ_BUFFER_SIZE}.
150+
*/
151+
public CronetChannelBuilder readBufferSize(int readBufferSize) {
152+
checkArgument(readBufferSize >= 0, "readBufferSize must be >= 0");
153+
this.readBufferSize = readBufferSize;
154+
return this;
155+
}
156+
145157
/**
146158
* Sets the Cronet channel to always use PUT instead of POST. Defaults to false.
147159
*/
@@ -226,7 +238,8 @@ ClientTransportFactory buildTransportFactory() {
226238
trafficStatsTag,
227239
trafficStatsUidSet,
228240
trafficStatsUid,
229-
network),
241+
network,
242+
readBufferSize),
230243
MoreExecutors.directExecutor(),
231244
scheduledExecutorService,
232245
maxMessageSize,
@@ -247,6 +260,7 @@ static class CronetTransportFactory implements ClientTransportFactory {
247260
private final boolean usingSharedScheduler;
248261
private final boolean useGetForSafeMethods;
249262
private final boolean usePutForIdempotentMethods;
263+
private final int readBufferSize;
250264

251265
private CronetTransportFactory(
252266
StreamBuilderFactory streamFactory,
@@ -256,7 +270,8 @@ private CronetTransportFactory(
256270
boolean alwaysUsePut,
257271
TransportTracer transportTracer,
258272
boolean useGetForSafeMethods,
259-
boolean usePutForIdempotentMethods) {
273+
boolean usePutForIdempotentMethods,
274+
int readBufferSize) {
260275
usingSharedScheduler = timeoutService == null;
261276
this.timeoutService = usingSharedScheduler
262277
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService;
@@ -267,6 +282,7 @@ private CronetTransportFactory(
267282
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
268283
this.useGetForSafeMethods = useGetForSafeMethods;
269284
this.usePutForIdempotentMethods = usePutForIdempotentMethods;
285+
this.readBufferSize = readBufferSize;
270286
}
271287

272288
@Override
@@ -275,7 +291,8 @@ public ConnectionClientTransport newClientTransport(
275291
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
276292
return new CronetClientTransport(streamFactory, inetSocketAddr, options.getAuthority(),
277293
options.getUserAgent(), options.getEagAttributes(), executor, maxMessageSize,
278-
alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods);
294+
alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods,
295+
readBufferSize);
279296
}
280297

281298
@Override

cronet/src/main/java/io/grpc/cronet/CronetClientStream.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
* Client stream for the cronet transport.
6060
*/
6161
class CronetClientStream extends AbstractClientStream {
62-
private static final int READ_BUFFER_CAPACITY = 4 * 1024;
6362
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
6463
private static final String LOG_TAG = "grpc-java-cronet";
6564

@@ -85,6 +84,7 @@ class CronetClientStream extends AbstractClientStream {
8584
private final Collection<Object> annotations;
8685
private final TransportState state;
8786
private final Sink sink = new Sink();
87+
private final int readBufferSize;
8888
private StreamBuilderFactory streamFactory;
8989

9090
CronetClientStream(
@@ -102,7 +102,8 @@ class CronetClientStream extends AbstractClientStream {
102102
CallOptions callOptions,
103103
TransportTracer transportTracer,
104104
boolean useGetForSafeMethods,
105-
boolean usePutForIdempotentMethods) {
105+
boolean usePutForIdempotentMethods,
106+
int readBufferSize) {
106107
super(
107108
new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions,
108109
useGetForSafeMethods && method.isSafe());
@@ -120,6 +121,7 @@ class CronetClientStream extends AbstractClientStream {
120121
this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
121122
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer,
122123
callOptions);
124+
this.readBufferSize = readBufferSize;
123125

124126
// Tests expect the "plain" deframer behavior, not MigratingDeframer
125127
// https://github.com/grpc/grpc-java/issues/7140
@@ -309,7 +311,7 @@ public void bytesRead(int processedBytes) {
309311
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
310312
Log.v(LOG_TAG, "BidirectionalStream.read");
311313
}
312-
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
314+
stream.read(ByteBuffer.allocateDirect(readBufferSize));
313315
}
314316
}
315317

@@ -429,7 +431,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf
429431
Log.v(LOG_TAG, "BidirectionalStream.read");
430432
}
431433
reportHeaders(info.getAllHeadersAsList(), false);
432-
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
434+
stream.read(ByteBuffer.allocateDirect(readBufferSize));
433435
}
434436

435437
@Override

cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class CronetClientTransport implements ConnectionClientTransport {
6565
private final boolean useGetForSafeMethods;
6666
private final boolean usePutForIdempotentMethods;
6767
private final StreamBuilderFactory streamFactory;
68+
private final int readBufferSize;
6869
// Indicates the transport is in go-away state: no new streams will be processed,
6970
// but existing streams may continue.
7071
@GuardedBy("lock")
@@ -92,7 +93,8 @@ class CronetClientTransport implements ConnectionClientTransport {
9293
boolean alwaysUsePut,
9394
TransportTracer transportTracer,
9495
boolean useGetForSafeMethods,
95-
boolean usePutForIdempotentMethods) {
96+
boolean usePutForIdempotentMethods,
97+
int readBufferSize) {
9698
this.address = Preconditions.checkNotNull(address, "address");
9799
this.logId = InternalLogId.allocate(getClass(), address.toString());
98100
this.authority = authority;
@@ -108,6 +110,7 @@ class CronetClientTransport implements ConnectionClientTransport {
108110
.build();
109111
this.useGetForSafeMethods = useGetForSafeMethods;
110112
this.usePutForIdempotentMethods = usePutForIdempotentMethods;
113+
this.readBufferSize = readBufferSize;
111114
}
112115

113116
@Override
@@ -132,7 +135,7 @@ class StartCallback implements Runnable {
132135
final CronetClientStream clientStream = new CronetClientStream(
133136
url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize,
134137
alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer, useGetForSafeMethods,
135-
usePutForIdempotentMethods);
138+
usePutForIdempotentMethods, readBufferSize);
136139

137140
@Override
138141
public void run() {

0 commit comments

Comments
 (0)