Skip to content

Commit ca43d78

Browse files
authored
inprocess: Support tracing message sizes (grpc#11406)
1 parent a01a9e2 commit ca43d78

File tree

10 files changed

+207
-103
lines changed

10 files changed

+207
-103
lines changed

census/src/test/java/io/grpc/census/CensusModulesTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import io.grpc.ClientInterceptors;
5757
import io.grpc.ClientStreamTracer;
5858
import io.grpc.Context;
59+
import io.grpc.KnownLength;
5960
import io.grpc.Metadata;
6061
import io.grpc.MethodDescriptor;
6162
import io.grpc.ServerCall;
@@ -99,6 +100,7 @@
99100
import io.opencensus.trace.Tracer;
100101
import io.opencensus.trace.propagation.BinaryFormat;
101102
import io.opencensus.trace.propagation.SpanContextParseException;
103+
import java.io.IOException;
102104
import java.io.InputStream;
103105
import java.util.HashSet;
104106
import java.util.List;
@@ -136,7 +138,7 @@ public class CensusModulesTest {
136138
ClientStreamTracer.StreamInfo.newBuilder()
137139
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build();
138140

139-
private static class StringInputStream extends InputStream {
141+
private static class StringInputStream extends InputStream implements KnownLength {
140142
final String string;
141143

142144
StringInputStream(String string) {
@@ -149,6 +151,11 @@ public int read() {
149151
// passed to the InProcess server and consumed by MARSHALLER.parse().
150152
throw new UnsupportedOperationException("Should not be called");
151153
}
154+
155+
@Override
156+
public int available() throws IOException {
157+
return string == null ? 0 : string.length();
158+
}
152159
}
153160

154161
private static final MethodDescriptor.Marshaller<String> MARSHALLER =

core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java

Lines changed: 29 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,6 @@ protected abstract InternalServer newServer(
136136
*/
137137
protected abstract String testAuthority(InternalServer server);
138138

139-
/**
140-
* Returns true (which is default) if the transport reports message sizes to StreamTracers.
141-
*/
142-
protected boolean sizesReported() {
143-
return true;
144-
}
145-
146139
protected final Attributes eagAttrs() {
147140
return EAG_ATTRS;
148141
}
@@ -163,9 +156,9 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
163156
* tests in an indeterminate state.
164157
*/
165158
protected InternalServer server;
166-
private ServerTransport serverTransport;
167-
private ManagedClientTransport client;
168-
private MethodDescriptor<String, String> methodDescriptor =
159+
protected ServerTransport serverTransport;
160+
protected ManagedClientTransport client;
161+
protected MethodDescriptor<String, String> methodDescriptor =
169162
MethodDescriptor.<String, String>newBuilder()
170163
.setType(MethodDescriptor.MethodType.UNKNOWN)
171164
.setFullMethodName("service/method")
@@ -182,22 +175,22 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
182175
"tracer-key", Metadata.ASCII_STRING_MARSHALLER);
183176
private final String tracerKeyValue = "tracer-key-value";
184177

185-
private ManagedClientTransport.Listener mockClientTransportListener
178+
protected ManagedClientTransport.Listener mockClientTransportListener
186179
= mock(ManagedClientTransport.Listener.class);
187-
private MockServerListener serverListener = new MockServerListener();
180+
protected MockServerListener serverListener = new MockServerListener();
188181
private ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
189-
private final TestClientStreamTracer clientStreamTracer1 = new TestHeaderClientStreamTracer();
182+
protected final TestClientStreamTracer clientStreamTracer1 = new TestHeaderClientStreamTracer();
190183
private final TestClientStreamTracer clientStreamTracer2 = new TestHeaderClientStreamTracer();
191-
private final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
184+
protected final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
192185
clientStreamTracer1, clientStreamTracer2
193186
};
194187
private final ClientStreamTracer[] noopTracers = new ClientStreamTracer[] {
195188
new ClientStreamTracer() {}
196189
};
197190

198-
private final TestServerStreamTracer serverStreamTracer1 = new TestServerStreamTracer();
191+
protected final TestServerStreamTracer serverStreamTracer1 = new TestServerStreamTracer();
199192
private final TestServerStreamTracer serverStreamTracer2 = new TestServerStreamTracer();
200-
private final ServerStreamTracer.Factory serverStreamTracerFactory = mock(
193+
protected final ServerStreamTracer.Factory serverStreamTracerFactory = mock(
201194
ServerStreamTracer.Factory.class,
202195
delegatesTo(new ServerStreamTracer.Factory() {
203196
final ArrayDeque<TestServerStreamTracer> tracers =
@@ -857,26 +850,16 @@ public void basicStream() throws Exception {
857850
message.close();
858851
assertThat(clientStreamTracer1.nextOutboundEvent())
859852
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
860-
if (sizesReported()) {
861-
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
862-
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
863-
} else {
864-
assertThat(clientStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
865-
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
866-
}
853+
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
854+
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
867855
assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
868856
assertNull("no additional message expected", serverStreamListener.messageQueue.poll());
869857

870858
clientStream.halfClose();
871859
assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));
872860

873-
if (sizesReported()) {
874-
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
875-
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
876-
} else {
877-
assertThat(serverStreamTracer1.getInboundWireSize()).isEqualTo(0L);
878-
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
879-
}
861+
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
862+
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
880863
assertThat(serverStreamTracer1.nextInboundEvent())
881864
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
882865

@@ -907,25 +890,15 @@ public void basicStream() throws Exception {
907890
assertNotNull("message expected", message);
908891
assertThat(serverStreamTracer1.nextOutboundEvent())
909892
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
910-
if (sizesReported()) {
911-
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
912-
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
913-
} else {
914-
assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
915-
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
916-
}
893+
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
894+
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
917895
assertTrue(clientStreamTracer1.getInboundHeaders());
918896
assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
919897
assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message));
920898
assertThat(clientStreamTracer1.nextInboundEvent())
921899
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
922-
if (sizesReported()) {
923-
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
924-
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
925-
} else {
926-
assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
927-
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
928-
}
900+
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
901+
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
929902

930903
message.close();
931904
assertNull("no additional message expected", clientStreamListener.messageQueue.poll());
@@ -1285,17 +1258,10 @@ public void onReady() {
12851258
serverStream.close(Status.OK, new Metadata());
12861259
assertTrue(clientStreamTracer1.getOutboundHeaders());
12871260
assertTrue(clientStreamTracer1.getInboundHeaders());
1288-
if (sizesReported()) {
1289-
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
1290-
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
1291-
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
1292-
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
1293-
} else {
1294-
assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
1295-
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
1296-
assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
1297-
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
1298-
}
1261+
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
1262+
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
1263+
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
1264+
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
12991265
assertNull(clientStreamTracer1.getInboundTrailers());
13001266
assertSame(status, clientStreamTracer1.getStatus());
13011267
// There is a race between client cancelling and server closing. The final status seen by the
@@ -2184,7 +2150,7 @@ private static void runIfNotNull(Runnable runnable) {
21842150
}
21852151
}
21862152

2187-
private static void startTransport(
2153+
protected static void startTransport(
21882154
ManagedClientTransport clientTransport,
21892155
ManagedClientTransport.Listener listener) {
21902156
runIfNotNull(clientTransport.start(listener));
@@ -2202,7 +2168,7 @@ public void streamCreated(Attributes transportAttrs, Metadata metadata) {
22022168
}
22032169
}
22042170

2205-
private static class MockServerListener implements ServerListener {
2171+
public static class MockServerListener implements ServerListener {
22062172
public final BlockingQueue<MockServerTransportListener> listeners
22072173
= new LinkedBlockingQueue<>();
22082174
private final SettableFuture<?> shutdown = SettableFuture.create();
@@ -2233,7 +2199,7 @@ public MockServerTransportListener takeListenerOrFail(long timeout, TimeUnit uni
22332199
}
22342200
}
22352201

2236-
private static class MockServerTransportListener implements ServerTransportListener {
2202+
public static class MockServerTransportListener implements ServerTransportListener {
22372203
public final ServerTransport transport;
22382204
public final BlockingQueue<StreamCreation> streams = new LinkedBlockingQueue<>();
22392205
private final SettableFuture<?> terminated = SettableFuture.create();
@@ -2281,8 +2247,8 @@ public StreamCreation takeStreamOrFail(long timeout, TimeUnit unit)
22812247
}
22822248
}
22832249

2284-
private static class ServerStreamListenerBase implements ServerStreamListener {
2285-
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
2250+
public static class ServerStreamListenerBase implements ServerStreamListener {
2251+
public final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
22862252
// Would have used Void instead of Object, but null elements are not allowed
22872253
private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<>();
22882254
private final CountDownLatch halfClosedLatch = new CountDownLatch(1);
@@ -2341,8 +2307,8 @@ public void closed(Status status) {
23412307
}
23422308
}
23432309

2344-
private static class ClientStreamListenerBase implements ClientStreamListener {
2345-
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
2310+
public static class ClientStreamListenerBase implements ClientStreamListener {
2311+
public final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
23462312
// Would have used Void instead of Object, but null elements are not allowed
23472313
private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<>();
23482314
private final SettableFuture<Metadata> headers = SettableFuture.create();
@@ -2399,7 +2365,7 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
23992365
}
24002366
}
24012367

2402-
private static class StreamCreation {
2368+
public static class StreamCreation {
24032369
public final ServerStream stream;
24042370
public final String method;
24052371
public final Metadata headers;

inprocess/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public static InProcessChannelBuilder forAddress(String name, int port) {
9494
private ScheduledExecutorService scheduledExecutorService;
9595
private int maxInboundMetadataSize = Integer.MAX_VALUE;
9696
private boolean transportIncludeStatusCause = false;
97+
private long assumedMessageSize = -1;
9798

9899
private InProcessChannelBuilder(@Nullable SocketAddress directAddress, @Nullable String target) {
99100

@@ -117,10 +118,6 @@ public ClientTransportFactory buildClientTransportFactory() {
117118
managedChannelImplBuilder.setStatsRecordStartedRpcs(false);
118119
managedChannelImplBuilder.setStatsRecordFinishedRpcs(false);
119120
managedChannelImplBuilder.setStatsRecordRetryMetrics(false);
120-
121-
// By default, In-process transport should not be retriable as that leaks memory. Since
122-
// there is no wire, bytes aren't calculated so buffer limit isn't respected
123-
managedChannelImplBuilder.disableRetry();
124121
}
125122

126123
@Internal
@@ -225,9 +222,24 @@ public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) {
225222
return this;
226223
}
227224

225+
/**
226+
* Assumes RPC messages are the specified size. This avoids serializing
227+
* messages for metrics and retry memory tracking. This can dramatically
228+
* improve performance when accurate message sizes are not needed and if
229+
* nothing else needs the serialized message.
230+
* @param assumedMessageSize length of InProcess transport's messageSize.
231+
* @return this
232+
* @throws IllegalArgumentException if assumedMessageSize is negative.
233+
*/
234+
public InProcessChannelBuilder assumedMessageSize(long assumedMessageSize) {
235+
checkArgument(assumedMessageSize >= 0, "assumedMessageSize must be >= 0");
236+
this.assumedMessageSize = assumedMessageSize;
237+
return this;
238+
}
239+
228240
ClientTransportFactory buildTransportFactory() {
229-
return new InProcessClientTransportFactory(
230-
scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
241+
return new InProcessClientTransportFactory(scheduledExecutorService,
242+
maxInboundMetadataSize, transportIncludeStatusCause, assumedMessageSize);
231243
}
232244

233245
void setStatsEnabled(boolean value) {
@@ -243,15 +255,17 @@ static final class InProcessClientTransportFactory implements ClientTransportFac
243255
private final int maxInboundMetadataSize;
244256
private boolean closed;
245257
private final boolean includeCauseWithStatus;
258+
private long assumedMessageSize;
246259

247260
private InProcessClientTransportFactory(
248261
@Nullable ScheduledExecutorService scheduledExecutorService,
249-
int maxInboundMetadataSize, boolean includeCauseWithStatus) {
262+
int maxInboundMetadataSize, boolean includeCauseWithStatus, long assumedMessageSize) {
250263
useSharedTimer = scheduledExecutorService == null;
251264
timerService = useSharedTimer
252265
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
253266
this.maxInboundMetadataSize = maxInboundMetadataSize;
254267
this.includeCauseWithStatus = includeCauseWithStatus;
268+
this.assumedMessageSize = assumedMessageSize;
255269
}
256270

257271
@Override
@@ -263,7 +277,7 @@ public ConnectionClientTransport newClientTransport(
263277
// TODO(carl-mastrangelo): Pass channelLogger in.
264278
return new InProcessTransport(
265279
addr, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
266-
options.getEagAttributes(), includeCauseWithStatus);
280+
options.getEagAttributes(), includeCauseWithStatus, assumedMessageSize);
267281
}
268282

269283
@Override

0 commit comments

Comments
 (0)