Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ private class PendingStream extends DelayedStream {
private volatile Status lastPickStatus;

private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
super("connecting_and_lb");
this.args = args;
this.tracers = tracers;
}
Expand Down
17 changes: 14 additions & 3 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* necessary.
*/
class DelayedStream implements ClientStream {
private final String bufferContext;
/** {@code true} once realStream is valid and all pending calls have been drained. */
private volatile boolean passThrough;
/**
Expand All @@ -64,6 +65,14 @@ class DelayedStream implements ClientStream {
// No need to synchronize; start() synchronization provides a happens-before
private List<Runnable> preStartPendingCalls = new ArrayList<>();

/**
* Create a delayed stream with debug context {@code bufferContext}. The context is what this
* stream is delayed by (e.g., "connecting", "call_credentials").
*/
public DelayedStream(String bufferContext) {
this.bufferContext = checkNotNull(bufferContext, "bufferContext");
}

@Override
public void setMaxInboundMessageSize(final int maxSize) {
checkState(listener == null, "May only be called before start");
Expand Down Expand Up @@ -104,11 +113,13 @@ public void appendTimeoutInsight(InsightBuilder insight) {
return;
}
if (realStream != null) {
insight.appendKeyValue("buffered_nanos", streamSetTimeNanos - startTimeNanos);
insight.appendKeyValue(
bufferContext + "_delay", "" + (streamSetTimeNanos - startTimeNanos) + "ns");
realStream.appendTimeoutInsight(insight);
} else {
insight.appendKeyValue("buffered_nanos", System.nanoTime() - startTimeNanos);
insight.append("waiting_for_connection");
insight.appendKeyValue(
bufferContext + "_delay", "" + (System.nanoTime() - startTimeNanos) + "ns");
insight.append("was_still_waiting");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ ClientStream returnStream() {
synchronized (lock) {
if (returnedStream == null) {
// apply() has not been called, needs to buffer the requests.
delayedStream = new DelayedStream();
delayedStream = new DelayedStream("call_credentials");
return returnedStream = delayedStream;
} else {
return returnedStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ public void pendingStream_appendTimeoutInsight_waitForReady() {
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
assertThat(insight.toString())
.matches("\\[wait_for_ready, buffered_nanos=[0-9]+\\, waiting_for_connection]");
.matches("\\[wait_for_ready, connecting_and_lb_delay=[0-9]+ns\\, was_still_waiting]");
}

@Test
Expand All @@ -759,7 +759,7 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure(
assertThat(insight.toString())
.matches("\\[wait_for_ready, "
+ "Last Pick Failure=Status\\{code=PERMISSION_DENIED, description=null, cause=null\\},"
+ " buffered_nanos=[0-9]+, waiting_for_connection]");
+ " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]");
}

private static TransportProvider newTransportProvider(final ClientTransport transport) {
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/io/grpc/internal/DelayedStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class DelayedStreamTest {
@Mock private ClientStreamListener listener;
@Mock private ClientStream realStream;
@Captor private ArgumentCaptor<ClientStreamListener> listenerCaptor;
private DelayedStream stream = new DelayedStream();
private DelayedStream stream = new DelayedStream("test_op");

@Test
public void setStream_setAuthority() {
Expand Down Expand Up @@ -450,7 +450,7 @@ public void appendTimeoutInsight_realStreamNotSet() {
InsightBuilder insight = new InsightBuilder();
stream.start(listener);
stream.appendTimeoutInsight(insight);
assertThat(insight.toString()).matches("\\[buffered_nanos=[0-9]+\\, waiting_for_connection]");
assertThat(insight.toString()).matches("\\[test_op_delay=[0-9]+ns\\, was_still_waiting]");
}

@Test
Expand All @@ -469,7 +469,7 @@ public Void answer(InvocationOnMock in) {
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
assertThat(insight.toString())
.matches("\\[buffered_nanos=[0-9]+, remote_addr=127\\.0\\.0\\.1:443\\]");
.matches("\\[test_op_delay=[0-9]+ns, remote_addr=127\\.0\\.0\\.1:443\\]");
}

private void callMeMaybe(Runnable r) {
Expand Down